HelloKoding

Practical coding guides

CompletableFuture supplyAsync Tutorial with Examples

Since Java 8, you can use CompletableFuture.supplyAsync to run a task/method asynchronously, as in the following example

/**
 * Runs a supplier asynchronously and blocks on get() to read its result.
 */
@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
    // Parameterized rather than raw, so get() is typed as String instead of Object.
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello future!");

    assertThat(completableFuture.get()).isEqualTo("Hello future!");
}

CompletableFuture.supplyAsync asynchronously runs a Supplier functional interface, which is represented here by the lambda expression () -> "Hello future!"

The completableFuture.get() call blocks until the completableFuture is complete and then returns the result

Apart from calling get(), you can also pass the result on to further methods in a chain of callbacks

Let’s walk through this tutorial to see the examples in practice

Transform the result with thenApplyAsync callbacks

  • The thenApplyAsync takes and transforms the result returned from supplyAsync
/**
 * Transforms the value produced by supplyAsync with a thenApplyAsync callback.
 */
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> sentence = CompletableFuture
            .supplyAsync(() -> "Future")
            .thenApplyAsync(word -> word.concat(" is awesome!"));

    String actual = sentence.get();
    assertThat(actual).isEqualTo("Future is awesome!");
}
  • APIs in CompletableFuture may come in 3 forms: default synchronous execution, default asynchronous execution (names with the Async suffix) and asynchronous execution with a custom Executor. The default async form uses ForkJoinPool.commonPool() to execute. The following gives you an example of using a custom Executor
/**
 * Runs both the supplier and the transformation on a caller-supplied executor.
 */
@Test
public void thenApplyAsyncWithExecutor() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    try {
        CompletableFuture<String> completableFuture = CompletableFuture
                .supplyAsync(() -> "Future", executorService)
                .thenApplyAsync(s -> s.concat(" is awesome!"), executorService);

        // Wait for the result while the pool is still accepting work: the thenApplyAsync stage
        // is only handed to the executor once the supplier has finished, so stopping the pool
        // first can leave that stage rejected.
        assertThat(completableFuture.get()).isEqualTo("Future is awesome!");
    } finally {
        // stop(...) is a project helper; presumably it shuts the pool down - confirm.
        stop(executorService);
    }
}

Combine the results with thenComposeAsync and thenCombineAsync

  • The thenComposeAsync passes the result returned from one to another CompletableFuture
/**
 * Feeds the result of one future into a function that itself returns a future.
 */
@Test
public void thenComposeAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> subject = CompletableFuture.supplyAsync(() -> "Future");

    CompletableFuture<String> sentence = subject.thenComposeAsync(
            word -> CompletableFuture.supplyAsync(() -> word.concat(" is awesome!")));

    String actual = sentence.get();
    assertThat(actual).isEqualTo("Future is awesome!");
}
  • The thenCombineAsync combines the results of two independent CompletableFutures
/**
 * Merges the results of two independently started futures into one value.
 */
@Test
public void thenCombineAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> subject = CompletableFuture.supplyAsync(() -> "Future");
    CompletableFuture<String> predicate = CompletableFuture.supplyAsync(() -> " is awesome!");

    // The combining function receives the first future's value, then the second's.
    CompletableFuture<String> sentence = subject.thenCombineAsync(predicate, String::concat);

    String actual = sentence.get();
    assertThat(actual).isEqualTo("Future is awesome!");
}

Await completion of independent CompletableFutures

  • The allOf returns a new CompletableFuture<Void> that completes when all of the given CompletableFutures have completed
/**
 * Waits for three independent futures with allOf, then joins their results in order.
 */
@Test
public void allOf() {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "Future");
    CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> " is awesome!");
    CompletableFuture<String> third = CompletableFuture.supplyAsync(() -> "!");
    CompletableFuture<String>[] futures = new CompletableFuture[]{first, second, third};

    // The allOf future is typed Void; join() here only waits for every input to finish.
    CompletableFuture.allOf(futures).join();

    String joined = Arrays.stream(futures)
            .map(CompletableFuture::join)
            .collect(Collectors.joining());
    assertThat(joined).isEqualTo("Future is awesome!!");
}
  • The anyOf returns a new CompletableFuture<Object> that completes when any of the given CompletableFutures completes
/**
 * Waits for whichever of three futures finishes first and checks its value.
 */
@Test
public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Future");
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> " is awesome!");
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "!");

    CompletableFuture<Object> anyCf = CompletableFuture.anyOf(cf1, cf2, cf3);
    Object firstResult = anyCf.get();
    System.out.println(firstResult);

    assertThat(anyCf).isDone();
    // Which future wins is not deterministic, so only require that the value came from one of them.
    assertThat(firstResult).isIn("Future", " is awesome!", "!");
}

Complete the stage with thenAcceptAsync and thenRunAsync

  • The thenAcceptAsync accepts the result returned from this stage, passes it to the supplied Consumer function and returns a new CompletionStage<Void>
/**
 * Ends a callback chain with a Consumer; the resulting future carries no value.
 */
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> printed = CompletableFuture
            .supplyAsync(() -> "Future")
            .thenApplyAsync(word -> word.concat(" is awesome!"))
            .thenAcceptAsync(System.out::println);

    assertThat(printed.get()).isNull();
}
  • The thenRunAsync runs the supplied Runnable action when the stage completes normally and returns a new CompletionStage<Void>
