java / / 2023. 11. 6. 15:25

Future와 CompletableFuture

1. Future

Future는 비동기 연산의 결과를 나타낸다. 연산작업이 완료되었는지, 대기하는지를 체크하는 메소드들이 있다. 응답 결과는 연산이 완료되었을 때 get 메소드를 사용하여 조회될 수 있고 필요하다면 블로킹을 할 수도 있다. 작업 취소는 cancel 메소드를 통해 수행될 수 있다. 작업이 정상적으로 완료되었는지? 취소되었는지 체크하는 메소드들도 제공된다. 연산 작업이 완료되면 취소는 될 수 없다.

아래에서 기본적인 Future의 동작 샘플 코드를 한번 보자.

1.1 async - 비동기 실행

1.2 isDone - 작업 완료상태

1.3 cancel - 작업을 취소

1.4 invokeAll - 모든 작업 완료

1.5 invokeAny - 하나의 작업 완료

1.1 async

@Test
public void future() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    executorService.execute(() -> log.info("작업 실행")); // 새로운 Thread로 실행
    log.info("future started");
    log.info("future ended");

    executorService.shutdown();
}
실행 결과
19:23:24.206 [main] INFO com.example.future.FutureSampleTest - future started
19:23:24.208 [main] INFO com.example.future.FutureSampleTest - future ended
19:23:24.206 [pool-1-thread-1] INFO com.example.future.FutureSampleTest - 작업 실행

executorService로 특정 작업을 수행한다. 이는 main 쓰레드가 아닌 다른 쓰레드(pool-1-thread-1)로 실행이 된다. ExecutorService에서 생성한 쓰레드이다. 그래서 main 쓰레드가 아니기 때문에 기본적으로 non-blocking으로 실행되는 것을 알 수 있다.

위의 코드는 두 개의 쓰레드로 실행되기 때문에 작업이 순차대로 수행되는 것을 보장하지 않는다. 즉 main 쓰레드 보다 pool-1-thread-1 쓰레드가 먼저 실행될 수 있다는 의미이다. (실행 결과에서는 main 쓰레드가 우선 실행됨)

1.2. isDone : 작업의 완료상태

@Test
public void isDone() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> request = () -> { // 새로운 Thread로 실행
        Thread.sleep(1000L);
        log.info("작업 실행");
        return "홍길동";
    };

    Future<String> stringFuture = executorService.submit(request);
    log.info("future started");

    stringFuture.get(); // request 작업이 완료될 때까지 대기 (main Thread 대기)

    log.info("future isDone: {}", stringFuture.isDone()); // true
    log.info("future ended");

    executorService.shutdown();
}
실행결과
07:31:58.851 [main] INFO com.example.future.FutureSampleTest - future started
07:31:59.852 [pool-1-thread-1] INFO com.example.future.FutureSampleTest - 작업 실행
07:31:59.852 [main] INFO com.example.future.FutureSampleTest - future isDone: true
07:31:59.853 [main] INFO com.example.future.FutureSampleTest - future ended

executorService에서 응답값을 가져오기 위해 Callable을 사용해서 호출하였다. 이때 결과값은 Future로 리턴받는데 작업이 진행될 때 새로운 쓰레드(pool-1-thread-1)로 실행되는 것을 알 수 있다.

그리고 해당 작업에 1초 대기를 뒀는데 main 쓰레드가 대기하는 것을 알 수 있다.

stringFuture.isDone()은 Future 작업이 완료된 상태를 나타낸다. 완료되었으면 true, 그렇지 않으면 false를 반환한다.

1.3. cancel : 작업을 취소

@Test
public void cancel() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> request = () -> {
        Thread.sleep(1000L);
        return "홍길동";
    };

    Future<String> stringFuture = executorService.submit(request);
    log.info("future started");
    log.info("future isDone: {}", stringFuture.isDone());

    boolean cancel = stringFuture.cancel(true);
    if (cancel) {
        log.info("작업이 취소");
    } else {
        log.info("작업이 취소 안됨");
    }

    log.info("future isDone: {}", stringFuture.isDone());
    log.info("future isCancelled: {}", stringFuture.isCancelled());

    stringFuture.get();

    log.info("future ended");

    executorService.shutdown();
}
실행결과
07:40:02.590 [main] INFO com.example.future.FutureSampleTest - future started
07:40:02.591 [main] INFO com.example.future.FutureSampleTest - future isDone: false
07:40:02.592 [main] INFO com.example.future.FutureSampleTest - 작업이 취소
07:40:02.592 [main] INFO com.example.future.FutureSampleTest - future isDone: true
07:40:02.592 [main] INFO com.example.future.FutureSampleTest - future isCancelled: true

