본문 바로가기
Language/Java

[Java] Thread Pool

by 기몬식 2024. 7. 18.

이전 글에서 스레드에 대해 알아보았습니다. 스레드는 병렬 처리를 가능하게 하여 여러 작업을 동시에 실행할 수 있게 해주지만 사실 스레드를 직접 생성하고 해제하는 것은 상당히 복잡함을 요구합니다.


  1. 자원: 스레드 생성 시 운영체제는 스레드에 필요한 메모리와 다른 자원을 할당해야함.
  2. 시간: 스레드를 생성하고 해제하는 작업은 시간 소모적이기 때문에 이로 인한 오버헤드가 발생할 수 있음.
  3. 관리: 스레드를 직접 관리하는 것은 동시성 문제, 스케줄링, 예외 처리등 다양한 문제들이 복잡하게 엮여 있기 때문에 오류가 발생하기 쉬움.

즉 위와 같은 문제들로 인해 요청이 빠르게 많이 들어 오고 가는 실제 운영 환경에서 지속적으로 스레드를 생성하게 되면 어느 순간 응답이 불가능한 상태에 빠지게 됩니다. 이런 상황 속에서 조금 더 스레드를 안전하고 효율적으로 사용할 수 있도록 고안해낸 방법이 바로 스레드 풀(Thread Pool)입니다.

Thread Pool


출처


컴퓨터 프로그래밍에서 스레드 풀은 동시(concurrency) 실행을 달성하기 위한 설계 패턴 중 하나로 감독 프로그램에 의한 동시 실행을 위해 작업이 할당되기를 기다리는 여러 스레드를 유지 관리합니다. 즉 일정 수의 스레드를 미리 생성하여 작업 큐에 있는 작업을 처리하는 방식으로 성능을 향상시키고 단기 작업에 대한 스레드의 빈번한 생성 및 파괴로 인한 실행 지연을 방지합니다.


또한 스레드 풀에서 스레드 수를 제한함으로써 얻을 수 있는 이점은 시스템 자원을 보다 더 효율적으로 사용할 수 있습니다. 스레드가 많을수록 많이 소비되는 메모리와 컨텍스트 스위칭 오버헤드를 줄일 수 있으며 하드웨어 및 프로그램에 맞는 적절한 스레드 수는 각 스레드가 충분한 CPU 시간을 할당받을 수 있게 됩니다.


결론적으로 스레드 풀의 사용은 각 작업에 대해 새 스레드를 생성하는 것에 비해 스레드 풀은 스레드 생성 및 제거 오버헤드가 풀의 초기 생성으로 제한되어 성능이 향상되고 시스템 안정성이 향상될 수 있으며 스레드 생성 및 삭제 관련 리소스는 시간 측면에서 비용이 많이 드는 프로세스이므로 미리 생성하여 작업의 응답 시간을 단축 및 자원을 절약할 수 있습니다.

Java

Java 는 스레드 풀 및 동시성 프로그래밍을 보다 쉽고 간편하게 사용할 수 있도록 다양한 고수준의 API 를 제공하고 있습니다. 그 중 java.util.concurrent.Executors 클래스는 다양한 종류의 스레드 풀을 생성할 수 있는 정적 메소드를 통해 상황에 맞는 스레드 풀을 선택하여 사용할 수 있습니다.


  • FixedThreadPool
    • 고정된 크기의 스레드 풀을 생성하며 그 수를 유지시키는 스레드 풀을 생성함
  • newCachedThreadPool
    • 필요에 따라 스레드를 증가하여 생성하거나 종료시키는 스레드 풀을 생성함
    • 반면 스레드 수에 제한을 두지 않기 때문에 주의해야함
  • newScheduledThreadPool
    • 일정 시간 간격으로 또는 주기적인 작업을 처리할 수 있는 스레드 풀을 생성함
  • newSingleThreadExecutor
    • 단일 스레드로 작업을 처리하는 스레드 풀을 생성함