/**
 * Ends a callback chain with a Runnable that ignores the upstream value.
 */
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");

    completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));
    CompletableFuture<Void> procedureFuture = completableFuture.thenRunAsync(() -> System.out.println("!"));

    // Block on get() first: the chain runs asynchronously, so checking isDone() before
    // waiting would race with the worker threads and fail intermittently.
    assertThat(procedureFuture.get()).isNull();
    assertThat(procedureFuture.isDone()).isTrue();
}

Conclusion

In this tutorial, we learned how to use CompletableFuture.supplyAsync to run a method asynchronously and how to attach thenApplyAsync, thenComposeAsync, thenCombineAsync, thenAcceptAsync and thenRunAsync to its callback chain. You can find the full source code below

SupplyAsyncTest.java

package com.hellokoding.java.concurrent;

import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static com.hellokoding.java.concurrent.ConcurrentUtils.stop;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Example tests for {@code CompletableFuture.supplyAsync} and the async callbacks that can be
 * chained onto it.
 */
public class SupplyAsyncTest {
    /** Runs a supplier asynchronously and blocks on {@code get()} to read its result. */
    @Test
    public void supplyAsync() throws ExecutionException, InterruptedException {
        // Parameterized rather than raw, so get() is typed as String instead of Object.
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello, Future!");

        assertThat(completableFuture.get()).isEqualTo("Hello, Future!");
    }

    /** Transforms the supplied value with a {@code thenApplyAsync} callback. */
    @Test
    public void thenApplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");

        completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));

        assertThat(completableFuture.get()).isEqualTo("Future is awesome!");
    }

    /** Runs both the supplier and the transformation on a caller-supplied executor. */
    @Test
    public void thenApplyAsyncWithExecutor() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        try {
            CompletableFuture<String> completableFuture = CompletableFuture
                    .supplyAsync(() -> "Future", executorService)
                    .thenApplyAsync((s) -> s.concat(" is awesome!"), executorService);

            // Wait for the result while the pool is still accepting work: the thenApplyAsync
            // stage is only handed to the executor once the supplier has finished, so stopping
            // the pool first can leave that stage rejected.
            assertThat(completableFuture.get()).isEqualTo("Future is awesome!");
        } finally {
            // stop(...) comes from ConcurrentUtils; presumably it shuts the pool down - confirm.
            stop(executorService);
        }
    }

    /** Feeds the result of one future into a function that itself returns a future. */
    @Test
    public void thenComposeAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> composedCompletableFuture = CompletableFuture
                .supplyAsync(() -> "Future")
                .thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> s.concat(" is awesome!")));

        assertThat(composedCompletableFuture.get()).isEqualTo("Future is awesome!");
    }

    /** Merges the results of two independently started futures into one value. */
    @Test
    public void thenCombineAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> "Future");
        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> " is awesome!");

        CompletableFuture<String> combinedCompletableFuture = completableFuture1.thenCombineAsync(completableFuture2, (s1, s2) -> s1.concat(s2));

        assertThat(combinedCompletableFuture.get()).isEqualTo("Future is awesome!");
    }

    /** Waits for three independent futures with {@code allOf}, then joins their results in order. */
    @Test
    public void allOf() {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Future");
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> " is awesome!");
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "!");
        // Java cannot create a generic array directly; every element is a CompletableFuture<String>.
        @SuppressWarnings("unchecked")
        CompletableFuture<String>[] cfs = new CompletableFuture[]{cf1, cf2, cf3};

        // The allOf future is typed Void; join() here only waits for every input to finish.
        CompletableFuture<Void> allCf = CompletableFuture.allOf(cfs);
        allCf.join();

        String result = Arrays.stream(cfs).map(CompletableFuture::join).collect(Collectors.joining());
        assertThat(result).isEqualTo("Future is awesome!!");
    }

    /** Waits for whichever of three futures finishes first and checks its value. */
    @Test
    public void anyOf() throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Future");
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> " is awesome!");
        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "!");

        CompletableFuture<Object> anyCf = CompletableFuture.anyOf(cf1, cf2, cf3);
        Object firstResult = anyCf.get();
        System.out.println(firstResult);

        assertThat(anyCf).isDone();
        // Which future wins is not deterministic, so only require that the value came from one of them.
        assertThat(firstResult).isIn("Future", " is awesome!", "!");
    }

    /** Ends a callback chain with a Consumer; the resulting future carries no value. */
    @Test
    public void thenAcceptAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");

        completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));
        CompletableFuture<Void> procedureFuture = completableFuture.thenAcceptAsync(System.out::println);

        assertThat(procedureFuture.get()).isNull();
    }

    /** Ends a callback chain with a Runnable that ignores the upstream value. */
    @Test
    public void thenRunAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");

        completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));
        CompletableFuture<Void> procedureFuture = completableFuture.thenRunAsync(() -> System.out.println("!"));

        assertThat(procedureFuture.get()).isNull();
    }
}
Follow HelloKoding