java.util.concurrent.CancellationException

cancel은 작업을 취소하려고 시도한다. 작업이 이미 완료되었거나, 취소되었거나, 혹은 다른 이유로 인해 취소될 수 없다면 취소작업은 실패할 것이다. cancel이 호출될 때 작업이 시작되지 않았거나, 성공한다면 이 작업은 수행되지 않을 것이다. 작업이 이미 시작되었다면, mayInterruptIfRunning 파라미터가 작업을 중지하려는 시도에서 쓰레드가 인터럽트 될 지 결정한다.

메소드가 리턴된 이후 isDone() 호출은 항상 true가 된다. 그 다음 메소드 리턴값이 true이라면 isCancelled()는 항상 true이다.

1.4. invokeAll : 모든 작업 완료

@Test
public void invokeAll() throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    Callable<String> request1 = () -> {
        Thread.sleep(1000L);
        log.info("request1 실행");
        return "홍길동1";
    };

    Callable<String> request2 = () -> {
        Thread.sleep(2000L);
        log.info("request2 실행");
        return "홍길동2";
    };

    Callable<String> request3 = () -> {
        Thread.sleep(3000L);
        log.info("request3 실행");
        return "홍길동3";
    };

    log.info("future start");

    List<Future<String>> futures = executorService.invokeAll(Arrays.asList(request1, request2, request3));
    for (Future<String> future : futures) {
        log.info("future response : {}", future.get());
    }

    executorService.shutdown();
}
실행결과
07:58:58.004 [main] INFO com.example.future.FutureSampleTest - future start
07:58:59.006 [pool-1-thread-1] INFO com.example.future.FutureSampleTest - request1 실행
07:59:00.011 [pool-1-thread-2] INFO com.example.future.FutureSampleTest - request2 실행
07:59:01.006 [pool-1-thread-3] INFO com.example.future.FutureSampleTest - request3 실행
07:59:01.007 [main] INFO com.example.future.FutureSampleTest - future response : 홍길동1
07:59:01.009 [main] INFO com.example.future.FutureSampleTest - future response : 홍길동2
07:59:01.009 [main] INFO com.example.future.FutureSampleTest - future response : 홍길동3

invokeAll은 여러 작업 수행 시 모든 Future의 응답이 리턴될 때 완료된다.

위의 3개의 작업이 한 번에 실행된다. 각 작업이 실행시간이 1초, 2초, 3초로 되어있는데 각 작업이 순차대로 실행이 되고 모든 작업이 끝나면 동시에 응답이 온다. 그래서 응답까지 걸리는 시간은 총 3초이다. (세번째 작업이 완료되면 응답이 옴)

또한 실행 시 Executors.newFixedThreadPool(3)으로 3개의 작업을 수행할 수 있도록 3개의 쓰레드를 만들었다. 이럴 경우 3개의 작업이 동시에 수행이 되지만 Executors.newSingleThreadExecutor()으로 한 개의 쓰레드로 실행하면 총 6초가 걸린다. 작업이 하나의 쓰레드로 수행이 되니깐 순차대로 수행이 되기 때문이다.

1.5. invokeAny : 하나의 작업 완료

@Test
public void invokeAny() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    Callable<String> request1 = () -> {
        Thread.sleep(1000L);
        log.info("request1 실행");
        return "홍길동1";
    };

    Callable<String> request2 = () -> {
        Thread.sleep(2000L);
        log.info("request2 실행");
        return "홍길동2";
    };

    Callable<String> request3 = () -> {
        Thread.sleep(3000L);
        log.info("request3 실행");
        return "홍길동3";
    };

    log.info("future start");

    String response = executorService.invokeAny(Arrays.asList(request1, request2, request3));
    log.info("future response : {}", response);

    executorService.shutdown();
}
실행결과
07:50:59.177 [main] INFO com.example.future.FutureSampleTest - future start
07:51:00.183 [pool-1-thread-1] INFO com.example.future.FutureSampleTest - request1 실행
07:51:00.185 [main] INFO com.example.future.FutureSampleTest - future response : 홍길동1

invokeAny는 여러 작업 수행 시 하나의 결과라도 먼저 리턴되면 완료된다.

3개의 작업 중 가장 빠르게 실행되는 작업(request1)이 완료되면 작업은 완료된다. 여기서는 request1이 1초가 걸리므로 request1의 응답이 오면 작업이 종료된다.




2. CompletableFuture

CompletableFutureFuture 인터페이스에 CompletionStage 인터페이스를 구현한다. 이 인터페이스는 다른 작업과 조합할 수 있는 비동기 연산을 정의한다.

2.1 async

2.2 runAsync

2.3 supplyAsync

