티스토리 뷰

728x90
반응형

서론


Transactional Outbox Pattern에 대해서 궁금하시다면 해당 링크를 참고해 주세요. Transactional Outbox Pattern 
이번 글에서는 Spring Boot와 Polling Publisher를 활용하여 어떻게 Transactional Outbox Pattern을 구현할 수 있는지, 그리고 주의점이 무엇인지 알아보겠습니다.

 

동작 과정


동작 과정

위 동작 과정은 다음과 같습니다.

 

  1. 클라이언트의 요청이 발생합니다.
  2. Member Service에서 Member 테이블에 대해 Insert, Update, Delete가 발생하면 Outbox 테이블에도 변경사항을 기록할 수 있도록 데이터를 저장합니다.
  3. Message Relay는 주기적으로 Outbox 테이블을 읽고 있습니다. 
  4. Message Reply에서 Outbox 테이블에 새로운 데이터가 있으면 Message Broker로 이벤트를 발행합니다. 그리고 해당 이벤트를 Outbox 테이블에서 삭제합니다.

 

필자가 구상한 아키텍처


 

위 과정을 진행하면서 어떠한 것을 고민했는지 무엇을 주의해야 하는지 먼저 알아보겠습니다.

 

🤔 고민점

 

  • 처음에 간단히 구현하고자 outbox-service 인스턴스가 필요할까? 라는 고민을 했었다. 하지만 실제 환경에서 coupon-service 인스턴스가 주기적으로 Auth-DB를 polling 하고 있다면 서비스들 간에 강결합이 발생하고, 추후 유지보수나 coupon-service가 혹여나 스케일 아웃이 발생하면 polling 하고 있던 스케줄러에도 문제가 발생하므로 나누는 게 좋을 거 같다는 생각이 들어서 구분을 지었다.
  • outbox-service에서 주기적으로 Kafka로 메시지를 발송하는데 이때 Kafka에 문제가 발생하여 중복 메시지가 발행될 때 coupon-service에서는 어떻게 멱등성으로 처리할 수 있을까에 대한 고민을 했었다. (이 부분은 추후 코드로 설명)

 

🧨 주의점

 

  • 트랜잭셔널 아웃박스 패턴 사용시 이벤트가 중복해서 발행될 수 있습니다. 따라서 소비하는 인스턴스에서는 이를 멱등하게 처리할 수 있어야 합니다.(필자는 Tsid를 사용하여 처리)
  • 트랜잭션을 지원하지 않는 NoSQL DB를 사용하는 경우 아웃박스에 대한 정보를 객체에 담아 처리할 수 있다고 합니다.
  • 메시지 순서가 중요한 경우 아웃박스 테이블에 시퀀셜한 아이디를 부여하거나 이벤트를 발행할 때 생성일을 저장하여 Message Relay에서 특정 값 순서로 쿼리를 작성하면 됩니다.

 

코드 살펴보자


1. 회원가입 요청으로 인한 auth table 및 outbox table 저장

 

  • 아래 코드를 살여보면 MemberCreateService에서 auth table에 저장을 하고, 이벤트를 발행시키는 것을 알 수 있습니다.
  • 이벤트는 @EventListener 어노테이션을 사용하여 동기적으로 처리하고 있습니다. 그렇기 때문에 auth table에 저장 시 문제가 발생하면 outbox table에도 저장되지 않고, 반대인 경우에도 저장되지 않습니다.
@Service
@Transactional
@RequiredArgsConstructor
public class MemberCreateService implements MemberCreateUsecase {

    private final CreateMemberPort createMemberPort;
    private final ValidationMemberPort validationPort;
    private final ApplicationEventPublisher publisher;

    @Override
    public MemberCreateResponse create(MemberCreateRequest request) {
        validationPort.validateMemberId(request.id());
        validationPort.validateMemberEmail(request.email());

        var memberEmail = new MemberEmail(request.email());
        var member = Member.create(request.id(), request.password(), request.name(), request.nickName(), memberEmail);
        
        // auth table 저장
        var entity = createMemberPort.createMember(member);

        // 멤버 생성 이벤트 발행
        publisher.publishEvent(new MemberCreatedEvent(member.getId(), member.getName(), member.getEmail()));

        return new MemberCreateResponse(entity.converterPKToString());
    }
}

@Component
@RequiredArgsConstructor
public class MemberEventListener {

    private final MemberOutBoxCreateService service;

    @EventListener(MemberCreatedEvent.class)
    public void handle(MemberCreatedEvent event) {
        service.create(event);
    }
}

@Service
@Transactional
@RequiredArgsConstructor
public class MemberOutBoxCreateService {

    private final CreateMemberOutBoxPort createMemberOutBoxPort;

