티스토리 뷰

728x90
반응형

Future의 단순 활용


  • 자바 5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있습니다.

 

💡 Future를 사용하여 비동기 처리

  • 아래 예제에서는 doSomeThing 이라는 작업이 오래 걸려 다른 스레드에게 작업을 맡기고 나는 나의 일을 하고 있는 예제입니다. 
    하지만 스레드에게 넘겨 준 일이 영원히 끝나지 않으면 문제가 발생하게(나의 상태가 블럭(대기)됨) 됩니다. 그렇기 때문에 적절한 
    타임 아웃을 설정하는 것이 좋습니다.
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    Future<Double> future = executorService.submit(new Callable<Double>() {
        @Override
        public Double call() throws Exception {
            return doSomeThing(); // 시간이 오래 걸리는 작업을 다른 스레드에게 맡김
        }
    });

    otherSomeThing(); // 비동기를 진행하는 동안 다른 작업 진행
    try {
        Double result = future.get(1, TimeUnit.SECONDS); // 비동기의 결과를 가져오고, 결과가 준비되어 있지 않다면 예외 호출됨
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

private static double doSomeThing() {
    try {
        Thread.sleep(5000);
        System.out.println("오래 걸리는 작업");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 2;
}

private static void otherSomeThing() {
    System.out.println("다른 작업 진행중~ ");
}

 

💡 Future의 제한

  • 두 개의 비동기 계산 결과를 하나로 합칩니다. 두 가지 계산 결과는 서로 독립적일 수 있으며, 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있습니다.
  • Future의 집합이 실행하는 모든 태스크의 완료를 기다립니다.
  • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻습니다.
  • Future 완료 동작에 반응합니다. 즉 결과를 기다리면서 블럭되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작을 수행할 수 있습니다.

 

비동기 API 구현


💡 비동기를 사용하지 않은 예제

  • 사용자가 최저가격 검색 애플리케이션을 호출하면 비동기 동작이 완료될 때까지 1초동안 대기 상태가 되는 예제입니다.
@Getter
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        this.random = new Random(name.charAt(0 * name.charAt(1)));
    }

    // getPrice를 구현해 상품의 가격을 반환하는 메서드입니다. 비정상적이지만 예제니까!
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    // 우리는 실제 호출할 서비스를 구현할 수 없으므로 다른 네트워크 상의 서비스를 delay라는 메서드로 대신합니다.
    private void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

예제 실행

  • 예제를 실행하면 shopList에는 5개의 Shop이 있으므로 총 5초의 시간이 걸리는 것을 알 수 있습니다.
public static void main(String[] args) {
    long start = System.nanoTime();
    System.out.println(findProductPrice("아이폰 12"));
    long duration = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Done is " + duration + " msecs");
}

private static List<String> findProductPrice(String product) {
    List<Shop> shopList = List.of(new Shop("아이폰 12"),
            new Shop("갤럭시 노트"),
            new Shop("맥북"),
            new Shop("그램"),
            new Shop("아이폰 14"));

    return shopList.stream()
            .map(shop -> shop.getName() + "의 가격은 " + shop.getPrice(product))
            .collect(Collectors.toList());
}

 

💡 비동기를 사용한 예제

  • Shop 클래스에서 getPriceAsync 메서드를 만들어 비동기 함수를 사용하고 있습니다. 
@Getter
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        this.random = new Random(name.charAt(0 * name.charAt(1)));
    }

    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        })
        .start();
        return futurePrice;
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Main {

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(findProductPrice("아이폰 12"));
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done is " + duration + " msecs");
    }

    private static List<String> findProductPrice(String product) {
        List<Shop> shopList = List.of(new Shop("아이폰 12"),
                new Shop("갤럭시 노트"),
                new Shop("맥북"),
                new Shop("그램"),
                new Shop("아이폰 14"));

        return shopList.stream()
                .map(shop -> shop.getName() + "의 가격은 " + shop.getPriceAsync(product))
                .collect(Collectors.toList());
    }
}

 

