ํ‹ฐ์Šคํ† ๋ฆฌ ๋ทฐ

728x90
๋ฐ˜์‘ํ˜•

๐Ÿ’ก Server Sent Event๋ž€?

  • HTTP ์ŠคํŠธ๋ฆฌ๋ฐ์„ ํ†ตํ•ด ์„œ๋ฒ„์—์„œ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋‹จ๋ฐฉํ–ฅ์œผ๋กœ ์•Œ๋ฆผ์„ ์ „์†กํ•  ์ˆ˜ ์žˆ๋Š” HTML5 ํ‘œ์ค€ ๊ธฐ์ˆ ์ž…๋‹ˆ๋‹ค.
  • EventStream์˜ ์ตœ๋Œ€ ๊ฐœ์ˆ˜๋Š” HTTP/1.1 ์‚ฌ์šฉ์‹œ 6๊ฐœ, ๊ฐ„๋‹จํžˆ ๋งํ•ด ํฌ๋กฌํƒญ์„ 6๊ฐœ๊นŒ์ง€ ์ด์šฉ๊ฐ€๋Šฅํ•˜๋ฉฐ HTTP/2 ์‚ฌ์šฉ์‹œ ์ตœ๋Œ€ 100๊ฐœ๊นŒ์ง€ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค๊ณ  ํ•ฉ๋‹ˆ๋‹ค.
  • JavaScript์˜ EventSource๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปค๋„ฅ์…˜์„ ๋งบ์„ ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ ‘์†์— ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•œ ๊ฒฝ์šฐ ์ž๋™์œผ๋กœ ์žฌ์‹œ๋„๋ฅผ ํ•˜๋Š” ํŠน์ง•์„ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
  • IE์—์„œ๋Š” EventSource๋ฅผ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ œ๊ณตํ•˜๊ณ  ์žˆ์ง€ ์•Š์ง€๋งŒ polyfill์ด๋ผ๋Š” ๊ฒƒ์„ ์‚ฌ์šฉํ•˜์—ฌ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ ์ด์ œ IE๋Š” ๋– ๋‚ฌ์œผ๋ฉฐ ๊ฑฑ์ •ํ•˜์ง€ ์•Š์•„๋„ ๊ดœ์ฐฎ์ง€ ์•Š์„๊นŒ? ํ•ฉ๋‹ˆ๋‹ค.
  • ํด๋ผ์ด์–ธํŠธ์—์„œ ํŽ˜์ด์ง€๋ฅผ ๋‹ซ์•„๋„ ์„œ๋ฒ„๊ฐ€ ์ด๋ฅผ ์ฒดํ‚นํ•˜๊ธฐ ์–ด๋ ต์Šต๋‹ˆ๋‹ค. ์†Œ์ผ“์„ ์‚ฌ์šฉํ•˜๋ฉด disconnect ์ด๋ฒคํŠธ๋ฅผ ์•Œ ์ˆ˜ ์žˆ์ง€๋งŒ SSE๋Š” ์–ด๋ ต์Šต๋‹ˆ๋‹ค.

 

๐Ÿ’ก ๊ตฌํ˜„ํ•˜๊ณ ์ž ํ•˜๋Š” ์ „์ฒด์ ์ธ ๊ทธ๋ฆผ

  • ์ƒํ™ฉ์€ ๊ฐ„๋‹จํ•ฉ๋‹ˆ๋‹ค. ์‚ฌ์šฉ์ž A๊ฐ€ ๊ธ€์„ ๋“ฑ๋กํ–ˆ์„ ๊ฒฝ์šฐ ์‚ฌ์šฉ์ž B ๋˜๋Š” SseEmitter์— ๋“ฑ๋ก๋œ ์ „์ฒด ์‚ฌ์šฉ์ž์—๊ฒŒ ์•Œ๋ฆผ์„ ๋ณด๋‚ด๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์ „์ฒด์ ์ธ ๊ตฌ์กฐ

 

๐Ÿ’ก Spring boot Source

 