스레드 풀

java.util.concurrent.ExecutorService 작업을 등록하고 실행을 담당하는 인터페이스로 대부분의 스레드 풀은 기본적으로 해당 인터페이스를 구현합니다. 따라서 ExecutorService 은 작업들을 상태를 확인하고 라이프사이클을 관리하기 위한 메소들을 제공하고 있습니다.


@Test
void testNewFixedThreadPool() {
  // given
  ExecutorService threadPool = Executors.newFixedThreadPool(4);
  Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());

  // when
  threadPool.execute(runnable);

  threadPool.shutdown();

  // then
  Assertions.assertThatThrownBy(() -> threadPool.execute(runnable));
}

ExecutorService 를 통해 스레드 풀을 생성하여 작업을 제출하게 되면 shutdown 메소드를 명시적으로 호출해줘야 합니다. shutdown 메소드는 이미 제출된 작업을 완료한 후 종료를 진행시키는 메소드로 해당 메소드가 호출되기 전까지 계속해서 다음 작업을 대기하기 때문입니다.

비동기

ExecutorService 는 비동기 작업을 위한 여러 기능들을 제공하는데 대표적으로 비동기 작업을 관리 및 추적할 수 있는 Future 인터페이스를 반환 타입으로 가지는 메소드들을 제공합니다.


@Test
void testSubmit() throws InterruptedException, ExecutionException {
  ExecutorService threadPool = Executors.newFixedThreadPool(2);

  Callable<String> hello = () -> {
      Thread.sleep(2000L);
      return "hello";
  };

  Future<String> future = threadPool.submit(hello);

  threadPool.shutdown();

  assertThat(future.get()).isEqualTo("hello");
  assertThat(future.isDone()).isTrue();
}

먼저 단일 작업(Runnable, Callable)을 실행하는 sumit 메소드입니다. 실제로 해당 테스트 코드를 실행해보면 약 2초 + α 의 시간이 소요되며 작업은 이미 실행이 완료된 상태이기 때문에 isDone 은 true 가 반환됩니다. 작업이 성공적으로 완료되었다면 get 메소드를 통해 작업의 결괏값을 확인할 수 있습니다.


@Test
void testInvokeAll() throws InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    Instant start = Instant.now();

    Callable<String> hello = () -> {
        Thread.sleep(1000L);
        String result = "hello";
        System.out.println(result);
        return result;
    };

    Callable<String> bye = () -> {
        Thread.sleep(5000L);
        String result = "bye";
        System.out.println(result);
        return result;
    };

    Callable<String> again = () -> {
        Thread.sleep(6000L);
        String result = "again";
        System.out.println(result);
        return result;
    };

    threadPool.invokeAll(List.of(hello, bye, again));
    threadPool.shutdown();

    Instant end = Instant.now();
    long seconds = Duration.between(start, end).getSeconds();

    assertThat(seconds).isEqualTo(6);
}

다음은 제출한 모든 작업이 종료될 때까지 블로킹하는 invokeAll 메소드입니다. 동시에 주어진 모든 작업을 실행하기 때문에 작업에 소요되는 시간은 가장 오랜 시간이 소요되는 작업의 시간과 동일합니다. 하지만 위의 경우는 등록한 작업들을 실행하기 위한 충분한 스레드 수가 존재하는 경우입니다.


위와 동일한 코드에서 스레드 갯수만 3에서 2로 줄인다면 결과는 어떻게 되는지 확인해보겠습니다.


@Test
void testInvokeAll() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2); // 변경
Instant start = Instant.now();

    Callable<String> hello = () -> {
        Thread.sleep(1000L);
        String result = "hello";
        System.out.println(result);
        return result;
    };

    Callable<String> bye = () -> {
        Thread.sleep(5000L);
        String result = "bye";
        System.out.println(result);
        return result;
    };

    Callable<String> again = () -> {
        Thread.sleep(6000L);
        String result = "again";
        System.out.println(result);
        return result;
    };

    threadPool.invokeAll(List.of(hello, bye, again));
    threadPool.shutdown();

    Instant end = Instant.now();
    long seconds = Duration.between(start, end).getSeconds();

    assertThat(seconds).isEqualTo(7); // 변경
}

