Spring Framework/Spring & Spring Boot

[Spring/Spring Boot] SSE (Server-Sent Events)์™€ EventSource ์ ์šฉํ•ด๋ณด๊ธฐ

soogoori 2024. 5. 14. 15:08

SSE (Server-Sent Events)๋ž€?

์„œ๋ฒ„์—์„œ ํด๋ผ์ด์–ธํŠธ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ „์†กํ•˜๋Š” ๊ธฐ์ˆ ์ด๋‹ค. ์„œ๋ฒ„๊ฐ€ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ์ง€์†์ ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•  ๋•Œ ์ฃผ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

 

๋˜ํ•œ ์‹ค์‹œ๊ฐ„ ์—…๋ฐ์ดํŠธ๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ(์ฃผ์‹ ์‹œ์„ธ, ์‹ค์‹œ๊ฐ„ ์ฑ„ํŒ…, ์•Œ๋ฆผ ๋“ฑ)์— ์‚ฌ์šฉ๋˜๋ฉฐ HTTP ๊ธฐ๋ฐ˜์œผ๋กœ ์›น ๋ธŒ๋ผ์šฐ์ €์™€ ์›น ์„œ๋ฒ„ ๊ฐ„์˜ ๋‹จ๋ฐฉํ–ฅ ํ†ต์‹ ์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•˜๋ฏ€๋กœ ํด๋ผ์ด์–ธํŠธ๋Š” ์„œ๋ฒ„์— ์š”์ฒญ์„ ๋ณด๋‚ด์ง€ ์•Š๊ณ ๋„ ์„œ๋ฒ„๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๊ณ , ํด๋ผ์ด์–ธํŠธ๋Š” ์›น ๋ธŒ๋ผ์šฐ์ €์˜ EventSource API๋ฅผ ์‚ฌ์šฉํ•ด ์„œ๋ฒ„๋กœ๋ถ€ํ„ฐ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฒ˜๋ฆฌํ•œ๋‹ค.

 

ํ”„๋กœ์ ํŠธ์—์„œ ์•จ๋ŸฐAI์—๊ฒŒ ์งˆ๋ฌธ์„ ์š”์ฒญํ•˜๋ฉด ์‘๋‹ต์„ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ฐ›๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•˜๊ณ ์ž ํ–ˆ๋‹ค.

์‹ค์ œ๋กœ ํ”„๋กœ์ ํŠธ์—์„œ ์–ด๋–ป๊ฒŒ ๊ตฌํ˜„ํ–ˆ๋Š”์ง€ ์•Œ์•„๋ณด์ž !

 

 

 

WebClientService ์ฝ”๋“œ ๊ตฌํ˜„ - ๋น„๋™๊ธฐ๋กœ ์™ธ๋ถ€ API ํ˜ธ์ถœ

๐Ÿ”น WebClientService 

@RequiredArgsConstructor
@Service
@Slf4j
public class WebClientService {

    @Value("${alan.key}")
    private String alanId;
    private final WebClient webClient = WebClient.builder().build();
    private final ObjectMapper objectMapper = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);

    public Flux<AlanResponseDto> getResponse(String content){
        return webClient.get()
                .uri("https://kdt-api-function.azurewebsites.net/api/v1/question/sse-streaming",
                        uriBuilder -> uriBuilder
                        .queryParam("content", content)
                        .queryParam("client_id", alanId)
                        .build())
                .retrieve()
                .bodyToFlux(String.class)
                .flatMap(this::parseJsonToResponseDto);
    }

    private Flux<AlanResponseDto> parseJsonToResponseDto(String jsonString){
        try {
            // JSON ๋ฌธ์ž์—ด์„ ResponseDto ๊ฐ์ฒด๋กœ ํŒŒ์‹ฑ
            AlanResponseDto responseDto = this.objectMapper.readValue(jsonString, AlanResponseDto.class);
            return Flux.just(responseDto);
        } catch (JsonProcessingException e) {
            return Flux.error(e);
        }
    }
}

 

์ด์ŠคํŠธ์†Œํ”„ํŠธ์—์„œ ์ œ๊ณตํ•œ ์•จ๋ŸฐAI API๋ฅผ WebClient๋ฅผ ํ™œ์šฉํ•ด ๋น„๋™๊ธฐ๋กœ ์š”์ฒญํ•˜๊ณ , ์‘๋‹ต์„ ๋ฐ›์•„ ๋ฌธ์ž์—ด ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.  

๋ฌธ์ž์—ด๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ์ด์œ ๋Š” ํ•ด๋‹นํ•˜๋Š” JSON ์‘๋‹ต๊ฐ’์ด ํฐ๋”ฐ์˜ดํ‘œ๊ฐ€ ์•„๋‹Œ ์ž‘์€๋”ฐ์˜ดํ‘œ๋กœ ๊ตฌ์„ฑ๋˜์–ด์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. 

 

๊ทธ๋Ÿฌ๋ฏ€๋กœ ๊ฐ parseJsonToResponseDto() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด์„œ ์ž‘์€๋”ฐ์˜ดํ‘œ๋กœ ๊ตฌ์„ฑ๋œ JSON ๋ฌธ์ž์—ด์„ ๊ฐ์ฒด๋กœ ํŒŒ์‹ฑํ•˜๋Š” ๊ณผ์ •์„ ๊ฑฐ์ณ์•ผํ•˜๊ณ  ์ตœ์ข…์ ์œผ๋กœ๋Š” Flux<AlanResponseDto>๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค. 

 

@Getter
public class AlanResponseDto {
	@JsonProperty(value = "type")
	private String type;

	@JsonProperty(value = "data")
	private Data data;