์ปจํŠธ๋กค๋Ÿฌ 

  • ํด๋ผ์ด์–ธํŠธ์—์„œ EventSource๋ฅผ ๋ณด๋‚ผ ๋•Œ ์š”์ฒญ์„ ๋ฐ›์•„๋“ค์ด๊ธฐ ์œ„ํ•œ ์ปจํŠธ๋กค๋Ÿฌ๊ฐ€ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. 
  • sse ํ†ต์‹ ์„ ํ•˜๊ธฐ์œ„ํ•ด์„œ๋Š” MIME ํƒ€์ž…์„ text/event-stream์œผ๋กœ ์„ค์ •์„ ํ•ด์•ผํ•ฉ๋‹ˆ๋‹ค.
  • Last-Event-ID๋ž€ sse์˜ ์ปค๋„ฅ์…˜ ๋งŒ๋ฃŒ ์‹œ๊ฐ„ ๋ฐ ๋„คํŠธ์›Œํฌ ์˜ค๋ฅ˜๋กœ ์ธํ•ด ์—ฐ๊ฒฐ์ด ๋Š์–ด์กŒ์„ ๊ฒฝ์šฐ ํด๋ผ์ด์–ธํŠธ๋Š” ๊ทธ ์‹œ๊ฐ„๋™์•ˆ ๋ฐ์ดํ„ฐ๊ฐ€ ์œ ์‹ค๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋กœ์ธํ•ด ํด๋ผ์ด์–ธํŠธ๊ฐ€ ๋ฐ›์€ ๋งˆ์ง€๋ง‰ ์•„์ด๋””๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๊ทธ๋™์•ˆ์˜ ์œ ์‹ค๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
@RestController
@RequestMapping("/notification")
@RequiredArgsConstructor
public class NotificationApi {

    private final NotificationService notificationService;

    @CrossOrigin("*")
    @GetMapping(value = "/subscribe", produces = "text/event-stream")
    public SseEmitter subscribe(@RequestParam(name = "userId") Long userId,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        return notificationService.subscribe(userId, lastEventId);
    }
}

 

์„œ๋น„์Šค

  • userId๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ emitterId๋ฅผ ๋งŒ๋“œ๋Š”๋ฐ ๋งŒ๋“œ๋Š” ํ˜•์‹์€ userId + currentTimeMillis ์ž…๋‹ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ๋งŒ๋“œ๋Š” ์ด์œ ๋Š” ์œ„์—์„œ ์–ธ๊ธ‰ํ•œ 
    Last-Event-ID์™€ ๊ด€๋ จ์ด ์žˆ์œผ๋ฉฐ Last-Event-ID๋Š” ํด๋ผ์ด์–ธํŠธ๊ฐ€ ๋งˆ์ง€๋ง‰์œผ๋กœ ๋ฐ›์€ ์ด๋ฒคํŠธ์˜ ์•„์ด๋””์ž…๋‹ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ๋‘๋Š” ์ด์œ ๋Š” ์•„๋ž˜์—์„œ ์„ค๋ช…ํ•˜๊ฒ ์”๋‹ˆ๋‹ค.
  • ๋‚˜๋จธ์ง€ ์ฝ”๋“œ๋Š” ์ฒœ์ฒœํžˆ ์‚ดํŽด๋ณด๋ฉด ์ดํ•ด๊ฐ€ ๋ ๊ฒƒ์ž…๋‹ˆ๋‹ค.
// Last-Event-Id๊ฐ€ ์ค‘๋ณต์ด ๋˜๋ฉด ์–ด๋–ค ๋ฐ์ดํ„ฐ๊ฐ€ ๋งˆ์ง€๋ง‰์œผ๋กœ ๋ณด๋‚ธ ๋ฐ์ดํ„ฐ์ธ์ง€ ํ™•์ธํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.
Last-Event-Id = 10
{10, data1}
{10, data3}
{10, data2}

// Last-Event-Id๋ฅผ ์‹œ๊ฐ„๊ณผ ํ•จ๊ป˜ ์„ž์–ด ์„ค์ •ํ•˜๋ฉด ์–ด๋–ค ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋ƒˆ๋Š”์ง€ ์ˆœ์„œ๋ฅผ ํŒŒ์•…ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
Last-Event-Id = 10_1631593143664

