스레드를 직접 사용할 때의 문제점
- 스레드 생성 시간으로 인한 성능 문제
- 메모리 할당: 스레드는 자신만의 호출 스택을 가지고 있어야 하는데, 이는 곧 메모리 공간을 할당해주는 작업이다.
- 운영체제 자원 사용: 스레드 생성 작업은 시스템 콜(system call)을 통해 처리된다. 즉, CPU와 메모리 리소스를 소모하는 작업이다.
- 운영체제 스케줄러 설정: 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정해야 한다 → 오버헤드
- 참고로 스레드 하나는 보통 1MB 이상의 메모리를 사용한다.
- ⇒ 스레드 생성 작업은 상대적으로 무거우며, 어떤 작업 하나를 수행할 때마다 스레드를 각각 생성하고 실행한다면, 이 생성 비용 때문에, 이미 많은 시간이 소모될 것
- ⇒ 생성한 스레드를 재사용할 필요가 있다..!
- 스레드 관리 문제
- 서버의 CPU, 메모리 자원은 한정되어 있기 때문에 스레드는 무한하게 만들 수 없다.
- 우리 시스템이 버틸 수 있는, 최대 스레드의 수까지만 스레드를 생성할 수 있게 관리해야 한다.
- 또한, 애플리케이션 종료 시 실행 중인 스레드를 어떻게 종료할지에 대한 처리가 필요하므로, 역시 스레드가 어딘가에 관리가 되어 있어야 한다.
Runnable
인터페이스의 불편함- 반환 값이 없다:
run()
메서드는 반환값을 가지지 않아서, 스레드의 실행 결과를 직접 받을 수 없다. 이전까지는join()
등을 사용해서 스레드가 종료되길 기다리고, 다음에 멤버 변수에 보관한 값을 받아오는 방식으로 사용했다. - 예외 처리:
run()
메서드는 체크 예외(checked exception)을 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야 한다.
- 반환 값이 없다:
해결
1번, 2번 문제를 해결하기 위해서 스레드를 생성하고 관리하는 풀(Pool
)이 필요하다.
- 스레드를 관리하는 스레드 풀에 스레드를 미리 필요한만큼 만들어둔다.
- 스레드는 스레드 풀에 대기(
WAITING
)하며 쉰다. - 작업 요청이 오면 스레드 풀에서 대기 중인 스레드를 하나 조회하여 작업을 처리한다.
- 이후 작업이 완료되면 스레드를 종료하는게 아니라, 다시 스레드 풀에 반납하여 재사용할 수 있게 한다.
⇒ 스레드의 생성 시간을 절약할 수 있고, 필요한 만큼만 스레드를 만들어 관리할 수 있다.
하지만 이런 스레드 풀을 구현하기에는 생각보다 복잡한 작업들이 많은데, 이런 문제를 한방에 해결해주는 것이 바로 자바가 제공하는 Executor
프레임워크이다.
Executor
프레임워크는 스레드 풀, 스레드 관리, Runnable
의 문제점은 물론이고, 생산자 소비자 문제까지 한방에 해결해주는 자바 멀티스레드 최고의 도구이다.
실무에서는 스레드를 직접 하나하나 생성해서 사용하기보다는 Executor
프레임워크를 주로 사용하여 멀티스레드 프로그래밍을 한다.
Executor 프레임워크 & ExecutorService
Executor 프레임워크의 주요 구성 요소
Executor
public interface Executor {
void execute(Runnable command);
}
ExecutorService
인터페이스 - 주요 메서드
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
Executor
인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공한다.- 주요 메서드로는
submit()
,close()
가 있다. Executor
프레임워클르 사용할 때는 대부분 이 인터페이스를 사용한다.- 기본 구현체는
ThreadPoolExecutor
이다.
ThreadPoolExecutor
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) { // ThreadPoolExecutor에 있는 기능들
int pool = poolExecutor.getPoolSize(); // 스레드 풀에서 관리되는 스레드의 숫자
int active = poolExecutor.getActiveCount(); // 작업을 수행하는 스레드의 숫자
int queuedTasks = poolExecutor.getQueue().size(); // 큐에 대기 중인 작업의 숫자
long completedTask = poolExecutor.getCompletedTaskCount(); // 완료된 작업의 숫자
log("[pool=" + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTasks=" + completedTask + "]");
} else {
log(executorService);
}
}
}
ExecutorService
의 구현체 중 ThreadPoolExecutor
에 대해 알아보자. 이하에서는 모두 es라는 간단한 변수명으로 칭하겠다
- 2가지 구성 요소
스레드 풀
: 스레드를 관리한다.BlockingQueue
: 작업을 보관한다. 생산자 소비자 문제를 해결하기 위해 단순한 큐가 아니라,BlockingQueue
를 사용한다.
es.execute(Runnable runnable)
- 호출 시, 전달받은
Runnable
인스턴스를BlockingQueue
에 보관한다. - 생산자:
main
스레드 - 소비자: 스레드 풀에 있는 스레드가 소비자. 이후에 소비자 중에 하나가
BlockingQueue
에 들어있는 작업(Runnable
)을 받아서 처리한다.
- 호출 시, 전달받은
ThreadPoolExecutor
생성자corePoolSize
: 스레드 풀에서 관리되는 기본 스레드의 수maximumPoolSize
: 스레드 풀에서 관리되는 최대 스레드의 수keepAliveTime
,TimeUnit unit
: 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.BlockingQueue workQueue
: 작업을 보관할 블로킹 큐- 예시)
ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
- 위 예에서
corePoolSize = 2
,maximumPoolSize = 2
를 사용해서 기본 스레드와 최대 스레드 수를 맞추었다. 따라서 풀에서 관리되는 스레드는 2개로 고정 - 블로킹 큐의 구현체로
LinkedBlockingQueue
를 사용했다. 이 블로킹 큐는 작업을 무한대로 저장할 수 있다. - 예제 실행 분석
ThreadPoolExecutor
를 생성한 시점부터corePoolSize
만큼 스레드를 만드는게 아니라, 작업 요청을 받으면 이때 스레드를 만든다.- 작업이 들어올 때마다
corePoolSize
의 크기까지 스레드를 만든다. corePoolSize
까지 스레드가 생성되고 나면, 이후에는 스레드를 생성하지 않고 앞서 만든 스레드를 재사용한다.- 4개의 작업이 들어오면, 2번에 걸쳐 작업이 실행될 것이다.
close()
를 호출하면,ThreadPoolExecutor
가 종료되면서, 스레드 풀에 대기하는 스레드도 함께 제거된다.
- 위 예에서
Future
시작
Runnable
은 반환값이 없고, 체크 예외를 던질 수 없다는 불편함이 있었다.
⇒ 이를 해결하기 위해 Executor
프레임워크는 Callable
과 Future
라는 인터페이스를 도입했다.
Callable
public interface Callable<V> {
V call() throws Exception;
}
java.util.concurrent
에서 제공되는 기능Callable
의call()
은 반환 타입이 제네릭V
이다. 즉, 값을 반환할 수 있다.throws Exception
예외가 선언되어 있다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인Exception
과 그 하위 예외를 모두 던질 수 있다.
Callable
+ Future
사용
public class CallableMainV1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000); // 2초정도 걸린다고 가정
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 완료");
return value;
}
}
}
- 결과를 더 이상 필드에 담아두고 조회하는 방식이 아니라, 반환값을 통해 직접 받을 수 있다.
MyCallable
인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것이다. 이때, 작업의 처리 결과는 직접 반환되지 않고,Future
라는 특별한 인터페이스를 통해 반환된다.future.get()
을 호출하면MyCallable
의call()
이 반환한 결과를 받을 수 있다.future.get()
은InterruptedException
,ExecutionException
체크 예외를 던진다.- 왜 결과를 바로 반환하지 않고,
Future
라는 것을 사용하는지는 바로 아래 ‘분석’에서 확인하자!
Runnable
을 사용하는 방식과 다르게 단순히 싱글 스레드 방식으로 개발한다는 느낌이 든다.- 내가 스레드를 생성하고,
join()
으로 제어하는 코드들이 필요가 없다.
- 내가 스레드를 생성하고,
분석
Future<Integer> future = es.submit(new MyCallable());
submit()
의 호출로MyCallable
의 인스턴스를 es에 전달한다.submit()
은MyCallable.call()
이 반환되는 무작위 숫자 대신에Future
를 반환한다.MyCallable
의 즉시 실행되어서 즉시 결과를 반환하는 것은 불가능하다. 언제 실행이 완료되어서 결과를 반환할 지도 알 수 없다..- ⇒ 이런 이유로
es.submit()
은MyCallable
의 결과를 반환하는 대신MyCallable
의 결과를 나중에 받을 수 있는Future
라는 객체를 대신 제공한다. - ⇒
Future
는 전달한 작업(MyCallable
)의 미래이다. 이 객체를 통해 전달한 작업의 미래 결과를 받을 수 있다.
실행 결과 분석
- 요청 스레드(main 스레드)의
es.submit(taskA)
를 호출ExecutorService
는 전달한taskA
의 미래 결과를 알 수 있는Future
객체를 생성하고, 이 객체 안에taskA
의 인스턴스를 보관Future
는 인터페이스이고, 실제 구현체는FutureTask
이다.Future
는 내부에taskA
의 작업 완료 여부와 작업의 결과 값을 가진다.
taskA
를 감싸고 있는Future
가 블로킹 큐에 담기는 것이다- 이때, 중요한 핵심은 작업을 전달할 때 생성된
Future
는 즉시 반환- `Future future = es.submit(new MyCallable());`
Future
를 즉시 반환하기 때문에 요청 스레드(main
)는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출할 수 있다!- 이는 마치
Thread.start()
를 호출한 것과 비슷하다.
- 이는 마치
- 이때, 중요한 핵심은 작업을 전달할 때 생성된
- 큐에 들어가 있는
Future[taskA]
를 꺼내서 스레드 풀의 스레드1이 작업을 시작한다.Future
의 구현체인FutureTask
는Runnable
인터페이스도 함께 구현하고 있다- =>
FutureTask
는run()
메서드를 갖고있다는 것 - 스레드1은
FutureTask
의run()
메서드를 수행한다. - 그리고
run()
메서드가taskA
의call()
메서드를 호출하고 그 결과를 받아서 처리한다.FutureTask.run() → MyCallable.call()
- =>
- 요청 스레드의
future.get()
호출- 스레드1
- 스레드1은
taskA
의 작업을 아직 처리 중이다. 아직 완료하지 않았다.- 요청 스레드(main 스레드)
- 요청 스레드는
Future
인스턴스의 참조를 가지고 있으며 언제든 본인이 필요할 때future.get()
을 호출해서taskA
작업의 미래 결과를 받을 수 있다.Future
에는taskA
의 완료 여부를 갖고 있다.
- 요청 스레드가
future.get()
을 호출하면Future
가 완료 상태가 될 때까지 ‘대기’한다. 이때 요청 스레드의 상태는RUNNABLE
→WAITING
이 된다. future.get()
을 호출했을 때,- Future가 완료 상태 → 요청 스레드는 대기하지 않고, 값을 즉시 반환받을 수 있다.
- Future가 완료 상태 아님 → 아직 수행 중이라는 의미로, 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을
블로킹(Blocking)
이라 한다.
- 스레드1은
- 스레드1
Thread.join(), Future.get()과 같은 메서드는 스레드가 작업을 바로 수행하지 않고, 다른 작업이 완료될 때까지 기다리게 하는 블로킹 메서드이다.
- 스레드1의 작업 완료
- 요청 스레드
WAITING
상태로future.get()
을 호출하고 대기 중이다.
- 스레드1
- 작업을 완료하면,
taskA
의 반환 결과를Future
에 담는다. Future
의 상태를 완료로 변경한다.- 요청 스레드를 깨운다. 요청 스레드는
WAITING
→RUNNABLE
상태로 변한다.- 요청 스레드는 본인이 본인을 깨울 수 없으니 깨워줘야 한다.
- 작업을 완료하면,
- 요청 스레드
- 스레드1 반환
- 요청 스레드
Future
에서 결과를 반환받는다.
- 스레드1
- 작업을 마친 스레드1은 스레드 풀로 반환된다.
RUNNABLE
→WAITING
- 작업을 마친 스레드1은 스레드 풀로 반환된다.
- 요청 스레드
es.submit()을 호출하는 부분에서 블로킹이 걸리는게 아니라, future.get() 부분에서 블로킹이 걸리는 것이다. es.submit() 이후 체이닝으로 바로 get()까지 호출하면 이는 생성과 동시에 블로킹을 통해 결과를 기다리는 코드가 될 것이다.
활용
public class SumTaskMainV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
// Integer sum1 = es.submit(task1).get(); // 블로킹 -> 2초 대기
// Integer sum2 = es.submit(task2).get(); // 블로킹 -> 2초 대기
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
Integer sum1 = future1.get(); // 블로킹
Integer sum2 = future2.get(); // 블로킹
log("task1.result=" + sum1);
log("task2.result=" + sum2);
int sumAll = sum1 + sum2;
log("task1 + task2 = " + sumAll);
log("End");
es.close();
}
static class SumTask implements Callable<Integer> {
int startValue;
int endValue;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public Integer call() throws InterruptedException {
log("작업 시작");
Thread.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
log("작업 완료 result=" + sum);
return sum;
}
}
}
만약 위 코드의 주석 부분처럼 체이닝으로 사용한다면 2개의 스레드가 병렬로 수행되지 못하고 직렬로 순차적으로 실행되게 하여 작업 시간을 늘리는 비효율이 발생할 것이다
Future
라는 개념이 없다면 결과를 받을 때까지 요청 스레드는 아무일도 못하고 대기해야 했을 것이다. 따라서 작업을 동시에 수행할 수도 없다.Future
라는 개념 덕분에 요청 스레드는 대기하지 않고, 다른 작업을 수행할 수 있다. 예를 들어서 다른 작업을 더 요청할 수 있다. 모든 작업 요청이 끝난 다음에, 본인이 필요할 때Future.get()
을 호출해서 최종 결과를 받을 수 있다.- 즉,
Future
는 요청 스레드를 블로킹(대기) 상태로 만들지 않고, 필요한 요청을 모두 수행할 수 있게 해준다. - 필요한 모든 요청을 한 다음에
Future.get()
을 통해 블로킹 상태로 대기하며 결과를 받으면 된다.
Future - 정리
Future
인터페이스 - 주요 메서드
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
enum State {
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
default State state() {}
}
boolean cancel(boolean mayInterruptIfRunning)
- 기능: 아직 완료되지 않은 작업을 취소한다.
- 매개변수:
mayInterruptIfRunning
cancel(true)
: 이미 작업이 실행 중이라면Thread.interrupt()
를 호출해서 작업을 중단하고Future
를 취소 상태로 변경cancel(false)
: 이미 실행 중인 작업은 계속 실행하고,Future
를 취소 상태로 변경
- 반환값: 작업이 성공적으로 취소된 경우
true
, 이미 완료되었거나 취소할 수 없는 경우false
- 설명: 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우
mayInterruptIfRunning
이true
이면 중단을 시도 - 참고: 취소 상태의
Future
에Future.get()
을 호출하면CancellationException
런타임 예외가 발생한다.
boolean isCancelled()
- 기능: 작업이 취소되었는지 여부를 확인한다.
- 반환값: 작업이 취소된 경우
true
, 그렇지 않은 경우false
- 설명: 이 메서드는 작업이
cancel()
메서드에 의해 취소된 경우에true
를 반환한다.
boolean isDone()
- 기능: 작업이 완료되었는지 여부를 확인한다.
- 반환값: 작업이 완료된 경우
true
, 그렇지 않은 경우false
- 설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에
true
를 반환한다.
State state()
- 기능:
Future
의 상태를 반환한다. 자바 19부터 지원한다.RUNNING
: 작업 실행 중SUCCESS
: 성공 완료FAILED
: 실패 완료CANCELLED
: 취소 완료
- 기능:
V get()
- 기능: 작업이 완료될 때까지 대기(
WAITING
)하고, 완료되면 결과를 반환한다. - 반환값: 작업의 결과
- 예외
InterruptedException
: 대기 중에 현재 스레드가 인터럽트된 경우 발생ExecutionException
: 작업 계산 중에 예외가 발생한 경우 발생
- 설명: 작업이 완료될 때까지
get()
을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한다.
- 기능: 작업이 완료될 때까지 대기(
V get(long timeout, TimeUnit unit)
- 기능:
get()
과 같은데, 시간 초과되면 예외를 발생시킨다. - 매개변수:
timeout
: 대기할 최대 시간unit
:timeout
매개변수의 시간 단위 지정
- 반환값: 작업의 결과
- 예외:
InterruptedException
: 대기 중에 현재 스레드가 인터럽트된 경우 발생ExecutionException
: 작업 계산 중에 예외가 발생한 경우 발생TimeoutException
: 주어진 시간 내에 작업이 완료되지 않은 경우 발생
- 설명: 지정된 시간 동안 결과를 기다린다. 시간이 초과되면
TimeoutException
을 발생시킨다.
- 기능:
ExecutorService - 작업 컬렉션 처리
ExecutorService
는 여러 작업을 한 번에 편리하게 처리하는 invokeAll()
, invokeAny()
기능을 제공한다.
작업 컬렉션 처리
invokeAll()
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
- 모든
Callable
작업을 제출하고, 모든 작업이 완료될 때까지 기다린다.
- 모든
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
- 지정된 시간 내에 모든
Callable
작업을 제출하고 완료될 때까지 기다린다
- 지정된 시간 내에 모든
public class InvokeAllMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task1", 2000);
CallableTask task3 = new CallableTask("task1", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.close();
}
}
invokeAny()
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
- 하나의
Callable
작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반환한다. - 완료되지 않은 나머지 작업은 취소한다.
- 하나의
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
- 지정된 시간 내에 하나의
Callable
작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반환한다. - 완료되지 않은 나머지 작업은 취소한다.
- 지정된 시간 내에 하나의
public class InvokeAnyMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task1", 2000);
CallableTask task3 = new CallableTask("task1", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.close();
}
}
ExecutorService - 우아한 종료(graceful shutdown)
고객의 주문을 처리하는 서버를 운영중이라고 가정하자.
만약 서버 기능을 업데이트를 위해서 서버를 재시작해야 한다고 가정해보자.
이때 서버 애플리케이션이 고객의 주문을 처리하고 있는 도중에 갑자기 재시작 된다면, 해당 고객의 주문이 제대로 진행되지 못할 것이다.
가장 이상적인 방향은 새로운 주문 요청은 막고, 이미 진행중인 주문은 모두 완료한 다음에 서버를 재시작 하는 것이 가장 좋을 것이다.
이처럼 서비스를 안정적으로 종료하는 것도 매우 중요하다. 이렇게 문제 없이 우아하게 종료하는 방식을 graceful shutdown
이라 한다.
ExecutorService의 종료 관련 메서드
- 서비스 종료
void shutdown()
- 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료 → 우리가 원하는 것
- 논 블로킹 메서드
- 이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드 호출
- 즉, shutdown 호출 이후 바로 종료하면 안되고 모든 작업이 종료될 때까지 기다릴 필요가 있음
우아한 종료(graceful shutdown)
List<Runnable> shutdownNow()
- 실행 중인 작업을 중단하고, 대기 중인 작업을 반환하며 즉시 종료
- 실행 중인 작업을 중단하기 위해 인터럽트를 발생시킨다.
- 논 블로킹 메서드
- 서비스 상태 확인
boolean isShutdown()
: 서비스가 종료되었는지 확인boolean isTerminated()
:shutdown()
,shutdownNow()
호출 후, 모든 작업이 완료되었는지 확인
- 작업 완료 대기
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- 서비스 종료 시 모든 작업이 완료될 때까지 대기. 이때 지정된 시간까지만 대기한다.
- 블로킹 메서드
- 메인 메서드가
shutdown()
을 호출한다고 해서 스레드 풀이 완전히 종료될 때까지 메인 메서드는 기다리지 않는다.. 하지만,awaitTermination()
을 사용하면 모든 작업이 완료될 때까지 대기해줄 수 있다.
close()
- 자바 19부터 지원하는 서비스 종료 메서드
shutdown()
과 같다고 생각하면 편하지만, 더 정확히는shutdown()
을 호출하고,하루
를 기다려도 작업이 완료되지 않거나 호출한 스레드에 인터럽트가 발생하면shutdownNow()
를 호출한다.
구현
shutdown()
을 호출해서 이미 들어온 모든 작업을 다 처리하고 서비스를 우아하게 종료(graceful shutdown
)하는 것이 가장 이상적이지만, 갑자기 요청이 너무 많이 들어와서 큐에 대기중인 작업이 너무 많아 작업 완료가 어렵거나, 작업이 너무 오래 걸리거나, 또는 버그가 발생해서 특정 작업이 끝나지 않을 수 있다.
=> 이렇게 되면 서비스가 너무 늦게 종료 되거나, 종료되지 않는 문제가 발생할 수 있다.
=> 우아하게 종료하는 시간을 정하고, 해당 시간동안 작업이 우아하게 종료되지 못했을 경우 강제 종료
한다.
close()의 경우 이미 그렇게 구현되어 있지만, 하루는 너무 길다… 직접 구현해보자!
public class ExecutorShutdownMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("longTask", 100_000)); // 100초 대기
printState(es);
log("== shutdown 시작 ==");
shutdownAndAwaitTermination(es);
log("== shutdown 완료 ==");
printState(es);
}
private static void shutdownAndAwaitTermination(ExecutorService es) {
es.shutdown(); // non-blocking, 새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미 대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
try {
// 이미 대기중인 작업들을 모두 완료할 때까지 10초 기다린다.
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
// 정상 종료가 너무 오래 걸리면...
log("서비스 정상 종료 실패 -> 강제 종료 시도");
es.shutdownNow();
// 작업이 취소될 때까지 대기한다.
if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
log("서비스가 종료되지 않았습니다.");
// 이 경우 자바를 강제로 종료할 수 밖에 없음. 인터럽트 설계를 잘못한 것
}
}
} catch (InterruptedException e) {
// awaitTermination()으로 대기 중인 현재 스레드가 인터럽트 될 수 있다.
es.shutdownNow();
}
}
}
11:17:14.252 [pool-1-thread-1] taskA 시작
11:17:14.253 [pool-1-thread-2] taskB 시작
11:17:14.253 [ main] [pool=2, active=2, queuedTask=2, completedTask=0
11:17:14.259 [ main] == shutdown 시작 ==
11:17:15.262 [pool-1-thread-2] taskB 완료
11:17:15.263 [pool-1-thread-2] taskC 시작
11:17:15.263 [pool-1-thread-1] taskA 완료
11:17:15.263 [pool-1-thread-1] longTask 시작
11:17:16.268 [pool-1-thread-2] taskC 완료
11:17:24.261 [ main] 서비스 정상 종료 실패 -> 강제 종료 시도
11:17:24.262 [pool-1-thread-1] 인터럽트 발생, sleep interrupted
11:17:24.263 [ main] == shutdown 완료 ==
11:17:24.263 [ main] [pool=0, active=0, queuedTask=0, completedTask=4
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
at util.ThreadUtils.sleep(ThreadUtils.java:12)
at thread.executor.RunnableTask.run(RunnableTask.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep0(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:509)
at util.ThreadUtils.sleep(ThreadUtils.java:9)
... 4 more
- main 메서드는
shutdown()
을 호출하여 서비스의 우아한 종료를 위해 60초간 기다리지만,longTask
가 너무 오래 걸려 작업을 종료하지 못하는 상황이다. - 이렇게 될 경우,
shutdownNow()
를 통한 강제 종료에 들어간다. - 강제종료 호출한 후에 또 10초간 기다리는 이유는, 인터럽트를 호출하더라도 여러가지 이유로 작업에 시간이 걸릴 수 있기 때문이다. 인터럽트 이후에 자원을 정리하는 어떤 간단한 작업을 수행할 수도 있는데, 이런 시간을 기다려주는 것이다.
- 극단적이지만 최악의 경우, 인터럽트 자체를 받을 수 없도록 코드를 작성해서 강제 종료도 안되는 경우가 있을 수 있는데, 이런 경우에는 자바를 강제 종료하는 수 밖에 없다..
ExecutorService 스레드 풀 관리
이번에는 ThreadPoolExecutor
의 corePoolSize
와 maximumPoolSize
의 차이를 알아보자.
코드
public class PoolSizeMainV1 {
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(2, 4,
3000, TimeUnit.MILLISECONDS, workQueue);
printState(es);
es.execute(new RunnableTask("task1"));
printState(es, "task1");
es.execute(new RunnableTask("task2"));
printState(es, "task2");
es.execute(new RunnableTask("task3"));
printState(es, "task3");
es.execute(new RunnableTask("task4"));
printState(es, "task4");
es.execute(new RunnableTask("task5")); // 큐까지 전부다 꽉차야 새로운 active thread를 만들기 시작함
printState(es, "task5");
es.execute(new RunnableTask("task6"));
printState(es, "task6");
try {
es.execute(new RunnableTask("task7"));
} catch (RejectedExecutionException e) {
log("task7 실행 거절 예외 발생: " + e);
}
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
sleep(3000);
log("== maximumPoolSize 대기 시간 초과 ==");
printState(es); // corePoolSize만큼 poolSize 감소
es.close();
log("== shutdown 완료 ==");
printState(es);
}
}
- 작업을 보관할 블로킹 큐의 구현체로
ArrayBlockingQueue(2)
를 사용했다. 사이즈를 2로 설정했으므로 최대 2개까지 작업을 큐에 보관할 수 있다. corePoolSize=2
,maximumPoolSize=4
를 사용해서 기본 스레드는 2개, 최대 스레드는 4개로 설정했다.- 스레드 풀에 기본 2개의 스레드를 운영한다. 요청이 너무 많거나 급한 경우 스레드 풀은 최대 4개까지 스레드를 증가시켜서 사용할 수 있다. 이렇게 기본 스레드 수를 초과해서 만들어진 스레드를 초과 스레드라 하겠다.
3000
,TimeUnit.MILLISECONDS
- 초과 스레드가 생존할 수 있는 대기 시간을 뜻한다. 이 시간 동안 초과 스레드가 처리할 작업이 없다면 초과스레드는 제거된다.
- 여기서는 3000 밀리초(3초)를 설정했으므로, 초과 스레드가 3초간 작업을 하지 않고 대기한다면 초과 스레드는 스레드 풀에서 제거된다.
분석
- task1 작업을 요청한다.
Executor
는 스레드 풀에 스레드가 core 사이즈만큼 있는지 확인한다.- core 사이즈만큼 없다면, 스레드를 하나 생성한다
- 작업을 처리하기 위해 스레드를 하나 생성했기 때문에, 큐에 넣을 필요 없이 바로 스레드가 작업을 처리한다.
- task2 요청이 들어와도 마찬가지로 스레드를 하나 생성하고 바로 작업을 처리한다.
- task3 작업을 요청한다.
- 스레드 풀에 스레드가 core 사이즈만큼 이미 만들어져 있고, 스레드 풀에 사용할 수 있는 스레드가 없으므로 이 경우 큐에 작업을 보관한다.
- task4 요청이 들어와도 여전히 사용 가능한 스레드가 없다면 큐에 저장된다.
- task5 작업을 요청한다.
- 스레드 풀에 core 사이즈만큼 스레드가 있고, 큐가 가득 차있다 → 긴급 상황
- 이 경우,
Executor
는max(maximumPoolSize)
사이즈까지 초과 스레드를 만들어서 작업을 수행한다.- 초과 스레드인 스레드3을 만든다.
- 작업을 처리하기 위해 스레드를 하나 생성했기 때문에, 큐에 넣을 필요 없이 바로 스레드가 작업을 처리한다. (어차피 큐가 가득 차서 큐에 넣는 것도 불가능하다)
- 스레드3이 task5를 처리한다.
- 때문에 task5가 더 늦게 전달됐음에도 task5가 task3, task4 보다 먼저 수행된다.
- task6 요청이 들어와도 아직 max까지 스레드를 하나 더 만들 수 있으므로 새로운 스레드를 만들어서 처리한다.
- task7 작업을 요청한다.
- 큐도 가득찼고, 스레드 풀의 스레드도 max 사이즈만큼 가득찼다
RejectedExecutionException
이 발생한다. → 작업을 거절한다.
- 이후 작업을 완료하면 스레드를 반납(
RUNNABLE
→WAITING
)하고, 큐에 작업이 남아있을 경우 다시 스레드가 작업을 획득해서 처리한다. - 모든 작업이 완료된다
- 스레드3, 스레드4와 같은 초과 스레드들은 지정된 시간까지 작업을 하지 않고 대기하면 제거된다. 긴급한 작업들이 끝난 것으로 이해하면 된다.
- 이후에
shutdown()
이 진행되면 풀의 스레드도 모두 제거된다.
정리
- 작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
- core 사이즈를 초과하면 큐에 작업을 넣는다.
- 큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다.
- 큐가 가득차서 큐에 넣을 수도 없다. 초과 스레드가 바로 수행해야 한다.
- max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다.
- 큐도 가득차고, 풀에 최대 생성 가능한 스레드 수도 가득 찼다. 작업을 받을 수 없다.
스레드 미리 생성하기
응답시간이 아주 중요한 서버라면, 서버가 고객의 처음 요청을 받기 전에 스레드를 스레드 풀에 미리 생성해두고 싶을 수 있다.(원래는 요청이 들어와야 생성하면서 스레드 수를 core 사이즈만큼 늘린다)
스레드를 미리 생성해두면, 처음 요청에서 사용되는 스레드의 생성 시간을 줄일 수 있다.
ThreadPoolExecutor.prestartAllCoreThreads()
를 사용하면 기본 스레드를 미리 생성할 수 있다.
Executor 스레드 풀 전략
자바는 Executors 클래스를 통해 3가지 기본 스레드 풀 전략을 제공한다.
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
newSingleThreadPool()
: 단일 스레드 풀 전략- 스레드 풀에 기본 스레드 1개만 사용
- 큐 사이즈에 제한 X (by
LinkedBlockingQueue
) - 주로 간단히 사용하거나, 테스트 용도로 사용
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
newFixedThreadPool(nThreads)
: 고정 스레드 풀 전략- 스레드 풀에
nThreads
만큼의 기본 스레드를 생성 (초과 스레드는 생성 X)nThreads = 1
이면 단일 스레드 풀 전략과 동일
- 큐 사이즈에 제한 X (by
LinkedBlockingQueue
) - 장점
- 스레드 수가 고정 => 리소스 어느정도 예측 가능 => 안정적
- 문제점 -> 갑작스럽게 요청이 크게 증가하는 경우
- 스레드가 고정되어 있으니, 사용자가 늘어나도 CPU, 메모리 사용량이 확 늘어나지 않지만
- 큐의 사이즈를 확인해보니 요청이 무수히 많이 쌓여있게 된다.
- 요청이 처리되는 시간보다 쌓이는 시간이 더 빠른 것
- => 결국 서버 자원은 여유가 있는데, 사용자만 점점 느려지는 문제가 발생한다.
- 일은 바빠죽겠는데 서버는 여유부리는 상황 → 일이 많으면 더 빠르게 일하거나 일손을 늘려야 한다.
- 스레드 풀에
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
newCachedThreadPool()
: 캐시 스레드 풀 전략- 기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용
- 초과 스레드의 수는 제한 X
- 큐에 작업을 저장 X (by
SynchronousQueue
) - 장점
- 모든 요청이 대기하지 않고 스레드가 바로바로 처리 => 빠른 처리
- 초과 스레드의 수도 제한이 없어서 시스템의 자원을 최대로 사용 가능
- 작업의 요청 수에 따라서 스레드 수도 증가하고 감소 => 유연함
- 점진적으로 사용자가 늘어나는 경우에는 크게 문제 X. 단, CPU 및 메모리의 사용량이 증가하니 이에 맞게 적절한 시점에 증설해주어야 한다.
- 문제점 -> 갑작스럽게 요청이 크게 증가하는 경우
- 너무 많은 스레드가 작업을 처리하면서 시스템 전체가 느려지는 현상이 발생
- 스레드가 무한으로 생성될 수 있는데, 시스템은 너무 많은 스레드에 잠식 당해서 거의 다운된다(메모리도 거의 다 사용되어 버린다.)
- 시스템이 멈추는 장애가 발생한다.
- 무식하게 일이 많다고 일손만 잔뜩 늘리면 사장은 파산한다..
사용자 정의 풀 전략
다음과 같이 세분화된 전략을 사용하면 상황1, 상황2를 모두 어느정도 대응할 수 있다
- 일반: 일반적인 상황에는 CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드로 서비스를 안정적으로 운영한다.
- 긴급: 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 추가로 투입해서 작업을 빠르게 처리한다.
- 거절: 사용자의 요청이 폭증해서 긴급 대응도 어렵다면 사용자의 요청을 거절한다.
- 예시)
- 100개의 기본 스레드를 사용한다.
- 추가로 긴급 대응 가능한 긴급 스레드 100개를 사용한다. 긴급 스레드는 60초의 생존 주기를 가진다.
- 1000개의 작업이 큐에 대기할 수 있다.
ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
public class PoolSizeMainV4 {
// static final int TASK_SIZE = 1100; // 일반
// static final int TASK_SIZE = 1200; // 긴급 -> 더 빨리 끝남 (알바 한명 더 부르는거)
static final int TASK_SIZE = 1201; // 거절
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(100, 200,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
printState(es);
long startMs = System.currentTimeMillis();
for (int i = 1; i <= TASK_SIZE; i++) {
String taskName = "task" + i;
try {
es.execute(new RunnableTask(taskName));
printState(es, taskName);
} catch (RejectedExecutionException e) {
log(taskName + " -> " + e);
}
}
es.close();
long endMs = System.currentTimeMillis();
log("time: " + (endMs - startMs));
}
}
- 일반: 1000개 이하의 작업이 큐에 담겨있다 → 100개의 기본 스레드가 돌아가며 큐에서 작업을 꺼내 처리한다.
- 최대 1000개의 작업이 큐에 대기, 100개의 작업이 실행 중 → 1100개까지는 기본 스레드로 처리가능
- 작업을 모두 처리하는데
1100 / 100 = 11
초가 걸린다.
- 긴급: 큐에 담긴 작업이 1000개를 초과한다 → 100개의 기본 스레드 + 100개의 초과 스레드가 처리한다.
- 최대 1000개의 작업이 큐에 대기, 200개의 작업이 실행 중 → 1200개까지는 긴급 스레드까지 붙여서 처리가능
- 작업을 모두 처리하는데
1200 / 200 → 6
초가 걸린다. - 긴급 투입한 스레드 덕분에 풀의 스레드 수가 2배가 된다. 따라서 작업을 2배 빠르게 처리한다.
- 물론 CPU, 메모리 사용을 더 하기 때문에 이런 부분을 감안해서 긴급 상황에 투입할 최대 스레드를 정해야 한다.
- 거절: 초과 스레드를 투입했지만, 큐에 담긴 작업 1000개를 초과하고 또 초과 스레드도 넘어간 상황 → 이 경우 예외 발생
- 긴급 투입한 스레드로도 작업이 빠르게 소모되지 않는다는 것은, 시스템이 감당하기 어려운 많은 요청이 들어오고 있다는 의미
- 큐에 대기하는 작업 1000개 + 스레드가 처리 중인 작업 200개 → 총 1200개의 작업을 초과하면 예외가 발생한다.
maximumPoolSize + workQueue 사이즈
= 최대 처리 가능한 작업 수- 따라서 1201번에서 예외가 발생한다.
- 이런 경우 요청을 거절한다. 고객 서비스라면 시스템에 사용자가 너무 많으니 나중에 다시 시도해달라고 해야한다.
실무에서 자주 하는 실수
⇒ 큐 사이즈를 무한대로 설정하는 실수..
- 이렇게 설정하면 절대로 스레드 풀의 최대 사이즈만큼 늘어나지 않는다. 큐가 가득차야 긴급 상황으로 인지되는데,
LinkedBlockingQueue
를 기본 생성자를 통해 무한대의 사이즈로 사용하게 되면, 큐가 가득찰 수가 없다! - 결국 기본 스레드로만 무한대의 작업을 처리하게 된다.
Executor 예외 정책
생산자 소비자 문제를 실무에서 사용할 때는, 결국 소비자가 처리할 수 없을 정도로 생산 요청이 가득 차면 어떻게 할지를 정해야 한다. 개발자가 인지할 수 있게 로그도 남겨야 하고, 사용자에게 현재 시스템에 문제가 있다고 알리는 것도 필요하다.
=> 이런 것을 위해 예외 정책이 필요하다.
ThreadPoolExecutor
에 작업을 요청할 때, 큐도 가득차고, 초과 스레드도 더는 할당할 수 없다면 작업을 거절한다.
⇒ ThreadPoolExecutor
는 작업을 거절하는 다양한 정책을 제공한다.
AbortPolicy
: 새로운 작업을 제출할 때RejectedExecution
을 발생시킨다. 기본 정책.DiscardPolicy
: 새로운 작업을 조용히 버린다.CallerRunsPolicy
: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행한다.- 작업을 거절하는 대신에, 작업을 요청한 스레드인 main 스레드가 작업을 수행한다.
- 생산자 스레드가 대신 일을 수행하는 덕분에 작업의 생산 자체가 느려져서 생산 속도를 조절할 수 있다는 특징이 있다.
- 그런데
CallerRunsPolicy
는shutdown()
이후에도 작업을 수행해버리는 문제가 있다.. 따라서shutdown()
조건을 체크해서 이 경우에는 작업을 수행하지 않도록 한다.
사용자 정의(RejectedExecutionHandler)
:RejectedExecutionHandler
을 구현하여 개발자가 직접 정의한 거절 정책을 사용할 수 있다.
public class RejectMain {
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new SynchronousQueue<>(), new MyRejectedExecutionHandler());
executor.submit(new RunnableTask("task1"));
executor.submit(new RunnableTask("task2")); // main 스레드가 실행
executor.submit(new RunnableTask("task3"));
executor.close();
}
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
static AtomicInteger count = new AtomicInteger(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
int i = count.incrementAndGet();
log("[경고] 거절된 누적 작업 수: " + i);
}
}
}
ThreadPoolExecutor를 shutdown() 하면 이후에 요청하는 작업을 거절하는데, 이때도 같은 정책이 적용된다!
'Java' 카테고리의 다른 글
[Java] 소켓 통신과 네트워크 예외 (0) | 2025.01.10 |
---|---|
[Java] 제네릭(Generics) (0) | 2025.01.07 |
[Java] 동시성 컬렉션 (0) | 2025.01.06 |
[Java] CAS - 동기화와 원자적 연산 (1) | 2025.01.06 |
[Java] 스레드 - 생산자 소비자 문제 (0) | 2025.01.06 |