Posts Thread Pool을 위한 Java Executor Framework
Post
Cancel

Thread Pool을 위한 Java Executor Framework

자바에서의 비동기 처리를 위해 CompletableFuture에 대해 공부하며 클래스 내부를 보다보니 Executor, ForkJoinPool 등이 눈에 띄었다. 생소한 부분이라 공부하며 나름대로 정리해보았다.

Thread Pool


Executor, ForkJoinPool에 대해 알기 전에 먼저 스레드 풀의 개념에 대해 살펴보자.

  • 스레드를 만들고 관리하는 데는 비용이 많이 들기 때문에, 스레드를 필요할 때 마다 생성하는게 아니라 미리 생성해놓고 필요할 때 마다 재사용한다.
    • 이를 위해 스레드를 관리하기 위한 스레드 풀이라는 개념이 나오게 된다.
    • 즉, 요청이 들어올 때마다 새 스레드를 만드는 대신 스레드 풀을 사용하여 task를 병렬로 실행할 수 있다.
  • 스레드 풀 인스턴스는 이러한 작업을 실행하기 위해 재사용되는 여러 스레드를 제어한다.
    • 스레드를 재사용함으로써, 멀티 스레드를 활용하는 애플리케이션의 리소스를 아낄 수 있다.
  • 스레드 풀을 통해 생성하는 스레드 수와 스레드의 생명주기를 제어할 수 있다. 또한 작업(task)을 스케줄링하고 큐에 작업을 보관할 수 있다.
    • 스레드 갯수를 정함으로써 동시성의 정도를 제한할 수 있다.
출처 : https://www.baeldung.com/thread-pool-java-and-guava

스레드 풀을 사용해야 하는 이유

  • 자바에서 스레드는 운영 체제의 리소스인 시스템 수준 스레드(system-level thread)에 매핑되기 때문에, 스레드를 무분별하게 생성하면 리소스가 빠르게 소진될 수 있다.
    • ex) java.lang.OutOfMemoryError: unable to create new native thread
  • 운영 체제는 여러 task를 동시에 처리(실제로는 한 번에 하나의 task)하기 위해 스레드 간 context switching을 수행한다.
    • 따라서, 스레드를 많이 생성할수록 각 스레드가 실제 작업을 수행하는 데 걸리는 시간이 줄어든다.
  • reqeust 또는 task 처리 중에 스레드가 생성되지 않으므로 응답 시간이 단축된다.
  • 필요에 따라 애플리케이션의 실행 정책을 유연하게 변경할 수 있다.
    • 예를 들어, 자바의 ExecutorService 구현체를 교체하기만 하면 단일 스레드에서 멀티 스레드로 대체할 수 있다.
  • 시스템 부하 및 사용 가능한 리소스에 기반하여 스레드 수를 결정하기 때문에, 시스템의 안정성을 높인다.
  • 스레드 관리보다 비즈니스 로직에 집중할 수 있다.

자바에서의 스레드 풀 관리


Java 1.5 이전까지는 스레드 풀을 만들고 관리하는 것이 개발자의 책임이었지만, JDK 5 부터는 Executor 프레임워크에서 Java에 내장된 다양한 스레드 풀(fixed thread pool, cached thread pool 등)을 제공한다. Executor, ExecutorService, Executors는 Executor 프레임워크의 핵심이다.

Executor / ExecutorService / Executors 비교

Executor

  • Executor는 병렬 실행(parallel execution)을 위해 추상화된 핵심 인터페이스이다.
    1
    2
    3
    
    public interface Executor {
      void execute(Runnable command);
    }
    
  • Executor는 작업(task)과 실행(execution)을 결합한 Thread(new Thread(RunnableTask()).start())와는 다르게 작업과 실행을 구분한다.
    • 따라서, Executor는 task를 처리하기 위한 스레드를 직접 호출하는 대신 다음과 같이 사용될 수 있다.
      1
      2
      3
      4
      5
      6
      
      static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
          if (f == null) throw new NullPointerException();
          CompletableFuture<U> d = new CompletableFuture<U>();
          e.execute(new AsyncSupply<U>(d, f));
          return d;
      }
      

ExecutorService

  • ExecutorService는 Executor 인터페이스의 확장으로, Future 개체를 반환하고, 스레드 풀을 종료하는 등의 다양한 기능을 제공한다.
1
2
3
public interface ExecutorService extends Executor {
...
}
  • shutdown()이 호출되면 스레드 풀은 새로운 task를 수락하지 않고 보류 중인 task를 완료한다.
  • submit()을 통해 Future 객체를 리턴한다.
1
2
3
4
5
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
  • Future 객체는 비동기 실행 기능을 제공한다.
    • 즉, task에 대한 실행이 완료될 때까지 기다릴 필요없이, 추후에 Future 객체에 결과가 있는지 확인하고 실행이 완료되면 Future.get()을 사용하여 결과를 얻을 수 있다.
    • get()은 blocking method이다.
      • 즉, task의 실행이 완료될 때까지 기다리고 아직 완료되지 않은 경우 결과를 사용할 수 없다.
  • cancel()을 통해 보류 중인 실행을 취소할 수 있다.
  • 이외에도 invokeAny(), invokeAll() 등 다양한 메서드를 제공한다. (자세한 내용은 공식 문서 참조)