    @SneakyThrows
    public void create(MemberCreatedEvent event) {
        var objectMapper = new ObjectMapper();
        var payload = objectMapper.writeValueAsString(event);
        var memberOutBox = MemberOutBox.create(event.id(), payload);

        // outbox table 저장
        createMemberOutBoxPort.createMemberOutBox(memberOutBox);
    }
}

 

 

2. Outbox-Service에 의한 Polling Publisher

 

  • 스케줄러는 10초마다 outbox table에서 status가 READY인 데이터를 조회하고, 상태를 DONE으로 변경시키면서 Kafka에게 메시지를 발행시킵니다.
@Service
@Transactional
@RequiredArgsConstructor
public class OutboxFindService implements OutboxFindPort {

    private final MemberOutBoxDao memberOutBoxDao;

    @Override
    public List<OutboxResponse> findAll() {
        // outbox table에서 status가 READY인 데이터 조회
        var outBoxes = memberOutBoxDao.findAll();

        // 상태 변경(READY -> DONE)
        outBoxes.forEach(MemberOutBox::updateToDoneStatus);

        return outBoxes.stream()
                .map(outBox -> new OutboxResponse(outBox.getId(), outBox.getMemberId(), outBox.getPayload()))
                .collect(Collectors.toList());
    }
}

public interface OutboxFindPort {

    List<OutboxResponse> findAll();
}

@Slf4j
@Component
@RequiredArgsConstructor
public class OutBoxScheduler {

    private final OutboxFindPort findPort;
    private final KafkaProducer kafkaProducer;

    @Scheduled(cron = "0/10 * * * * *")
    public void scheduler() {
        findPort.findAll()
                .forEach(kafkaProducer::sendMessage);
    }
}

 

 

3. Coupon-Service의 데이터 저장

 

  • coupon-service는 Kafka의 특정 토픽을 구독하고 있습니다. 구독을 하고 있다가 메시지를 받으면 Service를 호출하게 됩니다. 그리고 서비스에서는 쿠폰과 쿠폰 히스토리를 저장하고 있습니다. 
  • coupon-service에서는 Kafka로부터 받은 메시지를 멱등성이 보장되도록 비지니스 로직을 구성해야 합니다. 필자는 아직 쿠폰 히스토리 테이블에 outboxId를 유니크 설정을 하여 저장을 시킴으로써 멱등성을 보장하고 있습니다. 
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    private final CouponCreateUsecase createPort;

    @KafkaListener(topics = "outbox")
    public void consume(MemberOutboxMessage message) {
        log.info("Consumer message : {}", message);

        var request = new CouponCreateRequest(message.outBoxId(), message.id());
        createPort.create(request);
    }
}

public interface CouponCreateUsecase {

    void create(CouponCreateRequest request);
}

@Service
@Transactional
@RequiredArgsConstructor
public class CouponCreateService implements CouponCreateUsecase {

    private static final String COUPON_NAME = "가입 축하 쿠폰";

    private final CouponCreatePort createPort;
    private final CouponHistoryCreatePort historyCreatePort;

    @Override
    public void create(CouponCreateRequest request) {

        // 쿠폰 저장
        var coupon = Coupon.create(COUPON_NAME, CouponStatus.AVAILABLE, LocalDateTime.now());
        var savedCoupon = createPort.create(coupon);

        // 쿠폰 히스토리 저장
        var couponHistory = CouponHistory.create(savedCoupon.getId(), request.outBoxId(), request.memberId(), LocalDateTime.now());
        historyCreatePort.create(couponHistory);
    }
}

 

 

👏 후기

지금까지 Polling Publisher 방법을 사용하여 트랜잭셔널 아웃박스 패턴에 대해 알아보았는데, 필자는 29CM 기술 블로그를 통해 트랜잭셔널 아웃박스 패턴을 적용하면서 무엇을 주의해야 하는지, MSA 환경에서 멱등성을 어떻게 보장해야 하는지, 정합성을 어떻게 보장해야 하는지 조금이나마 깊게 생각해 본 기회가 되었습니다.

그리고 Kafka는 너무 어렵다... 라는 것을 깨닭게 되었습니다ㅠㅠ 그렇지만 많은 기능을 제공해주고 있기 때문에 Kafka에 대해서 학습할 필요를 느꼈으며, 다음 포스팅에서는 Kafka Connect를 사용한 Transaction log tailing에 대해 학습해 보겠습니다. 

 

위 블로그 내용은 안보더라도 아래 참고 링크에 있는 29CM 기술 블로그와 우와콘에서 소개하고 있는 아웃박스 패턴 적용 사례는 꼭 보시길 추천하겠습니다..!! 

 

 

https://github.com/kdg0209/my-real-trip

 

GitHub - kdg0209/my-real-trip

Contribute to kdg0209/my-real-trip development by creating an account on GitHub.

github.com

 

 

 

 

 

참고

 

 

 

 

 

 

 

728x90
반응형