{10_2931593143664, data1}
{10_4031593143664, data3}
{10_1831593143664, data2}

 

@Getter
@AllArgsConstructor
public class NotificationDTO {

    private String text;
}

@Service
@RequiredArgsConstructor
public class NotificationService {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, NotificationDTO> eventCaches = new ConcurrentHashMap<>();
    private static final Long EMITTER_SESSION_TIME = (60 * 60 * 1000) * 3L; // 3์‹œ๊ฐ„

    // ๋ธŒ๋ผ์šฐ์ €์—์„œ eventsource๋กœ ์—ฐ๊ฒฐํ• ๋•Œ๋งˆ๋‹ค emitter ๊ฐ์ฒด ์ƒ์„ฑํ•˜์—ฌ ๋“ฑ๋ก
    public SseEmitter subscribe(Long userId, String lastEventId) {
        String emitterId = makeIdByUserId(userId);
        SseEmitter emitter = new SseEmitter(EMITTER_SESSION_TIME);

        // SseEmitter์˜ ์‹œ๊ฐ„์ดˆ๊ณผ ๋ฐ ๋„คํŠธ์›Œํฌ ์˜ค๋ฅ˜๋ฅผ ํฌํ•จํ•œ ๋น„๋™๊ธฐ ์š”์ฒญ์ด ์ •์ƒ์ ์œผ๋กœ ๋™์ž‘ํ•  ์ˆ˜ ์—†๋‹ค๋ฉด ์ €์žฅํ•ด๋‘” SseEmitter ์‚ญ์ œ
        emitter.onCompletion(() -> deleteByEmitterId(emitterId));
        emitter.onTimeout(() -> deleteByEmitterId(emitterId));
        emitter.onError((e) -> deleteByEmitterId(emitterId));
        saveNewEmitter(emitterId, emitter);

        // ์—ฐ๊ฒฐ ํ›„ ์•„๋ฌด๋Ÿฐ ์‘๋‹ต๊ฐ’์„ ๋ณด๋‚ด์ฃผ์ง€ ์•Š์œผ๋ฉด ์˜ค๋ฅ˜ ๋ฐœ์ƒํ•˜๋ฏ€๋กœ ๋”๋ฏธ ๋ฐ์ดํ„ฐ ๋ฐ˜ํ™˜
        // 503 error๋ฅผ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•œ ๋”๋ฏธ ์ด๋ฒคํŠธ ์ „์†ก
        String eventId = makeIdByUserId(userId);
        sendNotification(emitter, eventId, new NotificationDTO("๋”๋ฏธ ๋ฐ์ดํ„ฐ"));

        // ํด๋ผ์ด์–ธํŠธ๊ฐ€ ๋ฏธ์ˆ˜์‹ ํ•œ Event๊ฐ€ ์žˆ์„ ๊ฒฝ์šฐ ์ „์†กํ•˜์—ฌ Event ์œ ์‹ค ๋ฐฉ์ง€
        // ์ฆ‰ ์—ฐ๊ฒฐ์ด ๋Š๊ธฐ๊ณ  ์žฌ ์—ฐ๊ฒฐ๋˜์—ˆ์„ ๊ฒฝ์šฐ lastEventId๋กœ ๋งˆ์ง€๋ง‰ ์ด๋ฒคํŠธ ์ฐพ์Œ
        if (hasLatestNotification(lastEventId)) {
            sendLatestNotification(emitter, lastEventId, emitterId);
        }
        return emitter;
    }

    // ํŠน์ • ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ์•Œ๋ฆผ ๋ฐœ์†ก
    public void send(Long userId, NotificationDTO notificationDTO) {
        Map<String, SseEmitter> emitters = findEmitterByEmitterId(userId.toString());
        emitters.forEach((key, value) -> {
            saveEventCache(key, notificationDTO);
            sendNotification(value, key, notificationDTO);
        });
    }

    // ๋ชจ๋“  ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ์•Œ๋ฆผ ๋ฐœ์†ก
    public void sendBroadcast(NotificationDTO notificationDTO) {
        Map<String, SseEmitter> broadcast = findAllEmitterBroadcast();
        broadcast.forEach((key, value) -> {
            saveEventCache(key, notificationDTO);
            sendNotification(value, key, notificationDTO);
        });
    }

