Spring Framework/Spring WebFlux

[WebFlux] ๋น„๋™๊ธฐ/๋™๊ธฐ, non-blocking/blocking, CompletableFuture

soogoori 2024. 7. 3. 11:24

ํŒ€์—์„œ IoT๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด WebFlux๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋‹ค.

WebFlux์— ๋Œ€ํ•ด ์–•๊ฒŒ๋งŒ ์•Œ๊ณ  ์‚ฌ์šฉํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ด๋ฒˆ ๊ธฐํšŒ์— ๊ณต๋ถ€ํ•ด๋ณด๊ธฐ๋กœ ํ•œ๋‹ค. 

 

โœด๏ธ Sync(๋™๊ธฐ)์™€ Async(๋น„๋™๊ธฐ)

๐Ÿ”ถ Sync : ํด๋ผ์ด์–ธํŠธ์—์„œ ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ → ๋ฉ”์„œ๋“œ ์ œ๊ณตํ•˜๋Š” ๊ณณ์—์„œ ๊ธฐ๋Šฅ์„ ๋ชจ๋‘ ์ˆ˜ํ–‰ํ•ด ๊ฒฐ๊ณผ๊ฐ’์ด ๊ฒฐ์ •๋˜๋ฉด ๊ทธ๋•Œ ๋ฐ˜ํ™˜
๐Ÿ‘‰ ์š”์ฒญํ•œ ์ž‘์—…์— ๋Œ€ํ•ด ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ ์‹ ๊ฒฝ ์จ์„œ ์ž‘์—…์„ ์ˆœ์ฐจ์ ์œผ๋กœ ์ˆ˜ํ–‰ํ• ์ง€ ์•„๋‹Œ์ง€ ๊ฒฐ์ •

๐Ÿ”ถ Async : ๊ฒฐ๊ณผ๊ฐ’์ด ๊ฒฐ์ •๋˜๊ธฐ ์ „์— ์ผ๋‹จ ๋ฐ˜ํ™˜ (์ˆœ์„œ ๋ณด์žฅ X)
๐Ÿ‘‰ ์š”์ฒญํ•œ ์ž‘์—…์— ๋Œ€ํ•ด ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ ๋”ฐ์ง€์ง€ ์•Š๊ณ  ์ž์‹ ์˜ ๋‹ค์Œ ์ž‘์—…์„ ๊ทธ๋Œ€๋กœ ์ˆ˜ํ–‰

 

โœด๏ธ Blocking๊ณผ Non-Blocking

๐Ÿ”ท Blocking : ์š”์ฒญํ•œ ์ž‘์—…์ด ๋๋‚  ๋•Œ๊นŒ์ง€ ๋‹ค๋ฅธ ์ž‘์—…ํ•˜์ง€ ์•Š๊ณ  ๊ธฐ๋‹ค๋ฆผ

๐Ÿ”ท Non-Blocking : ์š”์ฒญํ•œ ์ž‘์—…์ด ์ˆ˜ํ–‰๋˜๋Š” ๋™์•ˆ ๋‹ค๋ฅธ ์ž‘์—… ๊ฐ€๋Šฅ. 

 

โœด๏ธ ์ฝœ๋ฐฑํ•จ์ˆ˜ 

๐Ÿ‘‰ ๋‹ค๋ฅธ ์ž‘์—…์˜ ์™„๋ฃŒ ์—ฌ๋ถ€๋‚˜ ๊ฒฐ๊ณผ์— ๋Œ€ํ•œ ํ›„์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด ์ด์šฉ

 

 

CompletableFuture ๋ž€? 

java.util.concurrent ํŒจํ‚ค์ง€์— ์†ํ•˜๋Š” ํด๋ž˜์Šค ์ค‘ ํ•˜๋‚˜๋กœ, ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๋ณด๋‹ค ์‰ฝ๊ฒŒ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋„์™€์ค€๋‹ค. 

Java 8์— ์ฒ˜์Œ ๋„์ž…๋˜์—ˆ๊ณ , Future์™€ CompletionStage์„ ๊ตฌํ˜„ํ•˜๊ณ  ์žˆ๋‹ค. 

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

 

Future์™€ CompletionStage๋Š” ๋ญ˜๊นŒ.

 

 

Future

  • Java 5์— ๋„์ž…
  • ๋น„๋™๊ธฐ์ ์ธ ์ž‘์—… ์ˆ˜ํ–‰
  • ์ž‘์—… ์ƒํƒœ ํ™•์ธ - ์™„๋ฃŒ, ์ทจ์†Œ, ์ง„ํ–‰ ์ค‘ ์—ฌ๋ถ€
  • ํ•ด๋‹น ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค

๐Ÿ”ถ ์ฃผ์š” ๋ฉ”์„œ๋“œ 

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
    
    boolean isDone();
    
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

 

  • cancel : ์ž‘์—… ์ทจ์†Œ ๐Ÿ‘‰ mayInterruptIfRunning์ด true์ด๋ฉด ์‹คํ–‰ ์ค‘์ธ ์ž‘์—… ์ค‘๋‹จ 
  • get() : ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ ํ›„ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜ ๐Ÿ‘‰ ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ํ˜ธ์ถœ ์Šค๋ ˆ๋“œ๋Š” ๋ธ”๋กœํ‚น๋จ

๐Ÿ”ถ ํ•œ๊ณ„

  • cancel์„ ์ œ์™ธํ•˜๊ณ  ์™ธ๋ถ€์—์„œ future ์ปจํŠธ๋กค X ๐Ÿ‘‰ ์ฝœ๋ฐฑ ๋ฏธ์ง€์›
  • ๋ธ”๋กœํ‚น : Future.get() ๋ฉ”์„œ๋“œ๋Š” ๊ฒฐ๊ณผ๊ฐ€ ์ค€๋น„๋  ๋•Œ๊นŒ์ง€ ๋ธ”๋กœํ‚น๋˜๋ฏ€๋กœ ๋น„๋™๊ธฐ ์ž‘์—…์˜ ์ด์  ์ถฉ๋ถ„ํžˆ ํ™œ์šฉ X
  • ๋ฐ˜ํ™˜๋œ ๊ฒฐ๊ณผ๋ฅผ get()ํ•ด์„œ ์ ‘๊ทผํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ์–ด๋ ค์›€
  • ์™„๋ฃŒ๋˜๊ฑฐ๋‚˜ ์—๋Ÿฌ ๋ฐœ์ƒํ–ˆ๋Š”์ง€ ๊ตฌ๋ถ„ ์–ด๋ ค์›€ 
  • ์กฐํ•ฉ ์–ด๋ ค์›€ : ์—ฌ๋Ÿฌ ๋น„๋™๊ธฐ ์ž‘์—…์„ ์กฐํ•ฉ, ์ˆœ์ฐจ์ ์œผ๋กœ ์‹คํ–‰, ๊ฒฐ๊ณผ ์กฐํ•ฉ ์–ด๋ ค์›€ 

 

 

CompletionStage

  • Java 8์— ๋„์ž… 
  • ๋น„๋™๊ธฐ์ ์ธ ์ž‘์—… ์ˆ˜ํ–‰
  • ํ•ด๋‹น ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ๊ฒฐ๊ณผ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ฑฐ๋‚˜ ๋‹ค๋ฅธ CompletionStage๋ฅผ ์—ฐ๊ฒฐํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค
  • ๋น„๋™๊ธฐ task๋“ค์„ ์‹คํ–‰ํ•˜๊ณ  ๊ฐ’์„ ๋ณ€ํ˜•ํ•˜๋Š” ๋“ฑ chaining์„ ์ด์šฉํ•œ ์กฐํ•ฉ ๊ฐ€๋Šฅ
  • ์—๋Ÿฌ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ์ฝœ๋ฐฑ ์ œ๊ณต

 

๐Ÿ”ถ ์ฃผ์š” ๋ฉ”์„œ๋“œ 

