티스토리 뷰

728x90
반응형

💡상황

ThreadPoolTaskExecutor를 사용하여 이용가능한 Thread의 숫자가 100개였지만 트레픽이 몰려 101개의 요청이 들어온 상황으로 가정하였습니다.

 

ThreadPoolTaskExecutor 설정
  • setMaxPoolSize는 90으로 설정
  • setQueueCapacity는 10으로 설정 
  • 한번에 받을 수 있는 요청은 100으로 가정
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

 

100개의 요청을 보낸 경우

반복문을 사용하여 100개의 요청을 보내는 경우 최대 사용가능한 Thread의 수는 100개이므로 예외가 발생하지 않습니다.

@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 100; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        log.info("order name : " + name);
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

 

101개의 요청을 보낸 경우 🧨

위의 예제 코드와 다른 부분은 반복문이 101까지 반복한다는 것입니다. 현재 최대 사용가능한 Thread는 100개이므로 예외를 발생하게 됩니다. org.springframework.core.task.TaskRejectedException 발생!!

@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 101; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        log.info("order name : " + name);
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

 

예외를 처리해보자!


RejectedExecutionHandler

RejectedExecutionHandler는 ThreadPoolExecutor에서 task를 더 이상 받을 수 없을 때 호출됩니다. 이런 경우는 큐 허용치를 초과하거나 Executor가 종료되어서 Thread 또는 큐 슬롯을 사용할 수 없는 경우에 발생하게 됩니다.

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

 

Reject Policy

💡ThreadPoolExecutor.AbortPolicy

  • 기본적으로 설정되어 있는 정책입니다. Reject된 task가 RejectedExecutionException을 발생시킵니다.
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 101; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

 

💡ThreadPoolExecutor.CallerRunsPolicy

  • 호출한 Thread에서 reject된 task를 대신 실행하게 됩니다.
  • 실행 결과를 보면 main Thread에서 작업한 것을 알 수 있습니다.
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 101; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

 

💡ThreadPoolExecutor.DiscardPolicy

  • Reject된 task는 버려지게 됩니다. 또한 Exception도 발생하지 않습니다.
  • 실행결과를 보면 setMaxPoolSize로 설정한 90개의 Thread만 사용을하고 setQueueCapacity로 설정한 10개의 Thread는 사용을 하지 않았습니다.
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return executor;
    }
}


@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 101; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

 

💡ThreadPoolExecutor.DiscardOldestPolicy

  • 실행자를 종료하지 않는 한 가장 오래된 처리되지 않는 요청을 삭제하고 execute()를 다시 시도하게 됩니다.
  • 실행 결과는 DiscardPolicy와 동일합니다.(일부 요청은 유실됩니다.)
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.initialize();
        return executor;
    }
}

@Slf4j
@Service
public class PoolService {

    @Async
    public ListenableFuture<String> order(String name) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(3000);
        return new AsyncResult<>(name);
    }
}

@SpringBootApplication
@RequiredArgsConstructor
public class ExampleApplication {

    private final PoolService poolService;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqExampleApplication.class, args);
    }

    @PostConstruct
    public void request() throws InterruptedException {
        for (int i = 1; i <= 101; i++) {
            poolService.order("kdg - " + i);
        }
    }
}

 

 

그렇다면 어떤 방식을 사용해야할까?

🤔

처음에는 CorePoolSize를 유동적으로 늘려줘야하나? 라는 생각이 들었습니다. 하지만 유동적으로 늘린 PoolSize의 크기를 다시 줄여주지 못해서 이 방식을 사용하지 않았으며, 기존에 위에서 설명한 방식을 사용하지 않고 RuntimeException을 발생시켜 클라이언트에게 예외를 넘겨 잠시 후 다시 요청할 수 있는 로직으로 가야되겠구나 생각을 하였습니다.

 

클라이언트에게 예외를 넘기는 이유

1. 이미 풀이 차서 계속 에러가 발생하는 상황에서 커넥션을 물고 무엇인가를 해줄려고한다면 장애가 전파되지 않을까? 라는 생각

2. fail fast가 오히려 좋은 답이 될수도 있다는 생각

 

💡 해결 방안

@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(90);
        executor.setQueueCapacity(10);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
               throw new RuntimeException("잠시 후 다시 요청해주세요.");
            }
        });
        executor.initialize();
        return executor;
    }
}

 

더 좋은 방안이 있으면 댓글로 남겨주시면 감사하겠습니다!

 

 

 

참고자료)

https://www.baeldung.com/java-rejectedexecutionhandler

https://deep-dive-dev.tistory.com/11

https://jessyt.tistory.com/171

https://stackoverflow.com/questions/3446011/threadpoolexecutor-block-when-queue-is-full

https://118k.tistory.com/656

 

 

 

 

728x90
반응형