    // ๊ธฐ์กด์— ๋“ฑ๋ก๋˜์–ด ์žˆ๋˜ emitter ์‚ญ์ œ ํ›„ ๋“ฑ๋ก
    private void saveNewEmitter(String emitterId, SseEmitter emitter) {
        deleteAllEmitterByEmitterId(emitterId);
        emitters.put(emitterId, emitter);
    }

    // ์บ์‹œ ์ €์žฅ
    private void saveEventCache(String eventId, NotificationDTO event) {
        eventCaches.put(eventId, event);
    }

    private Map<String, SseEmitter> findEmitterByEmitterId(String emitterId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(emitterId))
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private Map<String, NotificationDTO> findAllEventCacheByEmitterId(String emitterId) {
        return eventCaches.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(emitterId))
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // ๋“ฑ๋ก๋œ ๋ชจ๋“  ํด๋ผ์ด์–ธํŠธ ๋ฐ˜ํ™˜
    private Map<String, SseEmitter> findAllEmitterBroadcast() {
        return emitters.entrySet().stream()
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // ๋“ฑ๋ก๋œ Emitter ์‚ญ์ œ
    private void deleteByEmitterId(String emitterId) {
        emitters.remove(emitterId);
    }

    // ์œ ์ €์™€ ๊ด€๋ จ๋œ ๋ชจ๋“  Emitter ์‚ญ์ œ
    private void deleteAllEmitterByEmitterId(String emitterId) {
        emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(emitterId))
                .forEach(entry -> emitters.remove(entry.getKey()));
    }

    // ์œ ์ €์™€ ๊ด€๋ จ๋œ ์บ์‹œ ์‚ญ์ œ
    private void deleteAllEventCacheUserId(String eventId) {
        eventCaches.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(eventId))
                .forEach(entry -> eventCaches.remove(entry.getKey()));
    }

    private void sendLatestNotification(SseEmitter emitter, String lastEventId, String emitterId) {
        Map<String, NotificationDTO> eventCaches = findAllEventCacheByEmitterId(emitterId);
        eventCaches.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendNotification(emitter, emitterId, entry.getValue()));
        deleteAllEventCacheUserId(lastEventId);
    }

    private void sendNotification(SseEmitter emitter, String emitterId, NotificationDTO data) {
        try {
            emitter.send(SseEmitter
                    .event()
                    .id(emitterId)
                    .data(data)
            );
        } catch (IOException e) {
            deleteByEmitterId(emitterId);
        }
    }

    private String makeIdByUserId(Long userId) {
        return userId + "_" + System.currentTimeMillis();
    }

    private boolean hasLatestNotification(String lastEventId) {
        return !lastEventId.isEmpty();
    }
}

 

๐Ÿ’ก Client Source

  • JavaScript์—์„œ๋Š” EventSource๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์„œ๋ฒ„์™€ ์—ฐ๊ฒฐ์„ ๋งบ๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
<script type="text/javaScript">
    const userId = 10;
    const eventSource = new EventSource(`http://localhost:8080/notification/subscribe?userId=${userId}`);
    eventSource.onmessage = function (event) {
        console.log(event.data);
    };

    eventSource.onerror = (e) => {
        console.log(e);
    }
</script>

์—ฐ๊ฒฐ ํ›„ ์‘๋‹ต ๊ฒฐ๊ณผ

 

๐Ÿ‘ ์ค‘๊ฐ„ ์ •๋ฆฌ

  • ์—ฌ๊ธฐ๊นŒ์ง€ Server-Sent-Event๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํด๋ผ์ด์–ธํŠธ์™€ ์„œ๋ฒ„๊ฐ€ ์—ฐ๊ฒฐ์„ ๋งบ๊ณ  ์•Œ๋ฆผ์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค!๋Š” ๊ฒƒ๊นŒ์ง€๋Š” ์•Œ๊ฒŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.
    ์ง€๊ธˆ๋ถ€ํ„ฐ๋Š” ๊ธ€ ์ž‘์„ฑ์‹œ ์•Œ๋ฆผ์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋„๋กํ•˜๋Š” ์ƒํ™ฉ์„ ๊ฐ€์ •ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.
  • ์—ฌ๊ธฐ์„œ๋ถ€ํ„ฐ๋Š” ํด๋ผ์ด์–ธํŠธ ์†Œ์Šค๋Š” ๋™์ผํ•˜๋‹ˆ ์„œ๋ฒ„ ์ฝ”๋“œ๋ฅผ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

 