public interface CompletionStage<T> {
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
    
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    
    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    
    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
    
    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}
  • thenAccept[Async] 
    • Consumer๋ฅผ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์Œ 
    • ํ˜„์žฌ ๋‹จ๊ณ„์˜ ๊ฒฐ๊ณผ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋™๊ธฐ์ ์œผ๋กœ ์ž‘์—… ์ˆ˜ํ–‰ํ•˜๊ณ  ๋ฐ˜ํ™˜ X
    • ๊ฐ’์„ ๋ฐ›์•„์„œ action๋งŒ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ์— ์œ ์šฉ

 

  • thenApply[Async]
    • Function์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์Œ
    • ์ด์ „ task๋กœ๋ถ€ํ„ฐ T ํƒ€์ž…์˜ ๊ฐ’์„ ๋ฐ›์•„์„œ ๊ฐ€๊ณตํ•˜๊ณ  U ํƒ€์ž…์˜ ๊ฐ’ ๋ฐ˜ํ™˜
    • ๋‹ค์Œ task์—๊ฒŒ ๋ฐ˜ํ™˜ํ–ˆ๋˜ ๊ฐ’์ด ์ „๋‹ฌ๋จ
    • thenApply → ํ˜„์žฌ CompletableFuture๋ฅผ ์™„๋ฃŒํ•œ ๋™์ผํ•œ ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€ํ™˜ ํ•จ์ˆ˜ ์‹คํ–‰
      ๐Ÿ‘‰ ๋ณ€ํ™˜ ํ•จ์ˆ˜์˜ ์‹คํ–‰์€ ๊ฒฐ๊ณผ๊ฐ€ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•ด์ง„ ์งํ›„์— ๋ฐœ์ƒ 
    • CompletableFuture์˜ ๊ฒฐ๊ณผ์— ๋Œ€ํ•ด ์ฒด์ด๋‹ ์ž‘์—… ์ˆ˜ํ–‰ 
    • ๊ฐ’์„ ๋ณ€ํ˜•ํ•ด์„œ ์ „๋‹ฌํ•ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉ

 

  • thenCompose[Async]
    • Fuction์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์Œ
    • ์ด์ „ task๋กœ๋ถ€ํ„ฐ T ํƒ€์ž…์˜ ๊ฐ’์„ ๋ฐ›์•„์„œ ๊ฐ€๊ณตํ•˜๊ณ  U ํƒ€์ž…์˜ CompletionStage ๋ฐ˜ํ™˜
    • ๋ฐ˜ํ™˜ํ•œ CompletionStage๊ฐ€ done ์ƒํƒœ๊ฐ€ ๋˜๋ฉด ๊ฐ’์„ ๋‹ค์Œ task์— ์ „๋‹ฌ
    • ๋‹ค๋ฅธ future๋ฅผ ๋ฐ˜ํ™˜ํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉ 

 

  • thenRun[Async]
    • Runnable์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์Œ
    • ์ด์ „ task๋กœ๋ถ€ํ„ฐ ๊ฐ’์„ ๋ฐ›์ง€ ์•Š๊ณ  ๊ฐ’์„ ๋ฐ˜ํ™˜ x
    • ๋‹ค์Œ task์—๊ฒŒ null์ด ์ „๋‹ฌ๋จ
    • future๊ฐ€ ์™„๋ฃŒ๋˜์—ˆ๋‹ค๋Š” ์ด๋ฒคํŠธ๋ฅผ ๊ธฐ๋กํ•  ๋•Œ ์œ ์šฉ 

 

  • then*[Async]
    • ๋น„๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰
    • Async ๐Ÿ‘‰ ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ 
    • ํ˜ธ์ถœ๋œ ์Šค๋ ˆ๋“œ๋Š” ์ฆ‰์‹œ ๋ฐ˜ํ™˜๋˜๊ณ  ๊ฒฐ๊ณผ๋Š” ๋‚˜์ค‘์— ์‚ฌ์šฉ ๊ฐ€๋Šฅ
    • Executor ์‚ฌ์šฉํ•ด์„œ ์Šค๋ ˆ๋“œ ํ’€์„ ํ†ตํ•ด ์Šค๋ ˆ๋“œ ๊ด€๋ฆฌ ๊ฐ€๋Šฅ

 

  • exceptionally
    • Function์„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์Œ
    • ์ด์ „ task์—์„œ ๋ฐœ์ƒํ•œ exception์„ ๋ฐ›์•„์„œ ์ฒ˜๋ฆฌํ•˜๊ณ  ๊ฐ’ ๋ฐ˜ํ™˜
    • ๋‹ค์Œ task์—๊ฒŒ ๋ฐ˜ํ™˜๋œ ๊ฐ’์„ ์ „๋‹ฌ
    • future ํŒŒ์ดํ”„์—์„œ ๋ฐœ์ƒํ•œ ์—๋Ÿฌ๋ฅผ ์ฒ˜๋ฆฌํ•  ๋•Œ ์œ ์šฉ 

 

 

 

CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { … }
    public static CompletableFuture<Void> runAsync(Runnable runnable) { … }
    
    public boolean complete(T value) { … }
    public boolean isCompletedExceptionally() { … }
    
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { … }
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { … }
}

 

  • supplyAsync
    • ๋น„๋™๊ธฐ์ ์œผ๋กœ ๊ฐ’์„ ๊ณ„์‚ฐํ•˜๊ณ  ๊ทธ ๊ฐ’์„ ๋ฐ˜ํ™˜ 
    • Supplier๋ฅผ ์ œ๊ณตํ•˜์—ฌ CompletableFuture ์ƒ์„ฑ ๊ฐ€๋Šฅ
    • Supplier์˜ ๋ฐ˜ํ™˜๊ฐ’์ด CompletableFuture์˜ ๊ฒฐ๊ณผ๋กœ

 

  • runAsync
    • ๋น„๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰๋˜์ง€๋งŒ ๋ฐ˜ํ™˜๊ฐ’์ด ์—†๋Š” ์ž‘์—… ์‹คํ–‰ 
    • Runnable ์ œ๊ณตํ•˜์—ฌ CompletableFuture ์ƒ์„ฑ ๊ฐ€๋Šฅ
    • ๋‹ค์Œ task์— null ์ „๋‹ฌ๋จ 
