Java Concurrency - Multi Threading with ExecutorService

In this post I'll look how the ExeutorService can be used to perform multi threaded asynchronous tasks. I'll begin by looking at the traditional approach of creating threads directly and then move on to examine the ExecutorService and how it can be used to simplify things.

Instantiating Threads Directly

Prior to the Executor API developers were responsible for instantiating and managing threads directly. Lets look at a simple example below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
   /**
    * Call 2 expensive methods on separate threads 
    *    
    * @throws InterruptedException 
    */
    public void doMultiThreadedWork() throws InterruptedException {
  
        /* create Runnable using anonymous inner class */
        Thread t1 = new Thread(new Runnable() {   
            public void run() {
                System.out.println("starting expensive task thread t1");
                doSomethingExpensive();    
                System.out.println("finished expensive task thread t1");
            }
        });
  
        /* start processing on new threads */
        t1.start();            
  
        /* block current thread until t1 has finished */
        t1.join();
    }
Simple Thread example

We begin by creating a new Thread object t1 and passing a Runnable to its constructor. Runnable is an interface with a single abstract method public void run(). To create a Thread we need to supply an implementation of the Runnable interface and in this instance we do that with an anonymous inner class. The run method contains logic that will be executed by the Thread when it is started. Note that if the code inside run throws a checked Exception it must be caught and handled inside the run method.

On line 18 the Thread is started by calling its start method. The JVM spawns a new process and executes the run() method in the context of the newly created Thread.  Note: you should be careful not to call the run() method directly as this will cause the method to execute in the context of the current thread and will result in single threaded behaviour.

On line 21 we call the join method to block the main thread execution until Thread t1 has terminated. This is only necessary if you want the main thread to wait for the spawned thread to terminate. Often this is not necessary, but for sake of our sample code we want to allow t1 to complete before continuing.

Introducing the Executor Service

Dealing with threads directly is all well and good but Oracle have made things a little easier by providing a layer of abstraction via its Executor API.  An Executor allows you to process tasks asynchronously and in parallel without having to deal with threads directly.

Creating an Executor

The Executors factory class is used to create an instance of an Executor, either an ExecutorService or a ScheduledExecutorService. Some of the most common types of Executors are described below.
  • Executors.newCachedThreadPool() - An ExecutorService with a thread pool that creates threads as required but reuses previously created threads as they become available.
  • Executors.newFixedThreadPool(int numThreads) - An ExecutorService that has a thread pool with a fixed number of threads. This is the maximum number of threads that can be active in the ExecutorService at any one time. If the number of requests submitted to the pool exceeds the pool size, requests are queued until a thread becomes available. 
  • Executors.newScheduledThreadPool(int numThreads) - A ScheduledExecutorService with a thread pool that is used to run tasks periodically or after a specified delay.
  • Executors.newSingleThreadExecutor() - An ExecutorService that uses a single thread. Tasks submitted to this ExecutorService will be executed one at a time and in the order submitted. 
  • Executors.newSingleThreadScheduledExecutor() - An ExecutorService that uses a single thread to execute tasks periodically or after a specified delay. 

Below is an example of creating a simple fixed thread pool ExecutorService with a pool size of 2. I'll use this ExecutorService in the following sections. 

1
    ExecutorService executorService = Executors.newFixedThreadPool(2);
Creating an ExecutorService using Executor factory method

Using an Executor

In the following sections I'm going to look at the various methods available on the ExecutorService for executing tasks asynchronously.

execute(Runnable)

The execute method takes a Runnable and is very similar to the simple Thread example we looked at earlier. The execute method is useful when you want to run a task and are not concern about checking its status or obtaining a result. Think of it as a means of invoking a fire and forget asynchronous task.