๐Ÿ’ก ๊ธ€ ์ž‘์„ฑ ์‹œ ์ด๋ฒคํŠธ ๋ฐœ์†ก(์„œ๋ฒ„ ์ฝ”๋“œ)

  • ํ•„์š”ํ•œ ์ž‘์—…
    1. ๊ธ€ ์ž‘์„ฑ์‹œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ์•Œ๋ฆผ์„ ๋ณด๋‚ด์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
    2. ๊ธ€ ์ž‘์„ฑ์ด COMMIT๋˜๋ฉด ์•Œ๋ฆผ์„ ๋ณด๋ƒ…๋‹ˆ๋‹ค.
    3. ๋น„๋™๊ธฐ๋กœ ์•Œ๋ฆผ์„ ๋ฐœ์†กํ•ฉ๋‹ˆ๋‹ค.

 

๊ธ€ ์ž‘์„ฑ

  • ์•„๋ž˜๋Š” ๊ธ€ ์ž‘์„ฑ ํ›„ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
public enum ActionType {
    UNICAST,
    BROADCAST
}

@Service
@Transactional
@RequiredArgsConstructor
public class PostsService {

    private final PostsRepository postsRepository;
    private final ApplicationEventPublisher applicationEventPublisher;

    public Long create() {
        Posts posts = Posts.builder()
                .title("์ œ๋ชฉ์€?")
                .contents("๋‚ด์šฉ์€?")
                .build();
        postsRepository.save(posts);
        applicationEventPublisher.publishEvent(ActionType.BROADCAST); // ์ด๋ฒคํŠธ ๋ฐœ์ƒ
        return posts.getId();
    }
}

 

์ด๋ฒคํŠธ ๋ฆฌ์Šค๋„ˆ

  • ์‚ฌ์šฉ์ž๊ฐ€ ๊ธ€์„ ์ž‘์„ฑํ–ˆ์„ ๊ฒฝ์šฐ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š”๋‹ค๋ฉด ์ด๋ฒคํŠธ ๋ฆฌ์Šค๋„ˆ๊ฐ€ ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค. @TransactionEventListener ์–ด๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•˜์—ฌ @Transactional ์–ด๋…ธํ…Œ์ด์…˜์ด ์„ ์–ธ๋œ ํด๋ž˜์Šค ๋ ˆ๋ฒจ ํ˜น์€ ๋ฉ”์„œ๋“œ ๋ ˆ๋ฒจ์—์„œ ์ปค๋ฐ‹์ด ๋œ ํ›„์— ํ•ด๋‹น ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
    ์—ฌ๊ธฐ์„œ @EventListener ์–ด๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค๋ฉด Posts ๋“ฑ๋ก ์‹œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
  • ์ด๋ฒคํŠธ์˜ ์•ก์…˜ ํƒ€์ž…์— ๋งž๊ฒŒ ํŠน์ • ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐœ์†กํ• ๊ฑด์ง€ ๋˜๋Š” ๋ชจ๋“  ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐœ์†กํ• ๊ฑด์ง€ ๋ถ„๊ธฐ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
  • ์•Œ๋ฆผ ๋ฐœ์†ก ์‹œ ๋ชจ๋“  ์‚ฌ์šฉ์ž์—๊ฒŒ ์•Œ๋ฆผ์„ ๋ณด๋‚ด๋Š” ๊ณผ์ •์„ ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก @Async ์–ด๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
@Component
@RequiredArgsConstructor
public class SseEventListener {

    private static final String TEXT = "์ƒˆ๋กœ์šด ์•Œ๋ฆผ์ด ๋„์ฐฉํ–ˆ์Šต๋‹ˆ๋‹ค.";
    private final NotificationService notificationService;