2.4 ExecutorService 사용

2.5 thenApply

2.6 thenAccept

2.7 thenRun

2.8 callback ExecutorService

2.9 thenCompose

2.10 thenCombine

2.11 allOf

2.12 anyOf

2.13 exceptionally

2.14 handleException

2.1 기본 실행

@Test
public void async() throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    log.info("future start");
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(1000);
        completableFuture.complete("홍길동");
        log.info("future complete");
        return null;
    });

    String response = completableFuture.get();
    log.info("future response: {}", response);
}
실행결과
15:09:32.599 [main] INFO com.example.future.CompletableFutureSampleTest - future start
15:09:33.605 [pool-1-thread-1] INFO com.example.future.CompletableFutureSampleTest - future complete
15:09:33.605 [main] INFO com.example.future.CompletableFutureSampleTest - future response: 홍길동

2.2 runAsync

@Test
public void runAsync() throws ExecutionException, InterruptedException {
    log.info("future start");
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("작업 실행");
    });

    future.get();
}
실행결과
15:10:22.078 [main] INFO com.example.future.CompletableFutureSampleTest - future start
15:10:23.085 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행

runAsync는 비동기로 작업을 실행하고 응답값이 없다. 실행결과를 보면 다른 쓰레드(ForkJoinPool.commonPool-worker-9)로 작업이 실행된 것을 확인할 수 있다.

2.3 supplyAsync

@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
    log.info("future start");
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("작업 실행");
        return "홍길동";
    });
    String response = future.get();
    log.info("future response: {}", response);
}
실행결과
15:11:01.024 [main] INFO com.example.future.CompletableFutureSampleTest - future start
15:11:02.032 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
15:11:02.033 [main] INFO com.example.future.CompletableFutureSampleTest - future response: 홍길동

supplyAsync는 비동기로 작업을 실행하고 응답값을 받는다. 실행결과를 보면 다른 쓰레드(ForkJoinPool.commonPool-worker-9)로 작업이 실행된 것을 확인할 수 있다.

2.4 ExecutorService 사용

@Test
public void executorServiceSupplyAsync() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(1);

    log.info("future start");
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("작업 실행");
        return "홍길동";
    }, executorService).thenRun(() -> {
        log.info("future action");
    });

    future.get();
    executorService.shutdown();
}
실행결과
20:08:20.109 [main] INFO com.example.future.CompletableFutureSampleTest - future start
20:08:21.155 [pool-1-thread-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
20:08:21.155 [pool-1-thread-1] INFO com.example.future.CompletableFutureSampleTest - future action

ExecutorService를 사용하여 작업을 실행할 수 있다. 기본은 ForkJoinPool.commonPool()으로 실행이 된다.

2.5 thenApply

@Test
public void thenApply() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        log.info("작업 실행");
        return "Gil Dong Hong";
    }).thenApply(fn -> {
        log.info("future fn: {}", fn);
        return fn.toUpperCase();
    });
    log.info("future response: {}", future.get());
}
실행결과
15:12:25.301 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
15:12:25.302 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - future fn: Gil Dong Hong
15:12:25.302 [main] INFO com.example.future.CompletableFutureSampleTest - future response: GIL DONG HONG

thenApply는 리턴값을 받아 특정 타입으로 변경을 할 수 있다. 여기서는 Gil Dong Hong을 응답값으로 받아서 대문자로 변환했다.

2.6 thenAccept

@Test
public void thenAccept() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        log.info("작업 실행");
        return "홍길동";
    }).thenAccept(fn -> log.info("future fn: {}", fn.toUpperCase()));

    future.get();
}
실행결과
15:12:50.455 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
15:12:50.456 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - future fn: 홍길동

thenAccept는 값을 받아서 응답값을 리턴하지 않고 특정 작업을 처리한다.

2.7 thenRun

@Test
public void thenRun() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        log.info("작업 실행");
        return "홍길동";
    }).thenRun(() -> log.info("future action"));

    future.get();
}
실행결과
15:13:02.792 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
15:13:02.793 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - future action

thenRun은 응답값으로 다른 Runnable을 실행한다.

2.8 callback ExecutorService

@Test
public void callbackExecutorService() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(1);

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        log.info("작업 실행");
        return "홍길동";
    }, executorService).thenRun(() -> log.info("future action"));

    future.get();
    executorService.shutdown();
}
실행결과
15:13:17.988 [pool-1-thread-1] INFO com.example.future.CompletableFutureSampleTest - 작업 실행
15:13:17.989 [pool-1-thread-1] INFO com.example.future.CompletableFutureSampleTest - future action

supplyAsync에서 ExecutorService를 지정하여 콜백을 다른 쓰레드로 실행하게 할 수 있다.