๐Ÿ”ท Supplier ์ธํ„ฐํŽ˜์ด์Šค 
@FunctionalInterface
public interface Supplier<T> {
    T get();
}โ€‹

 

๋งค๊ฐœ๋ณ€์ˆ˜ ์—†์ด ์–ด๋–ค ๊ฐ’์„ ์ œ๊ณตํ•˜๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ์ •์˜ํ•˜๋Š” ํ•จ์ˆ˜ํ˜• ์ธํ„ฐํŽ˜์ด์Šค
๐Ÿ‘‰ ๊ฐ’์„ ์ƒ์„ฑํ•˜๊ฑฐ๋‚˜ ๊ณ„์‚ฐํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋จ

 

๐Ÿ”ท Runnable ์ธํ„ฐํŽ˜์ด์Šค
@FunctionalInterface
public interface Runnable {
    void run();
}โ€‹

 

๋งค๊ฐœ๋ณ€์ˆ˜๊ฐ€ ์—†๊ณ  ๋ฐ˜ํ™˜๊ฐ’์ด ์—†๋Š” ์ž‘์—…์„ ์ •์˜ํ•˜๋Š” ํ•จ์ˆ˜ํ˜• ์ธํ„ฐํŽ˜์ด์Šค
๐Ÿ‘‰ ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋จ 

 

  • complete
    • complete์— ์˜ํ•ด์„œ ์ƒํƒœ๊ฐ€ ๋ฐ”๋€Œ์—ˆ๋‹ค๋ฉด true, ์•„๋‹ˆ๋ผ๋ฉด false ๋ฐ˜ํ™˜

 

  • isCompletedExceptionally
    • CompletableFuture ๊ฐ์ฒด๊ฐ€ ์™„๋ฃŒ๋œ ์ƒํƒœ
    • exception์— ์˜ํ•ด์„œ complete ๋˜์—ˆ๋Š”์ง€ ํ™•์ธ 
    • ์™„๋ฃŒ๋œ ๊ฒฐ๊ณผ๊ฐ€ ์˜ˆ์™ธ๋ฅผ ๋˜์ง€๋ฉด true ๋ฐ˜ํ™˜, ์•„๋‹ˆ๋ฉด false ๋ฐ˜ํ™˜
    • ์˜ˆ์™ธ๋ฅผ ๋˜์ง€์ง€ ์•Š๊ณ  ์ •์ƒ์ ์œผ๋กœ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ–ˆ์„ ๊ฒฝ์šฐ false ๋ฐ˜ํ™˜
    • ๋น„๋™๊ธฐ ์ž‘์—…์ด ์˜ˆ์™ธ๋ฅผ ๋˜์ง€๋Š”์ง€ ์—ฌ๋ถ€ ํ™•์ธํ•  ๋•Œ ์œ ์šฉํ•˜๊ฒŒ ์‚ฌ์šฉ๋จ 

 

  • allOf
    • ์—ฌ๋Ÿฌ completableFuture๋ฅผ ๋ชจ์•„์„œ ํ•˜๋‚˜๋กœ ๋ณ€ํ™˜
    • ๋ชจ๋“  completableFuture๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ์ƒํƒœ๊ฐ€ done์œผ๋กœ ๋ณ€๊ฒฝ
    • void ๋ฐ˜ํ™˜ํ•˜๋ฏ€๋กœ ๊ฐ๊ฐ์˜ ๊ฐ’์— get์œผ๋กœ ์ ‘๊ทผ

 

  • anyOf
    • ์—ฌ๋Ÿฌ completableFuture๋ฅผ ๋ชจ์•„์„œ ํ•˜๋‚˜๋กœ ๋ณ€ํ™˜
    • ์ฃผ์–ด์ง„ future ์ค‘ ํ•˜๋‚˜๋ผ๋„ ์™„๋ฃŒ๋˜๋ฉด ์ƒํƒœ๊ฐ€ done์œผ๋กœ ๋ณ€๊ฒฝ
    • ์ œ์ผ ๋จผ์ € done ์ƒํƒœ๊ฐ€ ๋˜๋Š” future์˜ ๊ฐ’ ๋ฐ˜ํ™˜

๐Ÿ”ถ ํ•œ๊ณ„

  • ์ง€์—ฐ ๋กœ๋”ฉ ๊ธฐ๋Šฅ ์ œ๊ณต X
  • ์ง€์†์ ์œผ๋กœ ์ƒ์„ฑ๋˜๋Š” ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์–ด๋ ค์›€

 

 

 

 

 

์ฐธ๊ณ ์ž๋ฃŒ 

https://www.baeldung.com/java-completablefuture

https://11st-tech.github.io/2024/01/04/completablefuture/

https://www.youtube.com/watch?v=oEIoqGd-Sns

https://dkswnkk.tistory.com/733