Java

[Java] CompletableFuture ๋น„๋™๊ธฐ ์ž‘์—…

soogoori 2024. 5. 6. 02:46

xml ํ˜•์‹์˜ ์„œ์šธ ์‹ค์‹œ๊ฐ„ ๋„์‹œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€์„œ ํŒŒ์‹ฑํ•˜๊ณ  DB์— ์ €์žฅํ•˜๋Š” ๋ฐ๊นŒ์ง€ 442์ดˆ(7๋ถ„ 22์ดˆ) ์†Œ์š”๋๋‹ค.

ํ•œ ์ง€์—ญ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํŒŒ์‹ฑํ•˜๋Š” ๋ฐ 3~10์ดˆ ๊ฐ€๋Ÿ‰ ์†Œ์š”๋๊ณ , ์ด 115๊ณณ์„ ๊ฐ€์ ธ์™€์•ผํ•˜๋ฏ€๋กœ ์•ฝ 7-8๋ถ„ ์†Œ์š”๋˜๋Š” ๊ฒƒ์ด์—ˆ๋‹ค. 

์›Œ๋‚™ ๊ฐ€์ ธ์™€์•ผํ•  ๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์•„ ์• ์ดˆ์— ์ฃผ์†Œ์ฐฝ์— xml์ด ์กด์žฌํ•˜๋Š” url์„ ์š”์ฒญํ–ˆ์„ ๋•Œ 3~5์ดˆ ์ •๋„ ์ง€์—ฐ๋จ์ด ๋ฐœ์ƒํ•˜๊ธด ํ–ˆ๋‹ค. 

๊ฝค ์˜ค๋žœ ์‹œ๊ฐ„์ด ๊ฑธ๋ ค ์„ฑ๋Šฅ์„ ๊ฐœ์„ ์‹œํ‚ค๊ธฐ ์œ„ํ•ด ์—ฌ๋Ÿฌ ๋ฐฉ์‹๋“ค์„ ์‹œ๋„ํ•ด๋ดค๋‹ค...

 

CompletableFuture ๋ž€?

Java 8์—์„œ ์ถ”๊ฐ€๋œ ํด๋ž˜์Šค๋กœ ๋น„๋™๊ธฐ ์—ฐ์‚ฐ์„ ์ฒ˜๋ฆฌํ•˜๊ณ  ์™„๋ฃŒ๋œ ๊ฒฐ๊ณผ๋‚˜ ์˜ˆ์™ธ๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค. 

๊ธฐ์กด์— ์žˆ๋˜ Java5์˜ Future ์ธํ„ฐํŽ˜์ด์Šค์˜ ํ•œ๊ณ„์ ์„ ๊ฐœ์„ ์‹œํ‚จ ๊ฒƒ์ด๋‹ค. 

๋”๋ณด๊ธฐ

๐Ÿ”น Future๋ž€ ?

๐Ÿ‘‰ ๋น„๋™๊ธฐ ์ž‘์—…์˜ ๊ฒฐ๊ณผ๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋ฉฐ ์™„๋ฃŒ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜๊ฑฐ๋‚˜ ์ž‘์—… ๊ฒฐ๊ณผ๋ฅผ ์–ป๋Š” ๊ธฐ๋Šฅ๋งŒ ์ œ๊ณต

๐Ÿ”น Future์˜ ํ•œ๊ณ„์  

  • ์™ธ๋ถ€์—์„œ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์—†์Œ โž” get()์˜ ํƒ€์ž„์•„์›ƒ ์„ค์ •์œผ๋กœ๋งŒ ์™„๋ฃŒ ๊ฐ€๋Šฅ 
  • ๋ธ”๋กœํ‚น ๋ฉ”์†Œ๋“œ ํ˜ธ์ถœ : get() ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ ์‹œ ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • ์ž‘์—… ์กฐํ•ฉ์˜ ์–ด๋ ค์›€
  • ์˜ˆ์™ธ ์ฒ˜๋ฆฌ ํ•œ๊ณ„

