본문 바로가기
Language/Java

[Java] 데이터 병렬 처리(Java 5, 7)

by 기몬식 2023. 12. 31.

병렬 처리

병렬 처리(Parallel Operation)란 멀티 코어 환경에서 하나의 큰 작업을 여러 개의 작은 작업으로 나누어 각각의 코어가 병렬적으로 작업을 처리하는 것을 의미합니다. 기존의 직렬 처리(Sequential Processing)에서는 작업이 순차적으로 진행되어 한 번에 하나의 작업만 처리할 수 있었지만 병렬 처리는 여러 작업을 동시에 실행함으로써 전체적인 성능을 향상시킬 수 있습니다.

동시성(Concurrency)과 병렬성(Parallelism)

출처

동시성은 한 번에 많은 일을 처리하는 것이고 병렬성은 많은 일을 동시에 수행하는 것 입니다.
-롭 파이크(Robert Pike)-

동시성

동시성은 여러 작업이 시간의 흐름 상에서 서로 겹치게 실행되는 개념입니다. 즉 동시성이란 독립적인 작업을 작은 단위의 연산으로 나누어 동시에 실행하는 것처럼 보이게하여 유휴 시간(Idle Time)을 최소화 하는 구조나 개념을 의미합니다. 여기서 유휴 시간이란 시스템이 현재 작업을 수행하지 않고 대기하거나 아무런 동작을 하지 않는 시간을 말합니다.

위의 설명과 같이 동시성은 CPU가 작업마다 시간을 분할해 적절하게 Context Switching 함으로써 동시에 실행되는 것처럼 보이게 하는 것을 동시성이라고 부르며 절대적인 시간 관점으로 봤을 때는 동시에 처리가 되지 않는 특징을 가지고 있습니다.

병렬성

병렬성은 여러 작업이 실제로 물리적으로 동시에 동작하는 개념으로 각 작업이 서로 다른 프로세서나 코어에서 동시에 실행되는 것을 의미합니다. 병렬성은 별도의 코어에서 동시에 실행되기 때문에 멀티 코어에서만 구현 가능한 특징이 있습니다. 또한 병렬 처리는 다양한 기법으로 처리될 수 있는데 데이터 병렬성(Data Parallelism)과 작업 병렬성(Task Parallelism)에 대해 설명하겠습니다.


  • 데이터 병렬성: 전체 데이터를 나누어 서브 데이터를 만들고 서브 데이터를 병렬 처리해 작업하는 것을 의미합니다. 데이터 셋을 여러 부분으로 나눈 다음 여러 처리 유닛(프로세서, 코어 등)에 분배되고 각 유닛은 자신이 담당한 데이터 부분에 동일한 연산을 적용합니다.
  • 작업 병렬성: 단일 작업을 여러 부분으로 나누어 동시에 실행함으로써 전체적인 처리 속도를 향상시키는 방법 중 하나입니다. 각 작업 간에는 의존성이 없거나 최소한으로 존재할 때 사용됩니다.

Java 병렬 처리

자바5 이전의 자바에서는 데이터 컬렉션을 병렬 처리하기 위해서는 Thread, Runnable API를 통해서 개발자가 직접 데이터를 분할하고 각각의 스레드를 생성하여 할당해야 했습니다. 또한 경쟁 상태(Race Condition)를 방지하기 위해 직접 동기화를 해야했고 마지막에는 각 스레드에서 발생한 부분 결과를 하나로 합치는 과정이 필요했습니다. 위와 같은 저수준의 API를 사용하는 것 대신 병렬 처리를 위해 각 버전별로 추가된 새로운 API를 소개하겠습니다.

Java 5

Callable

@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}

Callable 인터페이스는 Runnable 인터페이스와 유사하게 인스턴스가 잠재적(potentially)으로 다른 스레드에 의해 실행되는 클래스용으로 설계되었습니다. 그러나 Runnable 인터페이스와는 다르게 결과를 반환할 수 있으며 예외를 발생시킬 수 있습니다. 또한 자바 5에서 추가된 제네릭을 사용해 결과를 받을 수 있는 것이 큰 특징입니다.

Future

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future 인터페이스는 비동기 계산의 결과를 나타내기 위해 설계되었습니다. 가용 가능한 쓰레드가 없어 실행이 미루어졌을 때 작업이 완료되었는지 확인하고 완료될 때까지 기다리고 작업 결과를 검색하는 메소드가 제공됩니다. 결과는 작업이 완료 되었을 때 get() 메소드를 사용하여 결괏값을 확인할 수 있으며 필요한 경우 준비될 때까지 차단됩니다.

Executor Framework