Executors

  • Executors는 Collections와 유사한 유틸리티 클래스로, fixed thread pool, cached thread pool과 같은 서로 다른 유형의 스레드 풀을 만드는 팩토리 메서드를 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>(),
                                     threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Executor 프레임워크 사용시 주의할 점

  • fixed length thread pool 사용시 스레드 풀 용량
    • 애플리케이션이 task를 효율적으로 실행하기 위해 필요한 스레드 수를 결정하는 것은 매우 중요하다.
    • 너무 큰 스레드 풀은 대부분의 스레드가 대기 모드에 있게되고, 이러한 스레드를 만드는데 불필요한 오버헤드가 발생한다.
    • 너무 적으면 큐에 있는 task는 대기하는 시간이 길어지기 때문에, 애플리케이션이 응답하지 않는 것처럼 보일 수 있다.
  • 작업 취소 후 Future.get() 메서드 호출
    • 이미 취소된 작업의 결과를 가져오려고 하면 CancellationException이 발생한다.
  • Future.get() 메서드로 인해 예기치 않게 긴 blocking
    • 이를 방지하기 위해 제한 시간을 사용하는 것이 좋다.

ThreadPoolExecutor


  • ThreadPoolExecutor는 미세 조정을 위한 많은 매개변수와 후크가 있는 확장 가능한 스레드 풀 구현체이다.
  • 주요 구성 매개변수는 corePoolSize, maximumPoolSize, keepAliveTime이다.
    • corePoolSize 매개변수는 인스턴스화되어 풀에 보관될 코어 스레드의 수이다.
    • 새 작업이 들어올 때 모든 코어 스레드가 사용 중이고 내부 큐가 가득 차면 풀이 maximumPoolSize까지 커질 수 있다.
  • 풀은 항상 내부에 유지되는 고정된 수의 코어 스레드로 구성된다.
    • 생성된 다음 더 이상 필요하지 않을 때 종료될 수 있는 excessive 스레드로 구성되기도 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

FixedThreadPool

  • corePoolSize와 maximumPoolSize가 같으며 keepAliveTime이 0인 ThreadPoolExecutor.
  • 따라서, 이 스레드 풀의 스레드 수는 항상 동일하다.
1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 아래 예시의 경우, 동시에 실행되는 작업의 수가 항상 2개보다 작거나 같으면 즉시 실행된다.
    • 즉, 처음 두 태스크는 한 번에 실행되고 세 번째 태스크는 대기열에서 대기해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

CachedThreadPool

  • corePoolSize는 0, maximumPoolSize는 Integer.MAX_VALUE, keepAliveTime은 60초인 ThreadPoolExecutor.
    • 즉, 스레드 풀이 모든 task를 수용할 수 있도록 제한 없이 커질 수 있음을 의미한다.
    • 또한 스레드가 60초 동안 사용하지 않으면 폐기된다.
  • CachedThreadPool은 애플리케이션이 주로 short-living task를 처리하는 경우 활용한다.
  • 내부적으로 SynchronousQueue가 사용되므로 대기열 크기는 항상 0이다.
    • SynchronousQueue에서는 삽입 및 제거 작업 쌍이 항상 동시에 수행되기 때문에, 실제로 아무것도 포함하지 않는다.
1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

SingleThreadExecutor

  • Executors.newSingleThreadExecutor()는 단일 스레드를 포함하는 또 다른 일반적인 형태의 ThreadPoolExecutor를 만든다.
  • SingleThreadExecutor는 이벤트 루프를 만드는 데 이상적이다. corePoolSize, maximumPoolSize는 1이고 keepAliveTime은 0이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
1
2
3
4
5
6
7
8
9
AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor는 ThreadPoolExecutor 클래스를 상속받고 ScheduledExecutorService 인터페이스도 구현하여 부가적인 기능을 제공한다.
    • schedule() 메서드를 사용하면 지정된 지연 후 작업을 한 번 실행할 수 있다.
    • scheduleAtFixedRate() 메서드를 사용하면 지정된 초기 지연 후에 작업을 실행한 다음 특정 기간 동안 반복 실행할 수 있다.
      • 즉, task 수행 시작 시간은 (initialDelay + delay), (initialDelay + 2 * period), … 이런식으로 계산된다.
    • scheduleWithFixedDelay() 메서드는 지정된 태스크를 반복적으로 실행한다는 점에서 scheduleAtFixedRate()와 유사하다.
      • 하지만, delay는 이전 task의 종료와 다음 task의 시작 사이에서 측정된다.
      • 즉, task 수행 시작 시간은 (이전 task의 끝나는 시점 + delay)가 된다.
1
2
3
4
5
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
         implements ScheduledExecutorService {
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
1
2
3
4
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
  • 다음 코드는 500밀리초 지연 후 작업을 실행한 후 100밀리초마다 반복하는 방법을 보여준다.
  • 또한, 작업을 예약한 후 CountDownLatch lock을 사용하여 작업이 세 번 실행될 때까지 기다린 후, Future.cancel() 메서드를 사용하여 작업을 취소한다.
1
2
3
4
5
6
7
8
9
10
CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

더 공부할 부분


  • Fork/Join Framework

참고자료


This post is licensed under CC BY 4.0 by the author.

(미해결) 카프카 프로듀서 재송신 테스트

비동기 프로그래밍을 위한 자바 클래스 살펴보기