	@Getter
	public static class Data {
		@JsonProperty(value = "content")
		private String content;

		@JsonProperty(value = "name")
		private String name;

		@JsonProperty(value = "speak")
		private String speak;
	}
}

AlanResponseDto๋Š” ์œ„์™€ ๊ฐ™๋‹ค. ์‘๋‹ต๊ฐ’ ๊ตฌ์กฐ์— ๋งž์ถฐ ์ž‘์„ฑํ•˜์˜€๋‹ค. 

์‘๋‹ต๊ฐ’ ๊ตฌ์กฐ

 

 

 

SseController ์ฝ”๋“œ ๊ตฌํ˜„

๐Ÿ”น SseController 

@RequiredArgsConstructor
@RestController
@RequestMapping("/api")
@Slf4j
public class SseController {

    private final WebClientService webClientService;

    @GetMapping("/alan")
    public Flux<ServerSentEvent<AlanResponseDto>> getAlanResponse(HttpServletResponse response, @RequestParam String content){
        response.setContentType("text/event-stream");
        return webClientService.getResponse(content)
                .doOnNext(data -> {
                    log.info("Received data: {}", data.getData().getContent());
                })
                .map(data -> ServerSentEvent.<AlanResponseDto>builder()
                        .data(data)
                        .build())
                .delayElements(Duration.ofMillis(50));
    }
}

WebClientService๋ฅผ ํ†ตํ•ด ์–ป์€ ๊ฐ’์„ SSE์™€ WebFlux๋ฅผ ํ™œ์šฉํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์‚ฌ์šฉ์ž์—๊ฒŒ ๋ณด์—ฌ์ค„ ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” ์ฝ”๋“œ์ด๋‹ค. 

MIME ํƒ€์ž…์„ text/event-stream์œผ๋กœ ์ง€์ •ํ•˜๊ณ , map์„ ํ†ตํ•ด ์ „์†กํ•  ๋ฐ์ดํ„ฐ๋ฅผ ์„ค์ •ํ•˜๊ณ  ServerSentEvent๋กœ ๋งคํ•‘ํ•œ๋‹ค. 

 

 

 

ServerSentEvent ํด๋ž˜์Šค๋ฅผ ์‚ดํŽด๋ณด๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ํ•„๋“œ๋ฅผ ํฌํ•จํ•˜๋ฉฐ

 

๋ฉ”์„œ๋“œ๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค. 

Representation for a Server-Sent Event for use with Spring's reactive Web support. Flux<ServerSentEvent> or Observable<ServerSentEvent> is the reactive equivalent to Spring MVC's SseEmitter.

 

์ฐธ๊ณ ์ž๋ฃŒ์— ์œ„์™€ ๊ฐ™์€ ๋‚ด์šฉ์ด ์žˆ๋Š” ๊ฒƒ์œผ๋กœ ๋ณด์•„ Spring MVC์˜ SseEmitter์™€ ์œ ์‚ฌํ•˜๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค. 

SseEmitter๋Š” ๋™๊ธฐ์‹ ๋ฐ ๋ธ”๋กœํ‚น ๋ฐฉ์‹์˜ Spring MVC์—์„œ SSE๋ฅผ ๊ตฌํ˜„ํ•˜๊ธฐ ์œ„ํ•œ ํด๋ž˜์Šค์ด๋ฉฐ,

Flux๋Š” ๋น„๋™๊ธฐ ๋ฐ ๋…ผ๋ธ”๋กœํ‚น ๋ฐฉ์‹์˜ Spring WebFlux์—์„œ SSE๋ฅผ ๊ตฌํ˜„ํ•˜๊ธฐ ์œ„ํ•œ ๋ฐ˜์‘ํ˜• ์ŠคํŠธ๋ฆผ์ด๋ฏ€๋กœ ๋” ๋งŽ์€ ๋™์‹œ์„ฑ์„ ์ง€์›ํ•˜๋ฉฐ ๋ฆฌ์†Œ์Šค๋ฅผ ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค. 


์ฃผ์–ด์ง„ event, id, data ํ•„๋“œ๋Š” Nullable์ด๋ฏ€๋กœ ๋ฐ˜๋“œ์‹œ ํ•„์š”ํ•œ ๊ฒƒ์€ ์•„๋‹ˆ์—ˆ๋‹ค. 

JavaScript๋ฅผ ํ™œ์šฉํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ถˆ๋Ÿฌ์˜ค๋ฉด์„œ ํ™•์‹คํžˆ ์“ฐ์ž„์ƒˆ๋ฅผ ์•Œ๊ฒŒ ๋˜์—ˆ๋‹ค. 

์ง„ํ–‰ํ•œ ํ”„๋กœ์ ํŠธ์—์„œ๋Š” ๋”ฑํžˆ id, event๊ฐ€ ์—†์–ด๋„ ๋˜๋Š” ํ•„๋“œ์˜€๊ธฐ ๋•Œ๋ฌธ์— ์ƒ๋žตํ–ˆ๊ณ , data๋งŒ ์‚ฌ์šฉํ•ด ํ•ด๋‹น ํ•„๋“œ์— ์‘๋‹ต๊ฐ’์„ ๋„ฃ์–ด์ฃผ์—ˆ๋‹ค. 

 

 

 

์™„์„ฑ๋œ ํ™”๋ฉด

 

 

 

 

 

 

 

์ฐธ๊ณ ์ž๋ฃŒ

https://developer.mozilla.org/en-US/docs/Web/API/EventSource

https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/

https://www.baeldung.com/spring-server-sent-events

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/codec/ServerSentEvent.html