티스토리 뷰
728x90
반응형
Future의 단순 활용
- 자바 5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있습니다.
💡 Future를 사용하여 비동기 처리
- 아래 예제에서는 doSomeThing 이라는 작업이 오래 걸려 다른 스레드에게 작업을 맡기고 나는 나의 일을 하고 있는 예제입니다.
하지만 스레드에게 넘겨 준 일이 영원히 끝나지 않으면 문제가 발생하게(나의 상태가 블럭(대기)됨) 됩니다. 그렇기 때문에 적절한
타임 아웃을 설정하는 것이 좋습니다.
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<Double> future = executorService.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
return doSomeThing(); // 시간이 오래 걸리는 작업을 다른 스레드에게 맡김
}
});
otherSomeThing(); // 비동기를 진행하는 동안 다른 작업 진행
try {
Double result = future.get(1, TimeUnit.SECONDS); // 비동기의 결과를 가져오고, 결과가 준비되어 있지 않다면 예외 호출됨
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
private static double doSomeThing() {
try {
Thread.sleep(5000);
System.out.println("오래 걸리는 작업");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}
private static void otherSomeThing() {
System.out.println("다른 작업 진행중~ ");
}
💡 Future의 제한
- 두 개의 비동기 계산 결과를 하나로 합칩니다. 두 가지 계산 결과는 서로 독립적일 수 있으며, 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있습니다.
- Future의 집합이 실행하는 모든 태스크의 완료를 기다립니다.
- Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻습니다.
- Future 완료 동작에 반응합니다. 즉 결과를 기다리면서 블럭되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작을 수행할 수 있습니다.
비동기 API 구현
💡 비동기를 사용하지 않은 예제
- 사용자가 최저가격 검색 애플리케이션을 호출하면 비동기 동작이 완료될 때까지 1초동안 대기 상태가 되는 예제입니다.
@Getter
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
this.random = new Random(name.charAt(0 * name.charAt(1)));
}
// getPrice를 구현해 상품의 가격을 반환하는 메서드입니다. 비정상적이지만 예제니까!
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
// 우리는 실제 호출할 서비스를 구현할 수 없으므로 다른 네트워크 상의 서비스를 delay라는 메서드로 대신합니다.
private void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
예제 실행
- 예제를 실행하면 shopList에는 5개의 Shop이 있으므로 총 5초의 시간이 걸리는 것을 알 수 있습니다.
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findProductPrice("아이폰 12"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs");
}
private static List<String> findProductPrice(String product) {
List<Shop> shopList = List.of(new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
new Shop("그램"),
new Shop("아이폰 14"));
return shopList.stream()
.map(shop -> shop.getName() + "의 가격은 " + shop.getPrice(product))
.collect(Collectors.toList());
}
💡 비동기를 사용한 예제
- Shop 클래스에서 getPriceAsync 메서드를 만들어 비동기 함수를 사용하고 있습니다.
@Getter
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
this.random = new Random(name.charAt(0 * name.charAt(1)));
}
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
})
.start();
return futurePrice;
}
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
private void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findProductPrice("아이폰 12"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs");
}
private static List<String> findProductPrice(String product) {
List<Shop> shopList = List.of(new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
new Shop("그램"),
new Shop("아이폰 14"));
return shopList.stream()
.map(shop -> shop.getName() + "의 가격은 " + shop.getPriceAsync(product))
.collect(Collectors.toList());
}
}
💡 비동기 예외 처리 방법
- 위의 getPriceAsync 메서드에서 에러가 발생하면 어떻게 될까요? 그렇게 된다면 블럭 상태가 되며 계속 기다리게 될 것입니다.
이처럼 블럭 문제가 발생하는 상황에서는 적절한 타임아웃을 사용해야 합니다. - 클라이언트는 타임아웃 값을 받는 get 메서드의 오버로드 버전을 만들어 이 문제를 해결할 수 있습니다. 이처럼 블럭 문제가 발생할 수 있는 상황에서는 적절한 타임아웃을 사용하는게 좋으며, 그래야 문제가 발생했을 때 클라이언트가 영원히 블럭되지 않고 타임 아웃 시간이 지나면 Exception을 받을 수 있습니다.
public Future<Double> getPriceAsyncV2(String product) {
// 상점의 DB를 이용해 가격 정보를 얻는 동시에 다른 외부 서비스에 접근
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
futurePrice.completeExceptionally(e);
}
})
.start();
return futurePrice;
}
💡 supplyAsync로 CompletableFuture 만들기
- 지금까지는 CompletableFuture를 직접 만들었지만 조금 더 간단히 만들 수 있습니다.
- supplyAsync는 Supplier를 인수로 받아서 CompletableFuture를 반환합니다. CompletableFuture는 Supplier를 실행해서
비동기적으로 결과를 생성합니다.
// CompletableFuture의 함수
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
비블럭 코드 만들기
💡 병렬 스트림으로 만들기
- 아래 예제에서는 stream을 사용하지 않고 parallelStream을 사용하여 병렬로 성능을 높였습니다.
@Getter
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
this.random = new Random(name.charAt(0 * name.charAt(1)));
}
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
private void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findProductPrice("아이폰 12"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs"); // Done is 1256 msecs
}
private static List<String> findProductPrice(String product) {
List<Shop> shopList = List.of(new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
new Shop("그램"),
new Shop("아이폰 14"));
return shopList.parallelStream()
.map(shop -> shop.getName() + "의 가격은 " + shop.getPrice(product))
.collect(Collectors.toList());
}
}
🤔 병렬처리 parallelStream을 사용하면 편리한거 같은데 과연 좋을까?
- 병렬 처리이기 때문에 기본적으로 순서를 보장하지 않습니다.
- parallelStream은 parallelStream별로 스레드 풀을 만드는게 아닙니다. 별도의 설정이 없으면 하나의 스레드 풀은 모든 parllelStream이 공유하게 됩니다. 이때 기본적으로 사용되는 풀은 ForkJoinPool.commonPool 입니다.
🧨 Race Condition(경쟁 상태) 발생
- 아래 예제는 별도의 설정 없이 commonPool을 공유하게 되는 경우 발생하는 문제입니다. 스레드가 서로 공유하는 자원에 대해
Race Condition이 발생하게 됩니다.
@Getter
public class Calculator {
private int x;
public void add(int x) {
this.x += x;
}
}
public class Main {
public static void main(String[] args) {
// Race Condition 발생
Calculator calculatorA = new Calculator();
Calculator calculatorB = new Calculator();
IntStream.rangeClosed(0, 100)
.parallel()
.forEach(i -> calculatorA.add(i));
IntStream.rangeClosed(0, 100)
.parallel()
.forEach(i -> calculatorB.add(i));
System.out.println("calculatorA : " + calculatorA.getX()); // 4971
System.out.println("calculatorB : " + calculatorB.getX()); // 5029
}
}
🤔 CompletableFuture와 ParallelStream의 작은 차이
- JDK에는 commonPool을 사용하는 CompleatableFuture과 ParallelStream이 있습니다. 이 두가지 중 중요한 차이는 CompletableFuture 같은 경우 commonPool을 사용하지 않도록 별도의 Thread Pool을 설정할 수 있으나 ParllelStream 같은 경우는 별도의 설정이 불가능합니다.
💡 CompletableFuture로 비동기 처리
- findProductPrice 메서드에서 두 map 연산을 하나의 스트림처리에서 처리하지 않고 두 개의 스트림 파이프라인으로 처리했습니다.
스트림 연산은 Lazy 특성이 있어서 하나의 파이프라인으로 연산을 처리하면 모든 정보요청 동작이 동기적, 순차적으로 이루어집니다.
@Getter
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
this.random = new Random(name.charAt(0 * name.charAt(1)));
}
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
private void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findProductPrice("아이폰 12"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs"); // Done is 1268 msecs
}
private static List<String> findProductPrice(String product) {
List<Shop> shopList = List.of(new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
new Shop("그램"),
new Shop("아이폰 14"));
List<CompletableFuture<String>> futures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + "의 가격은 " + shop.getPrice(product)))
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
💡 커스텀 Executor를 만드는 방법
- 우선 커스텀 Executor를 만들기 위해서는 스레드풀 최적값을 찾아야 합니다. 최적값이 필요한 이유는 스레드 풀이 너무 크면 CPU와
메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있고, 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을 수 있습니다.
또한 "자바 병렬 프로그래밍" 에서는 Nthreads = Ncpu * Ucpu * (1 + W/C)로 나타내고 있습니다. - Ncpu는 Runtime.getRuntime().availableProcessors() 가 반환하는 코어 수
- Ucpu 는 0과 1 사이의 값을 갖는 CPU활용 비율
- W/C는 대기시간과 계산시간의 비율
- 아래 커스텀 Executor에서는 데몬 스레드를 사용하고 있습니다. 자바에서 일반 스레드가 실행중이면 자바 프로그램은 종료되지 않습니다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 발생할 수 있습니다. 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있습니다. 성능은 같으므로 무한히 기다리는 것을 방지하기 위해 데몬 스레드로 설정해줍니다.
public class Main {
private static List<Shop> shopList = List.of(new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
/// 90개
new Shop("그램"),
new Shop("아이폰 14"));
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shopList.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findProductPrice("아이폰 12"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs");
}
private static List<String> findProductPrice(String product) {
List<CompletableFuture<String>> futures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + "의 가격은 " + shop.getPrice(product), executor))
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
비동기 작업 파이프라인 만들기
💡 사용될 예제 코드
public enum Code {
NONE(0), SILVER(1), GOLD(2), PLATINUM(3), DIAMOND(4);
@Getter
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public class Discount {
private Code code;
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " 가격은 " + apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return price * (100 - code.getPercentage() / 100);
}
public static void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Getter
public class Quote {
private final String shopName;
private final double price;
private final Code discountCode;
public Quote(String shopName, double price, Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Code discountCode = Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
}
@Getter
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
this.random = new Random(name.charAt(0 * name.charAt(1)));
}
public String getPrice(String product) {
double price = calculatePrice(product);
Code code = Code.values() [random.nextInt(Code.values().length)];
return name + ":" + price + ":" + code;
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
public void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
💡 예제 실행
- findPrices 메서드의 첫번째 map에서는 각 상점에 요청한 제품의 할인 가격을 반환하면서 1초의 시간이 걸립니다.
- 두번째 map에서는 이들 문자열을 파싱해 Quote 객체를 만듭니다.
- 세번째 map에서는 할인 서비스에 접근해 최종 할인 가격을 계산하고, 가격에 대응하는 상점 이름을 포함한 문자열을 반환합니다. 1초
- 순차적으로 실행하니 첫 번째 map에서 5초 세 번째 map에서 5초 총 10초의 시간이 소요되고 있습니다.
public class Client {
private static final List<Shop> shopList = List.of(
new Shop("아이폰 12"),
new Shop("갤럭시 노트"),
new Shop("맥북"),
new Shop("그램"),
new Shop("아이폰 14")
);
private static final Executor executor = Executors.newFixedThreadPool(Math.min(shopList.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findPrices("IPhone"));
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Done is " + duration + " msecs"); // Done is 10246 msecs
}
public static List<String> findPrices(String product) {
return shopList.stream()
.map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
}
💡 동기 작업과 비동기 작업 조합
- 코드가 많이 복잡해졌지만 열심히 하나씩 살펴보겠습니다.
public static List<String> findPrices(String product) {
List<CompletableFuture<String>> futures = shopList.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) // return => Stream<CompletableFuture<String>>
.map(future -> future.thenApply(Quote::parse)) // return => Stream<CompletableFuture<Quote>>
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))) // return => Stream<CompletableFuture<String>>
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
가격 정보 얻기
- 첫번째 연산은 supplyAsync 메서드를 사용해 비동기적으로 상점에서 가격을 조회합니다. 두번째 매개변수를 통해 우리가 만든 Executor를 사용하여 작업을 수행합니다. 반환값은 Stream<CompletableFuture<String>>이며, 상점에서 반환하는 문자열 정보를 포함하고 있습니다.
Quote 파싱하기
- 두번째 연산은 첫번째 결과로 얻은 Stream<CompletableFuture<String>>을 사용하여 map을 돌리면 CompletableFuture<String>의 상태가 되며 각 Completable<String>에 thenApply 메서드를 호출한 다음 Quote 인스턴스를 반환받고 있습니다.
- thenApply 메서드는 CompletableFuture가 끝날 때까지 블럭(대기)하지 않는다는 점의 주의해야 합니다.
즉 CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있습니다. - thenApply 메서드의 역할은 비동기 연산의 결과로 받은 문자열(String)을 통해 새로운 것을 반환하는 역할을 하는것 같습니다. Function 인터페이스를 통해 T타입의 인자를 받아 R 타입의 객체를 반환
CompletableFuture를 조합해서 할인된 가격 계산하기
- 세번째 map 연산에서는 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용해야 합니다. 여기서는 Discount 서비스에 지연(1초)이 되므로 동기적으로 수행해야 합니다.
- 여기서는 CompletableFuture.supplyAsync 메서드를 통해 Discount 서비스에 접근하여 할인이 적용된 가격을 비동기적으로 가져오고 있으며 thenCompose 메서드를 통해 비동기 연산을 파이프라인으로 만들 수 있습니다.
CompletableFuture.join
- 마지막에는 List에 담겨있는것을 stream을 사용해 변환하며 completableFuture::join을 사용하고 있습니다. join 메서드는 ComputableFuture가 끝나기를 기다리는 블럭(대기) 메서드라고 할 수 있으며, 콜백으로 넘긴 작업이 끝나고 난 이후에 결과값을 반환하게 됩니다.
- get, join의 차이는 get은 Checked Exception을 던지고 join은 Unchecked Exception을 던지고 있습니다.
728x90
반응형
'스터디 > 모던 자바 인 액션' 카테고리의 다른 글
모던 자바 인 액션 - 18장 함수형 관점으로 생각하기 (0) | 2022.09.24 |
---|---|
모던 자바 인 액션 - 17장 리액티브 프로그래밍 (0) | 2022.09.24 |
모던 자바 인 액션 - 13장 디폴트 메서드 (0) | 2022.09.17 |
모던 자바 인 액션 - 11장 null 대신 Optional 클래스 (0) | 2022.09.15 |
모던 자바 인 액션 - 9장 리펙터링, 테스팅, 디버깅 (0) | 2022.09.11 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
TAG
- 서비스 기반 아키텍처
- 자바 백엔드 개발자 추천 도서
- 레이어드 아키텍처란
- spring boot excel download paging
- 공간 기반 아키텍처
- pipeline architecture
- spring boot redis 대기열 구현
- @ControllerAdvice
- service based architecture
- spring boot redisson sorted set
- JDK Dynamic Proxy와 CGLIB의 차이
- polling publisher spring boot
- 트랜잭셔널 아웃박스 패턴 스프링부트
- redis sorted set으로 대기열 구현
- pipe and filter architecture
- redis 대기열 구현
- spring boot excel download oom
- spring boot 엑셀 다운로드
- spring boot redisson 분산락 구현
- java userThread와 DaemonThread
- transactional outbox pattern spring boot
- 람다 표현식
- redis sorted set
- microkernel architecture
- java ThreadLocal
- 트랜잭셔널 아웃박스 패턴 스프링 부트 예제
- spring boot redisson destributed lock
- transactional outbox pattern
- spring boot poi excel download
- space based architecture
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함