Future ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ํ™•์žฅํ•˜์—ฌ ์™ธ๋ถ€์—์„œ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์œผ๋ฉฐ ์ž‘์—… ์ฝœ๋ฐฑ, ์˜ˆ์™ธ ์ฒ˜๋ฆฌ, ๋น„๋™๊ธฐ์ ์œผ๋กœ ์—ฌ๋Ÿฌ ์ž‘์—…์„ ์กฐํ•ฉํ•˜๋Š” ๊ธฐ๋Šฅ ๋“ฑ์„ ์ œ๊ณตํ•˜์—ฌ ๋ณด๋‹ค ํšจ์œจ์ ์ธ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ง€์›ํ•œ๋‹ค. 


๐Ÿ”น ๋น„๋™๊ธฐ ์ž‘์—… ์‹คํ–‰ ๋ฉ”์„œ๋“œ 

  • runAsync : ๋ฐ˜ํ™˜๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // ๋น„๋™๊ธฐ ์ž‘์—… ์ˆ˜ํ–‰
});
  • supplyAsync : ๋ฐ˜ํ™˜๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ 
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // ๋น„๋™๊ธฐ ์ž‘์—… ์ˆ˜ํ–‰ ํ›„ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜
    return "์ž‘์—… ๊ฒฐ๊ณผ";
});

 

๐Ÿ”น ์ž‘์—… ์ฝœ๋ฐฑ 

  • thenApply : ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„์— ํ•ด๋‹น ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€๊ณตํ•˜๊ฑฐ๋‚˜ ๋ณ€ํ™˜
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
        .thenApply(result -> result + " World");

 

  • thenAccept : ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„์— ํ•ด๋‹น ๊ฒฐ๊ณผ๋ฅผ ์†Œ๋น„
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
        .thenAccept(result -> System.out.println("Result: " + result));
  • thenRun : ๋น„๋™๊ธฐ ์ž‘์—…์ด ์™„๋ฃŒ๋œ ํ›„์— ์ถ”๊ฐ€์ ์ธ ์ž‘์—… ์ˆ˜ํ–‰
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
        .thenRun(() -> System.out.println("Additional task after completion"));

 

 

๐Ÿ”น ์ž‘์—… ์กฐํ•ฉ 

  • thenCompose : ์ด์ „ ์ž‘์—…์˜ ๊ฒฐ๊ณผ์— ๋”ฐ๋ผ ๋‹ค์Œ ์ž‘์—… ์ˆ˜ํ–‰ํ•˜๋Š”๋ฐ, ๋‹ค์Œ ์ž‘์—…์€ ์ด์ „ ์ž‘์—…์˜ ๊ฒฐ๊ณผ๋ฅผ ์ž…๋ ฅ์œผ๋กœ ๋ฐ›์Œ
public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));

        future.thenAccept(System.out::println);
    }
}
  • thenCombine : ๋‘ ๊ฐœ์˜ CompletableFuture๊ฐ€ ๋ชจ๋‘ ์™„๋ฃŒ๋œ ํ›„์— ๋‘ ์ž‘์—…์˜ ๊ฒฐ๊ณผ๋ฅผ ์กฐํ•ฉํ•˜์—ฌ ์ƒˆ๋กœ์šด ๊ฒฐ๊ณผ ์ƒ์„ฑ
public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);

        combinedFuture.thenAccept(System.out::println);
    }
}
  • allOf : ์—ฌ๋Ÿฌ ๊ฐœ์˜ CompletableFuture๊ฐ€ ๋ชจ๋‘ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ธ๋‹ค๊ฐ€ ๋ชจ๋“  ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ์ƒˆ๋กœ์šด CompletableFuture ๋ฐ˜ํ™˜
public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);

        // ๋ชจ๋“  CompletableFuture๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ ํ›„์— ์‹คํ–‰
        allOfFuture.get();

        System.out.println("All futures completed");
    }
}
  • anyOf : ์—ฌ๋Ÿฌ ๊ฐœ์˜ CompletableFuture ์ค‘์—์„œ ํ•˜๋‚˜๋ผ๋„ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ ํ›„์— ํ•ด๋‹น ์ž‘์—…์˜ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜
public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        // ๊ฐ€์žฅ ๋จผ์ € ์™„๋ฃŒ๋œ CompletableFuture์˜ ๊ฒฐ๊ณผ ์ถœ๋ ฅ
        System.out.println(anyOfFuture.get());
    }
}

 

 

 

