본문 바로가기
Language/Java

[Java] 데이터 병렬 처리(Java 8)

by 기몬식 2024. 1. 8.

Java 8

자바 8은 스트림 객체와 람다 표현식과 같은 Feature들을 기반으로 함수형 프로그래밍 스타일을 지원합니다. 병렬 스트림은 스트림 API를 기반으로 하여 작성되어 훨씬 편리하게 사용할 수 있으며 람다 표현식 및 함수형 인터페이스를 활용하여 코드를 간결하게 작성할 수 있습니다. 자바에서 지원해주는 병렬 스트림에 대해서 알아 보겠습니다.

병렬 스트림

출처


병렬 스트림은 내부적으로 스트림의 요소를 여러 청크(chunk)로 Fork/Join 프레임워크를 사용하여 분할하고 이 청크들을 각각의 스레드에서 병렬로 처리하는 방식으로 동작합니다. 여러 청크로 분할하여 각 스레드가 독립적으로 작업을 수행하고 그 결과를 나중에 모아서 최종 결과를 생성함으로써 멀티코어 프로세서를 활용하여 작업을 효율적으로 분산시키고 성능을 향상시킬 수 있습니다. 사용 방법도 비교적 간단합니다. 특정 컬렉션의 요소를 병렬로 작업을 하기 위해 병렬 스트림을 생성하는 코드는 다음과 같습니다.


 @Test
 void testCreateParallelStream() throws Exception {
     List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

     Stream<Integer> stream = list.parallelStream();
 }

Collection 또는 Stream 과 같은 클래스는 대부분의 정적 스트림 메소드를 통해 스트림 객체 생성할 수 있는데 내부적으로는 StreamSupport라는 스트림을 생성하고 조작하기 위한 저수준 유틸리티 클래스를 이용합니다.


public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
  Objects.requireNonNull(spliterator);
  return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);

해당 메소드는 병렬 스트림 생성 여부에 대한 boolean 의 플래그가 설정되어 있어 플래그 값을 통해 Spliterator 에서 새로운 순차 스트림 또는 병렬 스트림을 생성합니다.

Spliterator

Spliterator 란 '분할할 수 있는 반복자'라는 뜻으로 Iterator 처럼 요소 탐색 기능을 제공하지만 병렬 작업에 특화되어 있다는 점이 큰 차이점입니다. 개발자가 명시적으로 데이터를 분할하는 작업을 수행하지 않아도 되는 이유는 Spliterator 가 내부적으로 데이터를 청크 단위로 분할하기 때문입니다.


public interface Spliterator<T> {

    boolean tryAdvance(Consumer<? super T> action);

    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

병렬 스트림은 각 자료구조에 맞는 Spliterator 가 구현되어 있기 때문에 분할에 관한 구현을 안 해도 병렬로 수행이 가능합니다. 주요 메소드에 대한 설명은 다음과 같습니다.


  • tryAdvance: 나머지 요소가 존재하는 경우 해당 요소에 대해 지정된 작업을 수행하고 true를 반환합니다. 그렇지 않으면 false를 반환합니다.
  • forEachRemaining: 모든 요소가 처리되거나 작업에서 예외가 발생할 때까지 현재 스레드에서 순차적으로 나머지 각 요소에 대해 지정된 작업을 수행합니다.
  • trySplit: Spliterator가 더 분할될 수 있는 경우 요소를 포함한 Spliterator 를 반환합니다. 현재 실행 중인 Spliterator 는 포함되지 않습니다.
  • estimateSize: forEachRemaining 순회에서 발견된 요소 수의 추정치를 반환하거나 무한하거나 알 수 없거나 계산 비용이 너무 비싼 경우 Long.MAX_VALUE를 반환합니다.
  • getExactSizeIfKnown: 해당 Spliterator의 발견된 요소 수의 추정치를 반환하거나 -1을 반환하는 편의 메소드입니다.
  • characteristics: Spliterator 객체에 포함된 모든 특성 값의 합을 반환합니다. 요소의 타입에 따라 특성 값이 다르기 때문에 값이나 내부 동작 방식이 상이할 수 있습니다.

Characteristic

특성은 단어 그대로 Spliterator 의 요소가 가지는 특성을 정의한 상수들을 비트로 표현한 것으로 Spliterator 동작 방식에 대한 정보를 제공합니다.