💡 비동기 예외 처리 방법

  • 위의 getPriceAsync 메서드에서 에러가 발생하면 어떻게 될까요? 그렇게 된다면 블럭 상태가 되며 계속 기다리게 될 것입니다.
    이처럼 블럭 문제가 발생하는 상황에서는 적절한 타임아웃을 사용해야 합니다.
  • 클라이언트는 타임아웃 값을 받는 get 메서드의 오버로드 버전을 만들어 이 문제를 해결할 수 있습니다. 이처럼 블럭 문제가 발생할 수 있는 상황에서는 적절한 타임아웃을 사용하는게 좋으며, 그래야 문제가 발생했을 때 클라이언트가 영원히 블럭되지 않고 타임 아웃 시간이 지나면 Exception을 받을 수 있습니다.
public Future<Double> getPriceAsyncV2(String product) {
    // 상점의 DB를 이용해 가격 정보를 얻는 동시에 다른 외부 서비스에 접근
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            futurePrice.completeExceptionally(e);
        }
    })
    .start();
    return futurePrice;
}

 

💡 supplyAsync로 CompletableFuture 만들기

  • 지금까지는 CompletableFuture를 직접 만들었지만 조금 더 간단히 만들 수 있습니다.
  • supplyAsync는 Supplier를 인수로 받아서 CompletableFuture를 반환합니다. CompletableFuture는 Supplier를 실행해서
    비동기적으로 결과를 생성합니다.
// CompletableFuture의 함수
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

 

비블럭 코드 만들기


💡 병렬 스트림으로 만들기

  • 아래 예제에서는 stream을 사용하지 않고 parallelStream을 사용하여 병렬로 성능을 높였습니다.
@Getter
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        this.random = new Random(name.charAt(0 * name.charAt(1)));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(findProductPrice("아이폰 12"));
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done is " + duration + " msecs"); // Done is 1256 msecs
    }

    private static List<String> findProductPrice(String product) {
        List<Shop> shopList = List.of(new Shop("아이폰 12"),
                new Shop("갤럭시 노트"),
                new Shop("맥북"),
                new Shop("그램"),
                new Shop("아이폰 14"));

        return shopList.parallelStream()
                .map(shop -> shop.getName() + "의 가격은 " + shop.getPrice(product))
                .collect(Collectors.toList());
    }
}

 

🤔 병렬처리 parallelStream을 사용하면 편리한거 같은데 과연 좋을까?

  • 병렬 처리이기 때문에 기본적으로 순서를 보장하지 않습니다.
  • parallelStream은 parallelStream별로 스레드 풀을 만드는게 아닙니다. 별도의 설정이 없으면 하나의 스레드 풀은 모든 parllelStream이 공유하게 됩니다. 이때 기본적으로 사용되는 풀은 ForkJoinPool.commonPool 입니다.

 

🧨 Race Condition(경쟁 상태) 발생

  • 아래 예제는 별도의 설정 없이 commonPool을 공유하게 되는 경우 발생하는 문제입니다. 스레드가 서로 공유하는 자원에 대해 
    Race Condition이 발생하게 됩니다.
@Getter
public class Calculator {

    private int x;

    public void add(int x) {
        this.x += x;
    }
}

public class Main {

    public static void main(String[] args) {

        // Race Condition 발생
        Calculator calculatorA = new Calculator();
        Calculator calculatorB = new Calculator();

        IntStream.rangeClosed(0, 100)
                .parallel()
                .forEach(i -> calculatorA.add(i));

        IntStream.rangeClosed(0, 100)
                .parallel()
                .forEach(i -> calculatorB.add(i));

        System.out.println("calculatorA : " + calculatorA.getX()); // 4971
        System.out.println("calculatorB : " + calculatorB.getX()); // 5029
    }
}

 