Executor Framework는 자바에서 스레드를 관리하고 작업을 비동기적으로 실행하는데 필요한 고수준의 인터페이스와 클래스들의 집합을 제공하는 프레임워크로 자바 5버전에 도입된 프레임워크입니다.

Executor

Executor는 Runnable 작업을 실행하는 인터페이스입니다. 해당 인터페이스는 스레드의 세부 사항, 예약 등을 포함하여 각 작업이 실행되는 방식에 대한 메커니즘에 대해서는 관심사를 가지지 않으며 오로지 등록된 작업을 실행하는 책임만을 가지는 인터페이스입니다. 기존 명시적으로 스레드를 생성하는 것 대신 다음과 같이 사용하는 것으로 대체될 수 있습니다.

  @Test
  void testExecute() throws Exception {
      Executor executor = Runnable::run;
      executor.execute(() -> System.out.println("Thread Name: " + Thread.currentThread().getName()));
      executor.execute(() -> System.out.println("Thread Name: " + Thread.currentThread().getName()));
  }

그리고 Executor 인터페이스는 기본적으로 실행되는 작업이 비동기임을 엄격하게 보장(strictly required)하지 않습니다. 위와 같은 이유로 새로운 스레드 아닌 메인 쓰레드에서 실행됩니다. 따라서 새로운 Executor 객체를 생성해야지만 새로운 스레드가 생성됨으로 아래와 같이 사용해야합니다.

  @Test
  void testExecuteThreadPerTask() throws Exception {
      Executor executor = new ThreadPerTaskExecutor();

      executor.execute(() -> System.out.println("Thread Name: " + Thread.currentThread().getName()));
  }

  static class ThreadPerTaskExecutor implements Executor {
      public void execute(Runnable r) {
          new Thread(r).start();
      }
  }

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService는 Executor 인터페이스를 구현한 스레드 풀(Thread Pool)을 나타내는 인터페이스입니다. 주로 스레드를 관리하고 비동기 작업을 실행하는 데 사용됩니다.

스레드 관리

ExecutorService는 작업 상태를 추적하고 스레드 풀을 관리하는 기능을 제공합니다. 다음은 ExecutorService에서 제공 되는 메소드들에 대한 설명입니다.


  • shutdown: 이전에 제출된 작업은 실행되지만 새 작업은 허용되지 않는 순차적 종료를 의미합니다. 이전 작업이 실행을 완료할 때까지 기다리지 않습니다.
  • shutdownNow: 현재 실행 중인 모든 작업을 종료합니다. 대기 중인 작업의 처리를 중지(interrupt)하고 실행 대기 중인 작업 목록을 반환합니다. 이전 작업이 실행을 완료할 때까지 기다리지 않습니다.
  • awaitTermination: 종료 요청 후 모든 작업의 실행이 완료되거나 시간 초과가 발생하거나 현재 스레드가 중단될 때까지 대기합니다. 지정한 시간 내에 모든 작업의 종료 여부를 반환합니다.

비동기 작업

ExecutorService는 새로운 작업을 제출하고 실행하는 기능을 제공합니다. 다음은 ExecutorService에서 제공 되는 메소드들에 대한 설명입니다.


  • submit: 실행을 위해 작업을 제출(추가)하고 작업의 결과를 나타내는 Future를 반환합니다.
  • invokeAll: 주어진 작업을 모두 실행하고 모든 작업이 완료되면 작업의 결과를 나타내는 Future 목록을 반환합니다.
  • invokeAny: 주어진 작업 중 가장 빠르게 실행된 결과를 나타내는 Future를 반환합니다. 정상 또는 예외에 상관없이 결과가 반환되면 완료되지 않은 모든 작업은 취소됩니다. 또한 작업이 진행되는 동안 제출한 작업 목록이 수정되면 정의되지 않(undefined)습니다.

Executors

Executor, ExecutorService, ScheduledExecutorService, ThreadFactory 및 Callable 클래스에 대한 팩토리 및 유틸리티 메소드를 제공하는 클래스입니다. Executors는 고수준의 동시성 프로그래밍 모델로 스레드 풀을 생성하고 관리하기 위한 유틸리티 클래스로 여러 정적 팩토리 메소드를 제공합니다.


  • newFixedThreadPool: 재사용 가능한 고정된 수의 스레드를 사용하는 스레드 풀을 만듭니다. 모든 스레드가 활성 상태일 때 추가 작업이 제출되면 스레드를 사용할 수 있을 때까지 대기열에서 대기합니다. 풀의 스레드는 명시적으로 shutdonw()메소드가 호출되어 종료될 때까지 존재합니다.
  • newCachedThreadPool: 필요한만큼 새 스레드를 생성하지만 사용 가능한 스레드가 존재한다면 재사용하는 스레드 풀을 생성합니다. 기존 스레드를 사용할 수 없으면 새 스레드가 생성되어 풀에 추가됩니다. 60초 동안 사용되지 않은 스레드는 종료되고 캐시에서 제거됨으로 유휴 상태에 따른 리소스 낭비가 없습니다.
  • newScheduledThreadPool: 일정 시간 뒤 혹은 주기적(scheduled)으로 실행되도록 명령을 예약할 수 있는 쓰레드 풀을 생성합니다.