2.9 thenCompose

@Test
public void thenComposeCallAnotherFuture() throws ExecutionException, InterruptedException {
    CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> {
        log.info("작업1 실행");
        return "홍길동";
    });

    CompletableFuture<String> future = request.thenCompose(CompletableFutureSampleTest::call);
    log.info("future response: {}", future.get());
}

private static CompletableFuture<String> call(String message) {
    return CompletableFuture.supplyAsync(() -> {
        log.info("작업2 실행");
        return "작업1 응답: " + message + ", 작업2 응답";
    });
}
실행결과
15:13:38.356 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 작업1 실행
15:13:38.358 [ForkJoinPool.commonPool-worker-2] INFO com.example.future.CompletableFutureSampleTest - 작업2 실행
15:13:38.358 [main] INFO com.example.future.CompletableFutureSampleTest - future response: 작업1 응답: 홍길동, 작업2 응답

thenCompose는 두 작업을 이어서 실행할 수 있도록 한다.

2.10 thenCombine

@Test
public void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> {
        log.info("request 작업 실행");
        return "홍길동";
    });

    CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
        log.info("request2 작업 실행");
        return "이승엽";
    });

    CompletableFuture<String> future = request.thenCombine(request2, (h, w) -> h + " " + w);
    String response = future.get();
    log.info("future response: " + response);
}
실행결과
15:13:52.066 [ForkJoinPool.commonPool-worker-2] INFO com.example.future.CompletableFutureSampleTest - request2 작업 실행
15:13:52.066 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - request 작업 실행
15:13:52.069 [main] INFO com.example.future.CompletableFutureSampleTest - future response: [홍길동, 이승엽]

thenCombine는 두 작업을 독립적으로 실행하고 모두 응답이 완료되었을 때 콜백이 호출된다.

2.11 allOf

@Test
public void allOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> {
        log.info("request1 작업 실행");
        return "홍길동";
    });

    CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
        log.info("request2 작업 실행");
        return "이승엽";
    });

    List<CompletableFuture> futures = Arrays.asList(request, request2);

    CompletableFuture<Object> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .thenApply(v -> {
                return futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());
            });

    log.info("future response: " + future.get());
}
실행결과
15:15:37.190 [ForkJoinPool.commonPool-worker-2] INFO com.example.future.CompletableFutureSampleTest - request2 작업 실행
15:15:37.190 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - request1 작업 실행
15:15:37.192 [main] INFO com.example.future.CompletableFutureSampleTest - future response: [홍길동, 이승엽]

allOf는 여러 작업을 모두 실행하고 모든 작업 결과에 콜백을 실행한다.

2.12 anyOf

@Test
public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> request1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("request1 작업 실행");
        return "홍길동";
    });

    CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("request2 작업 실행");
        return "이승엽";
    });

    CompletableFuture<Void> future = CompletableFuture.anyOf(request1, request2)
            .thenAccept(action -> log.info(action+""));
    future.get();
}
실행결과
15:15:52.581 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - request1 작업 실행
15:15:52.583 [ForkJoinPool.commonPool-worker-1] INFO com.example.future.CompletableFutureSampleTest - 홍길동

anyOf는 여러 개의 작업 중에 가장 빨리 끝난 작업의 응답을 콜백으로 실행한다.

2.13 exceptionally

@Test
public void exceptionally() throws ExecutionException, InterruptedException {
    boolean throwError = true;

    CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> {
        if (throwError) {
            throw new IllegalArgumentException();
        }

        log.info("작업 실행");
        return "홍길동";
    }).exceptionally(ex -> "Error!");

    log.info("future response: {}", request.get());
}
실행결과
15:16:33.432 [main] INFO com.example.future.CompletableFutureSampleTest - future response: Error!

Future 작업에 예외가 발생하면 예외 결과를 리턴한다.

2.14 handleException

@Test
public void handleException() throws ExecutionException, InterruptedException {
    boolean throwError = true;

    CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> {
        if (throwError) {
            throw new IllegalArgumentException();
        }

        log.info("작업 실행");
        return "홍길동";
    }).handle((result, ex) -> {
        if (ex != null) {
            log.info(ex.getMessage());
            return "ERROR!";
        }
        return result;
    });

    log.info("future response: {}", request.get());
}
실행결과
15:17:52.490 [main] INFO com.example.future.CompletableFutureSampleTest - java.lang.IllegalArgumentException
15:17:52.491 [main] INFO com.example.future.CompletableFutureSampleTest - future response: ERROR!

예외를 직접 처리한다.

참고

https://www.baeldung.com/java-future

https://young-bin.tistory.com/66

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유