🤔 CompletableFuture와 ParallelStream의 작은 차이

  • JDK에는 commonPool을 사용하는 CompleatableFuture과 ParallelStream이 있습니다. 이 두가지 중 중요한 차이는 CompletableFuture 같은 경우 commonPool을 사용하지 않도록 별도의 Thread Pool을 설정할 수 있으나 ParllelStream 같은 경우는 별도의 설정이 불가능합니다.

 

💡 CompletableFuture로 비동기 처리

  • findProductPrice 메서드에서 두 map 연산을 하나의 스트림처리에서 처리하지 않고 두 개의 스트림 파이프라인으로 처리했습니다.
    스트림 연산은 Lazy 특성이 있어서 하나의 파이프라인으로 연산을 처리하면 모든 정보요청 동작이 동기적, 순차적으로 이루어집니다.
@Getter
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        this.random = new Random(name.charAt(0 * name.charAt(1)));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    private void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    long start = System.nanoTime();
    System.out.println(findProductPrice("아이폰 12"));
    long duration = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Done is " + duration + " msecs"); // Done is 1268 msecs
}

private static List<String> findProductPrice(String product) {
    List<Shop> shopList = List.of(new Shop("아이폰 12"),
            new Shop("갤럭시 노트"),
            new Shop("맥북"),
            new Shop("그램"),
            new Shop("아이폰 14"));

    List<CompletableFuture<String>> futures = shopList.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + "의 가격은 " + shop.getPrice(product)))
            .collect(Collectors.toList());

    return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

 

💡 커스텀 Executor를 만드는 방법

  • 우선 커스텀 Executor를 만들기 위해서는 스레드풀 최적값을 찾아야 합니다. 최적값이 필요한 이유는 스레드 풀이 너무 크면 CPU와 
    메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있고, 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을 수 있습니다.
    또한 "자바 병렬 프로그래밍" 에서는 Nthreads = Ncpu * Ucpu * (1 + W/C)로 나타내고 있습니다.
  • Ncpu는 Runtime.getRuntime().availableProcessors() 가 반환하는 코어 수
  • Ucpu 는 0과 1 사이의 값을 갖는 CPU활용 비율
  • W/C는 대기시간과 계산시간의 비율 

 

  • 아래 커스텀 Executor에서는 데몬 스레드를 사용하고 있습니다. 자바에서 일반 스레드가 실행중이면 자바 프로그램은 종료되지 않습니다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 발생할 수 있습니다. 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있습니다. 성능은 같으므로 무한히 기다리는 것을 방지하기 위해 데몬 스레드로 설정해줍니다.
public class Main {
    private static List<Shop> shopList = List.of(new Shop("아이폰 12"),
            new Shop("갤럭시 노트"),
            new Shop("맥북"),
            /// 90개 
            new Shop("그램"),
            new Shop("아이폰 14"));

    private static final Executor executor = Executors.newFixedThreadPool(Math.min(shopList.size(), 100),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    return thread;
                }
            });

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(findProductPrice("아이폰 12"));
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done is " + duration + " msecs");
    }

    private static List<String> findProductPrice(String product) {
        List<CompletableFuture<String>> futures = shopList.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + "의 가격은 " + shop.getPrice(product), executor))
                .collect(Collectors.toList());

        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}

 

비동기 작업 파이프라인 만들기


💡 사용될 예제 코드

public enum Code {

    NONE(0), SILVER(1), GOLD(2), PLATINUM(3), DIAMOND(4);

    @Getter
    private final int percentage;

    Code(int percentage) {
        this.percentage = percentage;
    }
}

public class Discount {

    private Code code;

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " 가격은 " + apply(quote.getPrice(), quote.getDiscountCode());
    }

    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.getPercentage() / 100);
    }

    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Getter
public class Quote {

    private final String shopName;
    private final double price;
    private final Code discountCode;

    public Quote(String shopName, double price, Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Code discountCode = Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

@Getter
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        this.random = new Random(name.charAt(0 * name.charAt(1)));
    }