1
2
3
4
5
    executorService.execute(()->{
        System.out.println(String.format("starting expensive task thread %s", 
        Thread.currentThread().getName()));
        doSomethingExpensive();    
    }
Execute Runnable with ExecutorService

The example above creates a Runnable as a lambda expression and passes it to the execute method. The Runnable will be executed as soon as a thread is available from the ExecutorService threadpool.

Future<?> submit(Runnable)

The submit method also takes a Runnable but differs from the execute method in that it returns a Future. A Future is an object that represents a pending response form an asynchronous task. Think of it as a handle that can be used to check the status of the task or retrieve its result when the task completes. Futures use generics to allow the user to specify the return type of the task. However, given that a Runnables run method does not return a value (return type void), the Future holds the status of the task rather than a pending result. This is represented as Future<?> as shown in the example below.

1
2
3
4
5
    Future<?> taskStatus = executorService.execute(()->{
          System.out.println(String.format("starting expensive task thread %s", 
                                            Thread.currentThread().getName()));
          doSomethingExpensive();    
    }
                                                     Submit Runnable using lambda expression

The submit(Runnable) method is useful when you want to run a task that doesn't return a value but you'd like to check the status of the task after its been submitted to the ExecutorService.

Checking Task Status

Future provide a few methods that allow you to check the status of a task that's been submitted to the ExecutorService. The isCancelled method checks if a submitted task has already been cancelled. The isDone method allows you to check if the submitted task has completed. isDone will return true regardless of whether a task completed successfully, unsuccessfully or was cancelled. Finally, the cancel method can be used to cancel a submitted task. A boolean parameter indicates whether the task should be interrupted after its already been started.


1
2
3
4
5
6
7
    /* check if both tasks have completed - if not sleep current thread 
     * for 1 second and check again
     */
    while(!task1Future.isDone() || !task2Future.isDone()){
        System.out.println("Task 1 and Task 2 are not yet complete....sleeping");
 Thread.sleep(1000);
    }
Future isDone method used to check status of task 

Future<T> submit(Callable)

The submit method is overloaded to take a Callable as well as a Runnable. A Callable is similar to Runnable in that it represents a task that can be executed on another thread, but differs in that it returns a value and can throw a checked Exception. The Callable interface has the single abstract method public T call() throws Exception and like Runnable can be implemented with an anonymous inner class or functional interface. The return type of the call method is used to type the Future returned by the ExecutorService. There are two examples below, one submitting a Callable using an anonymous inner class and the other using a lambda expression.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    Future<Double> task1Future = executorService.submit(new Callable<Double>() {
  
        public Double call() throws Exception {     
   
            System.out.println(String.format("starting expensive task thread %s", 
                                              Thread.currentThread().getName()));
            Double returnedValue = someExpensiveRemoteCall();    
   
            return returnedValue;
        }           
    });
Submit Callable using anonymous inner class 

1
2
3
4
5
6
7
8
    Future<Double> task2Future = executorService.submit(()->{
  
        System.out.println(String.format("starting expensive task thread %s", 
                                           Thread.currentThread().getName()));
        Double returnedValue = someExpensiveRemoteCall();    
  
        return returnedValue;
    });
Submit Callable using lambda

The example above creates a Callable as a lambda and passes it to the execute method. The Callable will be executed as soon as a thread is available.

Retrieving a Result  from a Future

When we submit a Callable to the ExecutorService we receive a Future with the return type of the call method. In the example above the call method returns a Double so the Future returned is Future<Double>. One way of retrieving the result from a Future is by calling its get method. This method will block indefinitely waiting on the submitted task to complete. If the task doesn't complete or takes a long time to complete, the main application thread will remain blocked.

Waiting indefinitely for a result is often not ideal. We'd rather have more control over how we retrieve the result and take some action if a task does not complete within a acceptable period of time. Luckily there is an overloaded version of the get method that takes a time value and time unit. This method waits for the specified period of time and if the task is not complete and a result not available, throws a TimeoutException.

1
2
    Double value1 = task1Future.get();
    Double value2 = task2Future.get(4,  TimeUnit.SECONDS); // throws TimeoutException
                                                                                     Get result from Callable


Submit Multiple Callables

As well as supporting the submission of a single Callable, the ExecutorService allows you to submit a Collection of Callables using the invokeAll method. As you might expect, instead of receiving a single Future response, a Collection of Futures is returned. A Future is returned representing the pending result of each submitted task.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    Collection<Callable<Double>> callables = new ArrayList<>();
    IntStream.rangeClosed(1, 8).forEach(i-> {
        callables.add(createCallable());
    });
   
    /* invoke all supplied Callables */ 
    List<Future<Double>> taskFutureList = executorService.invokeAll(callables);
   
    /* call get on Futures to retrieve result when it becomes available.
     * If specified period elapses before result is returned a TimeoutException
     * is thrown
     */
    for (Future<Double> future : taskFutureList) {
    
        /* get Double result from Future when it becomes available */
        Double value = future.get(4, TimeUnit.SECONDS);
        System.out.println(String.format("TaskFuture returned value %s", value)); 
    }
Invoking a Collection of Callables

The sample code above submits 8 Callables to the ExecutorService and retrieves a List containing 8 Futures. The Futures returned are in the same order as the Callables were submitted to the ExecutorService. Note that submitting a Collection of Callables will require that the size of the thread pool is tweaked if we want  most or all of the submitted tasks can be executed in parallel. For the example above we'd need a thread pool with 8 threads to run all tasks in parallel  

Shutting Down the ExecutorService

After all tasks have completed its important to shut down the ExecutorService gracefully so that resources used by the underlying thread pool are reclaimed by the JVM. There are 2 methods available, shutdown and shutdownNow. The shutDown method triggers a shutdown of the ExecutorService, allowing currently processing tasks to finish but rejecting newly submitted tasks.

shutDownNow also triggers a shutdown of the ExecutorService, but does not allow currently executing tasks to complete, attempting to terminate them immediately. shutDownNow returns a list of tasks that were queued for execution when the shutdown was initiated.  To avoid potential resource leaks its important that the ExecutorService is shut down inside a finally block.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    ExecutorService executorService = null;
  
    try{  
        executorService = Executors.newFixedThreadPool(2);
   
        executorService.execute(()->{
            System.out.println(String.format("starting expensive task thread %s", 
                                               Thread.currentThread().getName()));
            doSomethingExpensive();    
        });
      
    }
    finally{
        executorService.shutdown();    
    }
ExecutorService shut down in finally block

Source Code  

The source code used in this post is available on Github. Feel free to pull the code and have a play around with it. As always, feel free to post comments, questions or suggestions below. 

Comments

Popular posts from this blog

Spring Boot & Amazon Web Services (EC2, RDS & S3)

Spring Web Services Tutorial

Health Checks, Metrics & More with Spring Boot Actuator

Spring JMS Tutorial with ActiveMQ

Axis2 Web Service Client Tutorial

An Introduction to Wiremock

Spring Batch Tutorial

Externalising Spring Configuration

Spring Quartz Tutorial