실제 다양한 환경에서 실행 되는 비동기, 동시성 프로그램을 정확하게 예측하고 정의하는 것은 다소 어려움이 있겠으나 위와 같이 단순한 예제에서는 어느 정도 가능합니다. 스레드 풀의 사이즈는 3개에서 2개로 줄었고 수행해야할 작업의 갯수와 걸리는 시간은 동일합니다. 따라서 랜덤하게 2개의 작업이 선정되어 먼저 실행되고 작업이 먼저 끝난 스레드 풀은 나머지 작업을 실행하게 됩니다. 결국 가장 오랜 시간이 걸리는 작업에 + α 의 시간이 추가됩니다.


@Test
void testInvokeAny() throws InterruptedException, ExecutionException {
  ExecutorService threadPool = Executors.newFixedThreadPool(2);
  Instant start = Instant.now();

  Callable<String> hello = () -> {
      Thread.sleep(1000L);
      String result = "hello";
      System.out.println(result);
      return result;
  };

  Callable<String> bye = () -> {
      Thread.sleep(5000L);
      String result = "bye";
      System.out.println(result);
      return result;
  };

  String result = threadPool.invokeAny(List.of(hello, bye));
  threadPool.shutdown();

  assertThat(result).isEqualTo("hello");
}

마지막으로 제출한 모든 작업 중 가장 빠르게 종료된 작업을 블로킹하며 반환하는 invokeAny 메소드입니다.
흥미롭게도 invokeAny 메소드는 실행된 작업 중에서 하나가 예외를 던지지 않고 정상적으로 완료되면 그 작업의 결과를 반환하고 아직 완료되지 않은 나머지 작업은 취소됩니다. 또한 메소드가 실행되는 동안 주어진 작업 컬렉션을 수정하면 그 동작 결과는 예측할 수 없기 때문에 주어진 작업들을 수정하지 않아야 합니다.


그렇다면 왜 invokeAny 는 이런 방식으로 동작하는지 궁금해서 서치를 해보았습니다. 주로 시스템 리소스르 효율적으로 사용하기 위함이라고 알게 됐는데 사실 이런 모호한 말보다 아래 예제를 보고 단번에 이해하게 됐습니다.


List<Callable<String>> tasks = List.of(
    () -> fetchFromServer1(),
    () -> fetchFromServer2(),
    () -> fetchFromServer3()
);

여러 서버에 요청을 보내고 가장 빠르게 응답하는 서버의 결과를 반환하는 곳의 응답값을 사용하는 예제입니다. 가장 먼저 가용한 서버를 확인하고 그 결과를 통해 사용할 수 있도록 사용 가능한 것 입니다!!


위와 같이 스레드 풀의 개념부터 Java 에서 제공하는 고수준의 API 까지 알아 봤습니다. 하지만 기본적으로 비동기 작업에서 반환 타입으로 사용되는 Future 인터페이스는 결과를 얻기 위해서는 블로킹 방식으로 대기해야 하는 단점이 있습니다. 이를 개선하기 위해 해당 비동기 작업들에 대한 콜백 함수를 정의할 수 있는 feature 가 Java8 에 추가되었는데 이는 다음에 알아보도록 하겠습니다.


오탈자 및 오류 내용을 댓글 또는 메일로 알려주시면, 검토 후 조치하겠습니다.

'Language > Java' 카테고리의 다른 글

[Java] Stream 과 Optional 의 map, flatMap  (0) 2024.09.06
[Java] Thread  (1) 2024.06.30
[Java] Native Method  (0) 2024.06.15
[Java] 동기화  (0) 2024.03.13
[Java] 데이터 병렬 처리(Java 8)  (0) 2024.01.08