    public String getPrice(String product) {
        double price = calculatePrice(product);
        Code code = Code.values() [random.nextInt(Code.values().length)];
        return name + ":" + price + ":" + code;
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

💡 예제 실행

  • findPrices 메서드의 첫번째 map에서는 각 상점에 요청한 제품의 할인 가격을 반환하면서 1초의 시간이 걸립니다.
  • 두번째 map에서는 이들 문자열을 파싱해 Quote 객체를 만듭니다.
  • 세번째 map에서는 할인 서비스에 접근해 최종 할인 가격을 계산하고, 가격에 대응하는 상점 이름을 포함한 문자열을 반환합니다. 1초 
  • 순차적으로 실행하니 첫 번째 map에서 5초 세 번째 map에서 5초 총 10초의 시간이 소요되고 있습니다.
public class Client {
    private static final List<Shop> shopList = List.of(
            new Shop("아이폰 12"),
            new Shop("갤럭시 노트"),
            new Shop("맥북"),
            new Shop("그램"),
            new Shop("아이폰 14")
    );
    private static final Executor executor = Executors.newFixedThreadPool(Math.min(shopList.size(), 100),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    return thread;
                }
            });

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(findPrices("IPhone"));
        long duration = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Done is " + duration + " msecs"); // Done is 10246 msecs
    }

    public static List<String> findPrices(String product) {
        return shopList.stream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }
}

 

💡 동기 작업과 비동기 작업 조합

  • 코드가 많이 복잡해졌지만 열심히 하나씩 살펴보겠습니다.
public static List<String> findPrices(String product) {
    List<CompletableFuture<String>> futures = shopList.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) // return => Stream<CompletableFuture<String>>
            .map(future -> future.thenApply(Quote::parse)) // return => Stream<CompletableFuture<Quote>>
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))) // return => Stream<CompletableFuture<String>>
            .collect(Collectors.toList());
    return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

 

가격 정보 얻기

  • 첫번째 연산은 supplyAsync 메서드를 사용해 비동기적으로 상점에서 가격을 조회합니다. 두번째 매개변수를 통해 우리가 만든 Executor를 사용하여 작업을 수행합니다. 반환값은 Stream<CompletableFuture<String>>이며, 상점에서 반환하는 문자열 정보를 포함하고 있습니다.

Quote 파싱하기

  • 두번째 연산은 첫번째 결과로 얻은 Stream<CompletableFuture<String>>을 사용하여 map을 돌리면 CompletableFuture<String>의 상태가 되며 각 Completable<String>에 thenApply 메서드를 호출한 다음 Quote 인스턴스를 반환받고 있습니다.
  • thenApply 메서드는 CompletableFuture가 끝날 때까지 블럭(대기)하지 않는다는 점의 주의해야 합니다.
    즉 CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있습니다.
  • thenApply 메서드의 역할은 비동기 연산의 결과로 받은 문자열(String)을 통해 새로운 것을 반환하는 역할을 하는것 같습니다. Function 인터페이스를 통해 T타입의 인자를 받아 R 타입의 객체를 반환

CompletableFuture를 조합해서 할인된 가격 계산하기

  • 세번째 map 연산에서는 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용해야 합니다. 여기서는 Discount 서비스에 지연(1초)이 되므로 동기적으로 수행해야 합니다.
  • 여기서는 CompletableFuture.supplyAsync 메서드를 통해 Discount 서비스에 접근하여 할인이 적용된 가격을 비동기적으로 가져오고 있으며 thenCompose 메서드를 통해 비동기 연산을 파이프라인으로 만들 수 있습니다.

 

CompletableFuture.join

  • 마지막에는 List에 담겨있는것을 stream을 사용해 변환하며 completableFuture::join을 사용하고 있습니다. join 메서드는 ComputableFuture가 끝나기를 기다리는 블럭(대기) 메서드라고 할 수 있으며, 콜백으로 넘긴 작업이 끝나고 난 이후에 결과값을 반환하게 됩니다.
  • get, join의 차이는 get은 Checked Exception을 던지고 join은 Unchecked Exception을 던지고 있습니다.

 

 

 

 

 

 

 

728x90
반응형