  1. ORDERED(0x00000010)
    • 요소들이 순서를 가지는 경우에 해당합니다. 이 Spliterator 는 trySplit 메소드가 요소의 엄격한 접두사(strict prefix of elements)를 분할하고 tryAdvance 메소드가 접두사 순서에 따라 한 요소씩 진행하며 순서에 따라 작업을 수행하도록 보장합니다. List 와 같은 순서가 있는 컬렉션의 Spliterator 가 포함됩니다.
  2. DISTINCT(0x00000001)
    • 요소들이 서로 다른(unique) 경우에 해당합니다. 즉 발견된 각 요소 쌍에 대해 x, y, !x.equals(y)를 나타내는 특성 값입니다. Set 과 같이 중복된 요소가 없는 경우에 이 특성을 사용할 수 있습니다.
  3. SORTED(0x00000004)
    • 요소들이 정렬되어 있는 경우에 해당합니다. 사용되는 요소가 Comparable 인터페이스를 구현하고 있어야하며 ORDERED 특성 값도 가지고 있어야합니다.
  4. SIZED(0x00000040)
    • Spliterator가 정확한 크기 정보를 제공할 수 있는 경우에 해당합니다. Spliterator 의 순회 또는 분할 이전에 반환되는 estimateSize 값이 요소 수의 정확한 개수를 나타내는 유한 크기를 나타낼 수 있을 경우 사용합니다. 배열이나 컬렉션과 같이 크기를 알 수 있는 경우에 사용할 수 있습니다.
  5. SUBSIZED(0x00000080)
    • Spliterator가 SIZED 특성을 가질 때 특정 범위의 요소를 나타낼 수 있는 경우에 해당합니다.
  6. IMMUTABLE(0x00000400)
    • 요소 구조적으로 변경될 수 없을 경우에 해당합니다. 즉 요소를 추가, 교체 또는 제거할 수 없으므로 순회 중에 변경이 발생할 수 없습니다.
  7. CONCURRENT(x00001000)
    • 외부 동기화 없이 여러 스레드에 의해 요소가 안전하게 동시에 수정할 수 있는 경우에 해당합니다. 최상위 Spliterator는 CONCURRENT와 IMMUTABLE 의 상호 배타적인 속성 때문에 동시에 사용할 수 없습니다. 또한 SIZED도 해당하는데 고정된 크기가 알려진 경우 순회 중에 수정되어 변경될 수 있기 때문입니다.

Fork/Join Framework

병렬 스트림은 Spliterator에 의해 분리된 청크들을 Fork/Join Framework 를 사용하여 작업을 나누어 처리합니다. Fork/Join Framework에 대한 글은 해당 페이지에서 내용을 확인할 수 있기 때문에 넘어가겠습니다. 다만 Fork/Join Framework 에서 태스크를 분할하는 과정에 대한 추가 설명을 작성하고자 합니다.

출처


ExecutorService 의 구현체인 ForkJoinPool은 다른 종류의 구현체들과는 다르게 Work-Stealing 메커니즘을 사용합니다. CPU의 유휴 상태를 최소화 하고자 최대한 많은 서브 테스크를 생성하는 기법으로 모든 스레드를 공정하게 분할하는 것을 목표로합니다. 각각의 스레드는 자신에게 할당된 테스크를 포함하는 이중 연결리스트를 참조하여 작업이 끝날 때마다 큐의 헤드에서 다른 테스크를 가져와 작업을 처리합니다. 한 스레드는 다른 스레드보다 빠르게 작업을 처리하게 되면 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐옵니다.


오탈자 및 오류 내용을 댓글 또는 메일로 알려주시면, 검토 후 조치하겠습니다.

'Language > Java' 카테고리의 다른 글

[Java] Native Method  (0) 2024.06.15
[Java] 동기화  (0) 2024.03.13
[Java] 데이터 병렬 처리(Java 5, 7)  (1) 2023.12.31
[Library] Assertions  (1) 2023.11.27
[Java] JPMS(Java 9 Platform Module System)  (1) 2023.10.23