Java 7

Fork/Join Framework

출처

Fork/Join Framework는 작업을 여러 작은 조각으로 분할하고 이를 병렬로 실행한 후 결과를 합치는 방식으로 동작하는 데이터 병렬 처리를 위한 고성능을 갖춘 세분화된 작업 실행 프레임워크로 자바 7버전에 도입된 프레임워크입니다.

RecursiveTask

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
  private static final long serialVersionUID = 5232453952276485270L;

  V result;

  protected abstract V compute();

  public final V getRawResult() {
      return result;
  }

  protected final void setRawResult(V value) {
      result = value;
  }

  protected final boolean exec() {
      result = compute();
      return true;
  }

}

RecursiveTask 추상 클래스는 ForkJoinTask 클래스를 확장하며 서브 클래스를 구현함으로써 스레드 풀을 사용할 수 있습니다. 테스크를 서브 테스크로 분할하는 로직 더이상 분할이 불가능할 때 서브 테스크의 결과를 생산할 알고리즘을 compute() 메소드로 정의합니다. compute() 메소드 구현은 아래의 Pseudocode 형식을 갖습니다.

if (태스크가 충분히 작거나 더 이상 분할할 수 없는 경우) {
  순차적으로 태스크 계산  
} else {
  태스크를 두 개의 서브 태스크로 분할
  태스크가 다시 서브 태스크로 분할되도록 이 메소드를 재귀적으로 호출
  모든 서브 태스크의 연산이 완료될 때까지 기다림
  각 서브 태스크의 결과를 합침
}

위의 Pseudocode에 따라 배열의 모든 요소 합을 구하는 RecursiveTask를 활용한 코드를 작성해보겠습니다.

static class SumTask extends RecursiveTask<Integer> {
  private static final int THRESHOLD = 5;
  private final int[] array;
  private final int start;
  private final int end;

  public SumTask(int[] array, int start, int end) {
      this.array = array;
      this.start = start;
      this.end = end;
  }

  @Override
  protected Integer compute() {
      if (end - start <= THRESHOLD) {
          int sum = 0;
          for (int i = start; i < end; i++) {
              sum += array[i];
          }
          return sum;
      } else {
          // divide
          int mid = (start + end) / 2;
          SumTask leftTask = new SumTask(array, start, mid);
          SumTask rightTask = new SumTask(array, mid, end);

          // fork
          leftTask.fork();
          int rightResult = rightTask.compute();

          // join
          int leftResult = leftTask.join();

          return leftResult + rightResult;
      }
  }

위와 같이 작성된 SumTask를 실행시키는 코드는 아래와 같습니다.

@Test
void testRecursiveTask() throws Exception {
    // given
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    int[] array = new int[100];
    for (int i = 0; i < 100; i++) {
        array[i] = i + 1;
    }

    // when
    SumTask sumTask = new SumTask(array, 0, array.length);

    int result = forkJoinPool.invoke(sumTask);

    // then
    assertThat(result).isEqualTo(5050);
}

여기서 주의할 점으로 compute 메소드에서 테스크를 분할하여 실행시킬 때 각각의 메소드 속성에 따른 오버헤드를 계산해야합니다.


  • join : join() 메소드는 결과를 반환할 때까지 블록시키기 때문에 항상 두 서브 테스크를 시작한 뒤 호출해야 합니다. 그렇지 않으면 기존의 순차 스트림보다 느리게 되는데 이는 작업의 결과를 기다리지 않아 프로세서가 유휴 상태에 있게 되기 때문입니다.
  • compute: 분리된 서브테스크 중 한 작업에만 compute() 작업을 호출해야 합니다. 테스크간 스레드를 재사용할 수 있어 오버헤드가 감소합니다.

오탈자 및 오류 내용을 댓글 또는 메일로 알려주시면, 검토 후 조치하겠습니다.

'Language > Java' 카테고리의 다른 글

[Java] 동기화  (0) 2024.03.13
[Java] 데이터 병렬 처리(Java 8)  (0) 2024.01.08
[Library] Assertions  (1) 2023.11.27
[Java] JPMS(Java 9 Platform Module System)  (1) 2023.10.23
[Java] Enumeration  (0) 2023.09.04