[Java] CompletableFuture ๋น๋๊ธฐ ์์
xml ํ์์ ์์ธ ์ค์๊ฐ ๋์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ํ์ฑํ๊ณ DB์ ์ ์ฅํ๋ ๋ฐ๊น์ง 442์ด(7๋ถ 22์ด) ์์๋๋ค.
ํ ์ง์ญ์ ๋ฐ์ดํฐ๋ฅผ ํ์ฑํ๋ ๋ฐ 3~10์ด ๊ฐ๋ ์์๋๊ณ , ์ด 115๊ณณ์ ๊ฐ์ ธ์์ผํ๋ฏ๋ก ์ฝ 7-8๋ถ ์์๋๋ ๊ฒ์ด์๋ค.
์๋ ๊ฐ์ ธ์์ผํ ๋ฐ์ดํฐ๊ฐ ๋ง์ ์ ์ด์ ์ฃผ์์ฐฝ์ xml์ด ์กด์ฌํ๋ url์ ์์ฒญํ์ ๋ 3~5์ด ์ ๋ ์ง์ฐ๋จ์ด ๋ฐ์ํ๊ธด ํ๋ค.
๊ฝค ์ค๋ ์๊ฐ์ด ๊ฑธ๋ ค ์ฑ๋ฅ์ ๊ฐ์ ์ํค๊ธฐ ์ํด ์ฌ๋ฌ ๋ฐฉ์๋ค์ ์๋ํด๋ดค๋ค...
CompletableFuture ๋?
Java 8์์ ์ถ๊ฐ๋ ํด๋์ค๋ก ๋น๋๊ธฐ ์ฐ์ฐ์ ์ฒ๋ฆฌํ๊ณ ์๋ฃ๋ ๊ฒฐ๊ณผ๋ ์์ธ๋ฅผ ์ฒ๋ฆฌํ๋ค.
๊ธฐ์กด์ ์๋ Java5์ Future ์ธํฐํ์ด์ค์ ํ๊ณ์ ์ ๊ฐ์ ์ํจ ๊ฒ์ด๋ค.
๐น Future๋ ?
๐ ๋น๋๊ธฐ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ ๋ํ๋ด๋ ๋ฐ ์ฌ์ฉ๋๋ฉฐ ์๋ฃ ์ํ๋ฅผ ํ์ธํ๊ฑฐ๋ ์์ ๊ฒฐ๊ณผ๋ฅผ ์ป๋ ๊ธฐ๋ฅ๋ง ์ ๊ณต
๐น Future์ ํ๊ณ์
- ์ธ๋ถ์์ ์๋ฃ์ํฌ ์ ์์ โ get()์ ํ์์์ ์ค์ ์ผ๋ก๋ง ์๋ฃ ๊ฐ๋ฅ
- ๋ธ๋กํน ๋ฉ์๋ ํธ์ถ : get() ๋ฉ์๋ ํธ์ถ ์ ์์ ์ด ์๋ฃ๋ ๋๊น์ง ๋๊ธฐ
- ์์ ์กฐํฉ์ ์ด๋ ค์
- ์์ธ ์ฒ๋ฆฌ ํ๊ณ
Future ์ธํฐํ์ด์ค๋ฅผ ํ์ฅํ์ฌ ์ธ๋ถ์์ ์๋ฃ์ํฌ ์ ์์ผ๋ฉฐ ์์ ์ฝ๋ฐฑ, ์์ธ ์ฒ๋ฆฌ, ๋น๋๊ธฐ์ ์ผ๋ก ์ฌ๋ฌ ์์ ์ ์กฐํฉํ๋ ๊ธฐ๋ฅ ๋ฑ์ ์ ๊ณตํ์ฌ ๋ณด๋ค ํจ์จ์ ์ธ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ์ง์ํ๋ค.
๐น ๋น๋๊ธฐ ์์ ์คํ ๋ฉ์๋
- runAsync : ๋ฐํ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// ๋น๋๊ธฐ ์์
์ํ
});
- supplyAsync : ๋ฐํ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// ๋น๋๊ธฐ ์์
์ํ ํ ๊ฒฐ๊ณผ ๋ฐํ
return "์์
๊ฒฐ๊ณผ";
});
๐น ์์ ์ฝ๋ฐฑ
- thenApply : ๋น๋๊ธฐ ์์ ์ด ์๋ฃ๋ ํ์ ํด๋น ๊ฒฐ๊ณผ๋ฅผ ๊ฐ๊ณตํ๊ฑฐ๋ ๋ณํ
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(result -> result + " World");
- thenAccept : ๋น๋๊ธฐ ์์ ์ด ์๋ฃ๋ ํ์ ํด๋น ๊ฒฐ๊ณผ๋ฅผ ์๋น
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(result -> System.out.println("Result: " + result));
- thenRun : ๋น๋๊ธฐ ์์ ์ด ์๋ฃ๋ ํ์ ์ถ๊ฐ์ ์ธ ์์ ์ํ
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Additional task after completion"));
๐น ์์ ์กฐํฉ
- thenCompose : ์ด์ ์์ ์ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ ๋ค์ ์์ ์ํํ๋๋ฐ, ๋ค์ ์์ ์ ์ด์ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฅ์ผ๋ก ๋ฐ์
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));
future.thenAccept(System.out::println);
}
}
- thenCombine : ๋ ๊ฐ์ CompletableFuture๊ฐ ๋ชจ๋ ์๋ฃ๋ ํ์ ๋ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ ์กฐํฉํ์ฌ ์๋ก์ด ๊ฒฐ๊ณผ ์์ฑ
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
combinedFuture.thenAccept(System.out::println);
}
}
- allOf : ์ฌ๋ฌ ๊ฐ์ CompletableFuture๊ฐ ๋ชจ๋ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ ธ๋ค๊ฐ ๋ชจ๋ ์์ ์ด ์๋ฃ๋๋ฉด ์๋ก์ด CompletableFuture ๋ฐํ
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
// ๋ชจ๋ CompletableFuture๊ฐ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ ํ์ ์คํ
allOfFuture.get();
System.out.println("All futures completed");
}
}
- anyOf : ์ฌ๋ฌ ๊ฐ์ CompletableFuture ์ค์์ ํ๋๋ผ๋ ์๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ ํ์ ํด๋น ์์ ์ ๊ฒฐ๊ณผ ๋ฐํ
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
// ๊ฐ์ฅ ๋จผ์ ์๋ฃ๋ CompletableFuture์ ๊ฒฐ๊ณผ ์ถ๋ ฅ
System.out.println(anyOfFuture.get());
}
}
CompletableFuture์ non-blocking๊ณผ blocking
CompletableFuture์ ์ฅ์ ์ ๋ค์ํ task๋ฅผ blockingํ์ง ์๊ณ ์กฐํฉํ ์ ์๋ค.
์กฐํฉ ์ธ์๋ supplyAsync(), thenApply(), thenCompose() ๋ฑ์ ํตํด์ non-blocking์ผ๋ก ์ํํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
๊ทธ๋ ๋ค๋ฉด blocking์ ์ธ์ ๋ฐ์ํ๋ ๊ฒ์ผ๊น?
๊ฒฐ๋ก ๋ถํฐ ๋งํ๋ฉด,
- get()
- join()
๋ฑ์ ๋ฉ์๋๋ฅผ ์ํํ ๋์ด๋ค.
get(), join() ๋ฉ์๋๋ CompletableFuture๊ฐ ์๋ฃ๋ ๋๊น์ง ํธ์ถํ ์ค๋ ๋๊ฐ ๋ธ๋กํน๋์ด ์์ ์ด ์๋ฃ๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฐ๋ค.
CompletableFuture - 3๋ถ ์์
์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ๋๋ ค๋ณด๋ 3๋ถ ๊ฐ๋ ์์๋๋ค.
@RequiredArgsConstructor
@Service
@Slf4j
public class ApiScheduler {
private final AreaRepository areaRepository;
private final CityDataRepository cityDataRepository;
@Value("${seoul.open.api.url}")
private StringBuilder url;
@Transactional
@Scheduled(cron = "* * * * * *")
public void call() {
List<Area> areas = areaRepository.findAll();
List<CompletableFuture<CityData>> futures = areas.stream()
.map(this::fetchCityDataAsync)
.collect(Collectors.toList());
List<CityData> cityDataList = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
cityDataRepository.deleteAll();
cityDataRepository.saveAll(cityDataList);
}
private CompletableFuture<CityData> fetchCityDataAsync(Area area) {
return CompletableFuture.supplyAsync(() -> {
try {
log.info("areaId: {} areaName: {}", area.getAreaId(), area.getAreaName());
String apiUrl = url + ("/" + urlEncoding(area.getAreaName()));
Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(apiUrl);
return parseCityData(document);
} catch (SAXException | IOException | ParserConfigurationException e) {
log.error("error fetching citydata for areaname {}", area.getAreaName(), e);
return null;
}
});
}
private CityData parseCityData(Document document) {
return CityData cityData = CityData.builder()
.areaName(getElement(documentInfo, "AREA_NM"))
.areaCongestionLevel(getElement(documentInfo, "AREA_CONGEST_LVL"))
.areaCongestionMessage(getElement(documentInfo, "AREA_CONGEST_MSG"))
.pplUpdateTime(getElement(documentInfo, "PPLTN_TIME"))
.forecastPopulation(getElement(documentInfo, "FCST_PPLTN"))
.forecastCongestionLevel(getElement(documentInfo, "FCST_CONGEST_LVL"))
.temperature(getElement(documentInfo, "TEMP"))
.maxTemperature(getElement(documentInfo, "MAX_TEMP"))
.minTemperature(getElement(documentInfo, "MIN_TEMP"))
.pm25Index(getElement(documentInfo, "PM25_INDEX"))
.pm10Index(getElement(documentInfo, "PM10_INDEX"))
.pcpMsg(getElement(documentInfo, "PCP_MSG"))
.weatherTime(getElement(documentInfo,"WEATHER_TIME"))
.culturalEventName(getElement(documentInfo, "EVENT_NM"))
.culturalEventPeriod(getElement(documentInfo, "EVENT_PERIOD"))
.culturalEventPlace(getElement(documentInfo, "EVENT_PLACE"))
.culturalEventUrl(getElement(documentInfo, "URL"))
.build();
}
private String getElementText(Document document, String tag) {
// return document.getElementsByTagName(tag).item(0).getTextContent();
// Document ๊ฐ์ฒด์์ ํ๊ทธ ์ด๋ฆ์ ํด๋นํ๋ ์์ ๊ฐ์ ธ์ค๊ธฐ
NodeList nodeList = document.getElementsByTagName(tag);
// ๊ฐ์ ธ์จ ์์๊ฐ ๋น์ด ์๋์ง ํ์ธ
if (nodeList.getLength() > 0) {
// ์ฒซ ๋ฒ์งธ๋ก ๋ฐ๊ฒฌ๋ ์์์ ํ
์คํธ ๋ด์ฉ์ ๋ฐํ
return nodeList.item(0).getTextContent();
} else {
// ํด๋น ํ๊ทธ๊ฐ ์์ ๊ฒฝ์ฐ
return "NO TAG";
}
}
}
CompletableFuture & ์ํฐํฐ ๋ถ๋ฅํ๊ธฐ - 60~80์ด ์์
๋ ์จ, ์ธ๊ตฌ, ๋ฌธํํ์ฌ ์ํฐํฐ๋ฅผ ๊ฐ๊ฐ ๋ถ๋ฅํ๊ณ , ํ์ํ ๋ฐ์ดํฐ๋ค์ ๊ฐ ์ํฐํฐ ํ๋๋ก ์์ฑํ๋ค.
์คํ์์ผ๋ณด๋ 60~80์ด ์ ๋ ์์๋์๋ค !
๐
์์งํ 60~80์ด ๊ฐ๋ ์์๋๋ ๊ฒ๋ ๋ง์ด ๊ฑธ๋ฆฌ๋ ๊ฒ์ด๋ผ ์๊ฐ๋๋ค.
7-8๋ถ ์์๋๋ ๊ฒ์ ๊ทธ๋๋ 1๋ถ๋๋ก ์ค์์ผ๋ ๋ฟ๋ฏํ๋ค๋ง....
์ฝ๋๊ฐ ํจ์ฌ ๊ธธ์ด์ก๊ณ , ์ค๋ณต๋๋ ๋ด์ฉ๋ ์์ด ์ฝ๋ ์์ฒด๋ก๋ ์ฉ ๋ง์์ ๋ค์ง ์๋๋ค.
WebClient๋ก API๋ฅผ ํธ์ถํ์ฌ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ ๋ฑ ๋ค๋ฅธ ํด๊ฒฐ์ฑ
์ ์ถํ์ ์ ์ฉ์์ผ ์ข๋ ์์๋ด์ผ๊ฒ ๋ค..๐
์ฐธ๊ณ ์๋ฃ
https://11st-tech.github.io/2024/01/04/completablefuture/#%EB%B3%91%EB%A0%AC%EC%B2%98%EB%A6%AC
https://mangkyu.tistory.com/263
https://www.baeldung.com/java-completablefuture
https://www.baeldung.com/java-completablefuture-non-blocking