티스토리 뷰
728x90
반응형
리액티브 매니패스토
- 리액티브 매니패스토는 2013년 ~ 2014년에 걸쳐 개발되었으며 리액티브 애플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의하고 있습니다.
반응성
- 리액티브 시스템은 빠를 뿐 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간을 제공합니다. 결과적으로 사용자가 기대치를 가질 수 있습니다.
- 즉 리액티브 시스템은 빠르고, 일정하고 예측할 수 있는 반응 시간을 통해 사용자의 기대치를 높이는것 같습니다.
회복성
- 장애가 발생해도 시스템은 반응해야 합니다.
탄력성
- 애플리케이션의 생명주기 동안 다양한 작업 부하를 받게 되는데, 이러한 부하로 인해 애플리케이션의 반응성이 위협받을 수 있습니다.
리액티브 시스템에서는 무거운 작업 부하가 발생하면 자동적으로 관련 컴포넌트에 할당된 자원수를 늘리게 됩니다.
메시지 주도
- 회복성과 탄력성을 지원하려면 약한 결합, 고립, 위치 투명성 등을 지원할 수 있도록 시스템을 구성하는 컴포넌트의 경계를 명확히 정의해야 합니다. 비동기 메시지를 전달해 컴포넌트 끼리 통신이 이루어지는데 이 덕분에 회복성과 탄력성을 얻을 수 있다고 합니다.
💡 어플리케이션 수준의 리액티브
- 애플리케이션 수준 컴포넌트의 리액티브 프로그래밍의 주요 기능은 비동기로 작업을 수행할 수 있다는 점입니다.
- 이러한 기술을 사용하면 스레드보다 가벼울 뿐 아니라 개발자에게 큰 이득을 제공한다고 합니다. 개발자 입장에서는 이들 기술을 이용함으로 동시, 비동기 애플리케이션 구현의 추상 수준을 높일 수 있으므로 동기 블럭, 경쟁 조건, 데드락 같은 저수준의 멀티 스레드 문제를 직접 처리할 필요가 없어지면서 비지니스 요구사항에 집중할 수 있다고 합니다.
- 스레드를 다시 쪼개는 종류의 기술(아마 병렬화,,?)을 이용할 때는 메인 이벤트 루프 안에서는 절대 동작을 블럭하지 않아야 한다는 중요한 전제 조건이 항상 따른다고 합니다. 데이터베이스나 파일 시스템 접근, 작업 완료까지 얼마나 걸릴지 예측이 힘든 원격 서비스 호출 등 모든 I/O 관련 동작이 블럭 동작에 속합니다.
예제로 이해해보기
- 두 스레드를 포함하는 스레드 풀이 있고 이벤트 스트림을 3개나 처리해야 하는 상황을 가정합니다. 한번에 오직 두개의 스트림을 처리할 수 있는 상황이므로 가능하면 두 스레드를 효율적으로 작업을 나눠 이벤트 스트림을 처리해야 합니다.
어떤 스트림의 이벤트를 처리하다보니 데이터베이스 접근 등 블럭이 되는 API로 인해 작업이 느려지게 되었습니다. 이때 나머지 이벤트 스트림은 다른 스레드의 작업이 끝나기만 기다리고 있는 상태이므로 여전히 처리되지 못하고 있습니다. - RxJava, Akka 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블럭 동작을 실행시켜 이러한 문제를 해결합니다. 메인 풀의 모든 스레드는 방해받지 않고 실행되므로 모든 CPU 코어가 가장 최적의 상황에서 동작할 수 있다고 합니다.
💡 시스템 수준의 리액티브
- 리액티브 시스템은 여러 애플리케이션이 한개의 일괄적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처입니다.
- 리액티브 시스템에서는 수신자와 발신자가 각각 수신 메시지, 발신 메시지와 결합하지 않도록 이들 메시지를 비동기로 처리해야 합니다. 각 컴포넌트를 완전히 고립하려면 이들이 결합되지 않도록 해야하며 그래야만 시스템이 장애(회복성)와 높은 부하(탄력성)에서도 반응성을 유지할 수 있습니다.
- 리액티브 아키텍처에서는 컴포넌트에서 발생한 장애를 고립시킴으로써 문제가 다른 컴포넌트로 전파되지 않도록 막아 회복성을 제공합니다. 이러한 맥락에서 회복성은 결함 허용 능력과 같은 의미를 지닙니다.
- 시스템에 장애가 발생했을 때 서서히 능력이 저하되는 것이 아니라 문제를 격리함으로써 장애에서 완전 복구되어 건강한 상태로 시스템이 돌아옵니다.
- 이러한 고립과 비결합이 회복성의 핵심이라면 탄력성의 핵심은 위치 투명성입니다. 위치 투명성은 리액티브 시스템의 모든 컴포넌트가 수신자의 위치에 상관없이 다른 모든 서비스와 통신할 수 있음을 의미합니다.
이러한 위치 투명성으로 인해 시스템을 복제할 수 있으며 현재 작업 부하에 따라 자동으로 애플리케이션을 확장할 수 있습니다.
리액티브 스트림과 Flow API
- 리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍입니다.
- 리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블럭하지 않는 역압력을 전제해 처리하는 표준 기술입니다.
역압력이란?
- 역압력은 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자가 제공하는 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치입니다.
- 책에서는 조금 어렵게 표현을 하고 있는거 같아 개인적으로 풀어 해석하자면 예전 TCP/IP를 공부하다가 이러한 비슷한 내용을 본적 있는데 TCP의 특징 중 하나인 flow-control(흐름제어)를 통해 상대방이 받을 수 있는 만큼만 데이터를 효율적으로 전송하는 것입니다.
발행자가 데이터를 보내는 속도가 구독자가 데이터를 받는 속도보다 빠르다면 데이터는 계속 쌓이게 됩니다. 그러면 구독자는 힘들어할 것입니다. 이를 방지하기 위한 기능이 역압력인것 같습니다.
😱 구독자가 감당하지 못하는 속도로 데이터를 받으면?
- 위에서 발행자가 너무 빠른 속도로 데이터를 보내게 되면 구독자는 너무 힘들어할 것이다. 라고 표현하였습니다. 그럼 구독자는 어떠한 액션을 취할 수 있을까요? 우리는 아래 내용으로 역압력은 필수라는 사실을 알 수 있습니다.
- 1. 부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알립니다.
- 2. 현재 구독자가 얼마나 많은 이벤트를 받을 수 있는지 발행자에게 알립니다.
- 3. 발행자로부터 새로운 데이터를 받기전 기존 데이터를 처리하는데 얼마나 시간이 걸리는지를 업스트림 발행자에게 알립니다.
💡 Flow 클래스 소개
- Publisher
- Subscriber
- Subscription
- Processor
- Flow 클래스 내부에 여러가지 클래스들이 있는데 하나씩 살펴보겠습니다.
// java.util.concurrent.Flow
public final class Flow {
private Flow() {} // uninstantiable
... 생략(밑에 있음)
static final int DEFAULT_BUFFER_SIZE = 256;
public static int defaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
}
💡 Publisher<T>
- Publisher 인터페이스에는 시그니처 함수로 subscribe 메서드를 가지고 있으며 구독자를 등록할 수 있는 메서드입니다.
- Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에게 전달해야합니다. 전달이 성공적으로 끝났으면 Subscriber의 onComplete 메서드가 호출되고, 문제가 있으면 onError 메서드가 호출됩니다.
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
💡 Subscriber<T>
- Publisher가 Subscriber를 등록(subscribe)하면 Subscriber는 onSubscribe 메서드를 호출해 Subscription을 받습니다.
- Subscriber는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 하며, 이런 방식으로 Subscriber는 Publisher에게 역압력을 행사할 수 있습니다.
- Subscriber는 Subscription의 request 메서드 호출없이 언제든 종료 시그널을 받을 준비가 되어 있어야 하며, Subscription의 cancel 메서드가 호출된 이후에라도 한개 이상의 onNext 메서드를 받을 준비가 되어 있어야 합니다.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
💡 Subscription
- Subscriber와 같은 생명주기를 가지며 Subscriber가 Publisher에 의해 등록(subscribe)될 때 하나씩 생성됩니다.
- 하나의 Subscriber당 하나의 Subscription을 가집니다.
- Subscriber가 Publisher에게 데이터를 요청하게되면 아래 인터페이스의 request 메서드를 사용하게 됩니다.
- Publisher와 Subscriber는 Subscription을 공유해야하며 각각 고유한 역할을 수행해야 합니다.
public static interface Subscription {
public void request(long n);
public void cancel();
}
💡 Processor<T, R>
- 해당 인터페이스는 아무 메서드도 없으며 마치 Function<T, R> 인터페이스처럼 T의 인자를 사용해 R 형태로 반환하는 메서드의 역할을 수행합니다. 즉 중간에 데이터를 가공해야할 경우 사용됩니다.
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
💡 Publisher - Subscriber 구조
- 발행자(Publisher)가 구독자(Subscriber)를 등록(subscribe)합니다.
- 동시에 구독자(Subscriber)가 전달자(Subscription)를 등록(onSubscribe)합니다.
- 구독자(Subscriber)는 데이터가 필요할 때 전달자(Subscription.request)를 통해 발행자(Publisher)에게 데이터를 요청합니다.
- 발행자는 데이터를 전달자에게 보냅니다.
- 구독자는 onNext 메서드를 통해 데이터를 받고 성공적으로 완료되면 onComplete 메서드를 호출하여 흐름을 종료하거나 실패해서 onError 메서드를 호출하여 흐름을 종료할 수 있습니다.
💡 예제로 만들어보기
- 코드가 조금 길지만 천천히 살펴보면 Main 메서드에서 새로운 구독자는 subscribe 메서드를 통해 발행자를 구독하게 되며 구독자는 onSubscriber 메서드를 통해 Subscription을 받게 됩니다. 그렇게 구독자와 발행자의 관계가 만들어지는데 구독자가 발행자에게 데이터를 요청할 때 Subscription의 request 메서드가 호출하게되며, Thermometer 클래스의 getThermometer 메서드에서는 10분의 1확률로 예외가 발행하게 됩니다.
@Getter
@ToString
@AllArgsConstructor
public class Thermometer {
private static final Random random = new Random();
private final String city; // 각 지역
private final int temperature; // 지역의 온도
public static Thermometer getThermometer(String city) {
if (random.nextInt(10) == 0) throw new RuntimeException("온도를 가져오는 중 예러 발생!");
return new Thermometer(city, random.nextInt(100));
}
}
@AllArgsConstructor
public class ThermometerSubscription implements Subscription {
private final Subscriber<? super Thermometer> subscriber;
private final String city;
@Override
public void request(long n) {
for (long i = 0; i < n; i++) {
try {
subscriber.onNext(Thermometer.getThermometer(city));
} catch (Exception e) {
subscriber.onError(e);
break;
}
}
}
@Override
public void cancel() {
this.subscriber.onComplete();
}
}
public class ThermometerSubscriber implements Subscriber<Thermometer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Thermometer item) {
System.out.println(item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Complete !!");
}
}
public class Main {
public static void main(String[] args) {
getThermometer("써울").subscribe(new ThermometerSubscriber());
}
private static Publisher<Thermometer> getThermometer(String city) {
return subscriber -> subscriber.onSubscribe(new ThermometerSubscription(subscriber, city));
}
}
💡 문제가 발생하는 상황
- 구독자가 발행자에게 데이터를 요청하는 경우 Subscription의 request 메서드가 호출되며, Thermometer 클래스의 getThermometer 메서드에서는 해당 메서드에서는 10분의 1확률로 예외를 던지고 있고 해당 로직을 주석처리하면 재귀호출로 인해 스택 오버 플로우 예외가 발생합니다.
- 어떻게 문제를 해결할 수 있을까요?
- 별도의 ExecutorService를 만들어 해결할 수 있다고 합니다.
@AllArgsConstructor
public class ThermometerSubscription implements Subscription {
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Subscriber<? super Thermometer> subscriber;
private final String city;
@Override
public void request(long n) {
executor.submit(() -> {
for (long i = 0; i < n; i++) {
try {
subscriber.onNext(Thermometer.getThermometer(city));
} catch (Exception e) {
subscriber.onError(e);
break;
}
}
});
}
@Override
public void cancel() {
this.subscriber.onComplete();
}
}
728x90
반응형
'스터디 > 모던 자바 인 액션' 카테고리의 다른 글
모던 자바 인 액션 - 18장 함수형 관점으로 생각하기 (0) | 2022.09.24 |
---|---|
모던 자바 인 액션 - 16장 CompletableFuture: 안정적 비동기 프로그래밍 (2) | 2022.09.20 |
모던 자바 인 액션 - 13장 디폴트 메서드 (0) | 2022.09.17 |
모던 자바 인 액션 - 11장 null 대신 Optional 클래스 (0) | 2022.09.15 |
모던 자바 인 액션 - 9장 리펙터링, 테스팅, 디버깅 (0) | 2022.09.11 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
TAG
- spring boot poi excel download
- polling publisher spring boot
- redis sorted set
- 서비스 기반 아키텍처
- service based architecture
- 람다 표현식
- spring boot 엑셀 다운로드
- spring boot redisson destributed lock
- redis 대기열 구현
- @ControllerAdvice
- java userThread와 DaemonThread
- transactional outbox pattern spring boot
- spring boot excel download oom
- spring boot excel download paging
- spring boot redis 대기열 구현
- JDK Dynamic Proxy와 CGLIB의 차이
- 공간 기반 아키텍처
- 자바 백엔드 개발자 추천 도서
- spring boot redisson 분산락 구현
- spring boot redisson sorted set
- 트랜잭셔널 아웃박스 패턴 스프링 부트 예제
- space based architecture
- microkernel architecture
- java ThreadLocal
- transactional outbox pattern
- 트랜잭셔널 아웃박스 패턴 스프링부트
- 레이어드 아키텍처란
- pipeline architecture
- pipe and filter architecture
- redis sorted set으로 대기열 구현
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함