CompletableFuture์˜ non-blocking๊ณผ blocking

CompletableFuture์˜ ์žฅ์ ์€ ๋‹ค์–‘ํ•œ task๋ฅผ blockingํ•˜์ง€ ์•Š๊ณ  ์กฐํ•ฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์กฐํ•ฉ ์™ธ์—๋„ supplyAsync(), thenApply(), thenCompose() ๋“ฑ์„ ํ†ตํ•ด์„œ non-blocking์œผ๋กœ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. 


๊ทธ๋ ‡๋‹ค๋ฉด blocking์€ ์–ธ์ œ ๋ฐœ์ƒํ•˜๋Š” ๊ฒƒ์ผ๊นŒ?

๊ฒฐ๋ก ๋ถ€ํ„ฐ ๋งํ•˜๋ฉด, 

  • get()
  • join()

๋“ฑ์˜ ๋ฉ”์„œ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•  ๋•Œ์ด๋‹ค. 

 

get(), join() ๋ฉ”์„œ๋“œ๋Š” CompletableFuture๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ํ˜ธ์ถœํ•œ ์Šค๋ ˆ๋“œ๊ฐ€ ๋ธ”๋กœํ‚น๋˜์–ด ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ธฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.

 

 

 

 

CompletableFuture - 3๋ถ„ ์†Œ์š”

์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๋ฅผ ๋Œ๋ ค๋ณด๋‹ˆ 3๋ถ„ ๊ฐ€๋Ÿ‰ ์†Œ์š”๋๋‹ค. 

@RequiredArgsConstructor
@Service
@Slf4j
public class ApiScheduler {

	private final AreaRepository areaRepository;
	private final CityDataRepository cityDataRepository;

	@Value("${seoul.open.api.url}")
	private StringBuilder url;

	@Transactional
	@Scheduled(cron = "* * * * * *")
	public void call() {
		List<Area> areas = areaRepository.findAll();
		List<CompletableFuture<CityData>> futures = areas.stream()
			.map(this::fetchCityDataAsync)
			.collect(Collectors.toList());
            
		List<CityData> cityDataList = futures.stream()
			.map(CompletableFuture::join)
			.collect(Collectors.toList());
            
		cityDataRepository.deleteAll();
		cityDataRepository.saveAll(cityDataList);
	}

	private CompletableFuture<CityData> fetchCityDataAsync(Area area) {
		return CompletableFuture.supplyAsync(() -> {
			try {
				log.info("areaId: {} areaName: {}", area.getAreaId(), area.getAreaName());
				String apiUrl = url + ("/" + urlEncoding(area.getAreaName()));
				Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(apiUrl);
				return parseCityData(document);
			} catch (SAXException | IOException | ParserConfigurationException e) {
				log.error("error fetching citydata for areaname {}", area.getAreaName(), e);
				return null;
			}
		});
	}

	private CityData parseCityData(Document document) {
		return CityData cityData = CityData.builder()
                .areaName(getElement(documentInfo, "AREA_NM"))
                .areaCongestionLevel(getElement(documentInfo, "AREA_CONGEST_LVL"))
                .areaCongestionMessage(getElement(documentInfo, "AREA_CONGEST_MSG"))
                .pplUpdateTime(getElement(documentInfo, "PPLTN_TIME"))
                .forecastPopulation(getElement(documentInfo, "FCST_PPLTN"))
                .forecastCongestionLevel(getElement(documentInfo, "FCST_CONGEST_LVL"))
                .temperature(getElement(documentInfo, "TEMP"))
                .maxTemperature(getElement(documentInfo, "MAX_TEMP"))
                .minTemperature(getElement(documentInfo, "MIN_TEMP"))
                .pm25Index(getElement(documentInfo, "PM25_INDEX"))
                .pm10Index(getElement(documentInfo, "PM10_INDEX"))
                .pcpMsg(getElement(documentInfo, "PCP_MSG"))
                .weatherTime(getElement(documentInfo,"WEATHER_TIME"))
                .culturalEventName(getElement(documentInfo, "EVENT_NM"))
                .culturalEventPeriod(getElement(documentInfo, "EVENT_PERIOD"))
                .culturalEventPlace(getElement(documentInfo, "EVENT_PLACE"))
                .culturalEventUrl(getElement(documentInfo, "URL"))
                .build();
	}