    @Async
    @TransactionalEventListener
    public void eventHandler(ActionType actionType) {
        if (actionType == ActionType.UNICAST) {
            notificationService.send(15L, new NotificationDTO(TEXT));
        }
        if (actionType == ActionType.BROADCAST) {
            notificationService.sendBroadcast(new NotificationDTO(TEXT));
        }
    }
}

 

๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ Config

  • ์•Œ๋ฆผ์„ ๋น„๋™๊ธฐ ๋ฐฉ์‹์œผ๋กœ ๋ฐœ์†กํ•˜๊ธฐ ์œ„ํ•ด Async๋ฅผ ์„ค์ •ํ•ด์ค๋‹ˆ๋‹ค.
  • ๋น„๋™๊ธฐ์— ๋Œ€ํ•ด ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์•„๋ž˜ ๋งํฌ๋ฅผ ์ฒจ๋ถ€ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.
  • ๋น„๋™๊ธฐ์˜ ๋™์ž‘ ๋ฐฉ์‹  ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ•ด์ฃผ์„ธ์š”!
  • @Async ์„ค์ • ๋ฐฉ๋ฒ•  ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ•ด์ฃผ์„ธ์š”!
@Configuration
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {

    private static final int CORE_POOL_SIZE = 10;
    private static final int MAX_POOL_SIZE = 90;
    private static final int QUEUE_CAPACITY = 10;
    private static final boolean TASKS_TO_COMPLETE_ON_SHUTDOWN = true;
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("async-");
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setWaitForTasksToCompleteOnShutdown(TASKS_TO_COMPLETE_ON_SHUTDOWN);
        executor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
        executor.setRejectedExecutionHandler((r, e) -> {
            throw new IllegalStateException("๋™์‹œ ์š”์ฒญ์ˆ˜๊ฐ€ ๋งŽ์Šต๋‹ˆ๋‹ค. ์ž ์‹œํ›„์— ๋‹ค์‹œ ์š”์ฒญํ•ด์ฃผ์„ธ์š”.");
        });
        executor.initialize();
        return executor;
    }
}

 

 

๐Ÿ˜‚ ๊ตฌํ˜„ ํ›„ ์•Œ๊ฒŒ๋œ Server-Sent-Event์˜ ์‚ฌ์‹ค

  • AWS API Gateway๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด SSE๋ฅผ ์ ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ๊ตฌํ˜„ ํ›„ ๊ฐœ๋ฐœ ์„œ๋ฒ„์— ๋ฐฐํฌํ•˜๋ฉด์„œ ๊ณ„์†ํ•˜์—ฌ 504 ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋Š”๋ฐ 
    ๊ตฌ๊ธ€๋ง ๊ฒฐ๊ณผ ์ง€์›์„ ํ•˜์ง€ ์•Š๋Š”๋‹ค๊ณ ํ•˜์—ฌ rabbitMQ๋ฅผ ์ ์šฉํ•˜์—ฌ ๊ฐœ๋ฐœํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค..
    https://github.com/streamdata-serverless/streamdata-io-basic-demo-stockmarket-prices/issues/1
  • ๋‹ค์ŒํŽธ์—๋Š” rabbitMQ์™€ MQTT๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์•Œ๋ฆผ์„ ๋ฐœ์†กํ•˜๋Š” ํŽธ์œผ๋กœ ๋Œ์•„์˜ค๊ฒ ์Šต๋‹ˆ๋‹ค.!

 

 

 

 

 

 

 

728x90
๋ฐ˜์‘ํ˜•
๊ณต์ง€์‚ฌํ•ญ
์ตœ๊ทผ์— ์˜ฌ๋ผ์˜จ ๊ธ€
์ตœ๊ทผ์— ๋‹ฌ๋ฆฐ ๋Œ“๊ธ€
Total
Today
Yesterday
ยซ   2024/09   ยป
์ผ ์›” ํ™” ์ˆ˜ ๋ชฉ ๊ธˆ ํ† 
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
๊ธ€ ๋ณด๊ด€ํ•จ