	private String getElementText(Document document, String tag) {
		// return document.getElementsByTagName(tag).item(0).getTextContent();
		// Document ๊ฐ์ฒด์—์„œ ํƒœ๊ทธ ์ด๋ฆ„์— ํ•ด๋‹นํ•˜๋Š” ์š”์†Œ ๊ฐ€์ ธ์˜ค๊ธฐ
		NodeList nodeList = document.getElementsByTagName(tag);

		// ๊ฐ€์ ธ์˜จ ์š”์†Œ๊ฐ€ ๋น„์–ด ์žˆ๋Š”์ง€ ํ™•์ธ
		if (nodeList.getLength() > 0) {
			// ์ฒซ ๋ฒˆ์งธ๋กœ ๋ฐœ๊ฒฌ๋œ ์š”์†Œ์˜ ํ…์ŠคํŠธ ๋‚ด์šฉ์„ ๋ฐ˜ํ™˜
			return nodeList.item(0).getTextContent();
		} else {
			// ํ•ด๋‹น ํƒœ๊ทธ๊ฐ€ ์—†์„ ๊ฒฝ์šฐ
			return "NO TAG";
		}
	}
}

 

 

 

 

CompletableFuture & ์—”ํ‹ฐํ‹ฐ ๋ถ„๋ฅ˜ํ•˜๊ธฐ - 60~80์ดˆ ์†Œ์š”

๋‚ ์”จ, ์ธ๊ตฌ, ๋ฌธํ™”ํ–‰์‚ฌ ์—”ํ‹ฐํ‹ฐ๋ฅผ ๊ฐ๊ฐ ๋ถ„๋ฅ˜ํ–ˆ๊ณ , ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋“ค์„ ๊ฐ ์—”ํ‹ฐํ‹ฐ ํ•„๋“œ๋กœ ์ƒ์„ฑํ–ˆ๋‹ค. 

์‹คํ–‰์‹œ์ผœ๋ณด๋‹ˆ 60~80์ดˆ ์ •๋„ ์†Œ์š”๋˜์—ˆ๋‹ค !

 

 

๐Ÿ˜‡

์†”์งํžˆ 60~80์ดˆ ๊ฐ€๋Ÿ‰ ์†Œ์š”๋˜๋Š” ๊ฒƒ๋„ ๋งŽ์ด ๊ฑธ๋ฆฌ๋Š” ๊ฒƒ์ด๋ผ ์ƒ๊ฐ๋œ๋‹ค. 

7-8๋ถ„ ์†Œ์š”๋˜๋Š” ๊ฒƒ์„ ๊ทธ๋ž˜๋„ 1๋ถ„๋Œ€๋กœ ์ค„์˜€์œผ๋‹ˆ ๋ฟŒ๋“ฏํ•˜๋‹ค๋งŒ....

์ฝ”๋“œ๊ฐ€ ํ›จ์”ฌ ๊ธธ์–ด์กŒ๊ณ , ์ค‘๋ณต๋˜๋Š” ๋‚ด์šฉ๋„ ์žˆ์–ด ์ฝ”๋“œ ์ž์ฒด๋กœ๋Š” ์ฉ ๋งˆ์Œ์— ๋“ค์ง€ ์•Š๋Š”๋‹ค. 

WebClient๋กœ API๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ• ๋“ฑ ๋‹ค๋ฅธ ํ•ด๊ฒฐ์ฑ…์„ ์ถ”ํ›„์— ์ ์šฉ์‹œ์ผœ ์ข€๋” ์•Œ์•„๋ด์•ผ๊ฒ ๋‹ค..๐Ÿ˜‡

 

 

 

 

 

์ฐธ๊ณ ์ž๋ฃŒ

https://11st-tech.github.io/2024/01/04/completablefuture/#%EB%B3%91%EB%A0%AC%EC%B2%98%EB%A6%AC

https://mangkyu.tistory.com/263

https://www.baeldung.com/java-completablefuture

https://www.baeldung.com/java-completablefuture-non-blocking