A Java 8 variation on the Producer-Consumer pattern

This post tries out the functional interfaces and lambdas introduced in Java 8 on the Producer-Consumer pattern, using the BlockingQueue implementation.
If you want to know more about this pattern, see the Resources section.

Requirements

  • Java SE 8

Using Functional interfaces

The idea is to make use of the java.util.function.Supplier and java.util.function.Consumer interfaces.
The former creates the object and hands it to the producer thread, which puts it in the queue; the latter consumes that object once the consumer thread has retrieved it from the queue.

As a consequence, the logic for creating and consuming the queue’s elements does not reside in the threads but is passed to them as functions.
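To make the two interfaces concrete before any threads are involved, here is a minimal sketch. The class FunctionalInterfacesSketch and its transfer() helper are hypothetical names used only for illustration; they are not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class FunctionalInterfacesSketch {

    // Hypothetical helper: asks the supplier for a value and hands it to the consumer
    static <T> void transfer(Supplier<T> supplier, Consumer<T> consumer) {
        consumer.accept(supplier.get());
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        // The "what to create" and "what to do with it" logic lives in these two functions
        Supplier<String> supplier = () -> "Hello World";
        Consumer<String> consumer = received::add;

        transfer(supplier, consumer);
        System.out.println(received);
    }
}
```

The helper knows nothing about how the value is built or what happens to it afterwards, which is the same separation the producer and consumer classes rely on.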

Let’s see what all this looks like.

The producer thread

	class MyProducer<T> {

		private BlockingQueue<T> queue;

		public MyProducer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Insert the supplied object in the queue
		 * 
		 * @param supplier
		 *            Is responsible for supplying the object that will be put
		 *            in the queue
		 */
		public void produce(Supplier<T> supplier) {
			final T msg = supplier.get();
			try {
				queue.put(msg);
				out.println("Added message: " + msg);
				
				// Simulate a long running process
				MILLISECONDS.sleep(900);
				
			} catch (InterruptedException e) {
				// Restore the interrupt flag before propagating
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
		}
	}

Here the produce() method accepts an argument of type Supplier. This functional interface is responsible for creating and returning an object of type T through its get() method.
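Because the creation logic is just a Supplier, it can be swapped without touching the producer. The sketch below assumes a trimmed-down, static stand-in for produce() (no logging, no simulated delay) and a hypothetical numberedMessages() supplier that returns a different message on each call.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class NumberedSupplierSketch {

    // Trimmed-down stand-in for MyProducer.produce(): no logging, no simulated delay
    static <T> void produce(BlockingQueue<T> queue, Supplier<T> supplier) {
        try {
            queue.put(supplier.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Hypothetical supplier: returns "Message 1", "Message 2", ... on successive calls
    static Supplier<String> numberedMessages() {
        AtomicInteger counter = new AtomicInteger();
        return () -> "Message " + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        Supplier<String> supplier = numberedMessages();
        for (int i = 0; i < 3; i++) {
            produce(queue, supplier);
        }
        // The queue now holds three distinct messages in insertion order
        System.out.println(queue.size());
    }
}
```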

The consumer thread

class MyConsumer<T> {

	private BlockingQueue<T> queue;

	public MyConsumer(BlockingQueue<T> queue) {
		this.queue = queue;
	}

	/**
	 * Retrieves an object from the head of the queue and passes it to the
	 * consumer
	 * 
	 * @param consumer
	 *            Contains the logic on what to do with the retrieved object
	 */
	public void consume(Consumer<T> consumer) {
		try {
			consumer.accept(queue.take());
				
			// Simulate a long running process
			MILLISECONDS.sleep(1250);
			
		} catch (InterruptedException e) {
			// Restore the interrupt flag before propagating
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
	}
}

The consume() method receives an argument of type Consumer. This functional interface contains the logic for consuming the retrieved element.
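The consuming logic is equally pluggable. As a rough sketch, two consumers can be chained with Consumer.andThen() so that a retrieved element is both printed and recorded. The static consume() below is a trimmed-down stand-in for MyConsumer.consume() without the simulated delay, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class CollectingConsumerSketch {

    // Trimmed-down stand-in for MyConsumer.consume(): no simulated delay
    static <T> void consume(BlockingQueue<T> queue, Consumer<T> consumer) {
        try {
            consumer.accept(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        queue.add("Hello World");

        List<String> log = new ArrayList<>();
        Consumer<String> print = s -> System.out.println("Consumed message: " + s);
        Consumer<String> record = log::add;

        // andThen() runs print first, then record, on the same element
        consume(queue, print.andThen(record));
        System.out.println(log);
    }
}
```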

Starting the producer thread

	private void startProducer() {

		final MyProducer<String> myProducer = new MyProducer<>(queue);
		final Supplier<String> supplier = () -> "Hello World";
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++) {
				myProducer.produce(supplier);
			}
		}).start();
	}

A simple implementation of the Supplier functional interface, which returns a “Hello World” message, is created and passed to the produce() method.
The thread is created with a lambda expression, so the MyProducer and MyConsumer classes don’t have to implement the Runnable interface.
The producer produces MSG_NBR messages, hence the loop.

Starting the consumer thread

	private void startConsumer() {

		final MyConsumer<String> myConsumer = new MyConsumer<>(queue);
		final Consumer<String> consumer = (s) -> out.println("Consumed message: " + s);
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++)
				myConsumer.consume(consumer);
		}).start();
	}

A simple implementation of the Consumer functional interface, which prints the message, is created and passed to the consume() method.
The loop runs MSG_NBR times, so the consumer consumes every message the producer puts in the queue.
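The interplay of the two threads can be condensed into one self-contained sketch. It makes several assumptions that are not in the project: the class and method names are hypothetical, the simulated delays are left out, the queue capacity is deliberately small so that put() may block, and join() is used to wait for both threads so the result can be inspected.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

class LambdaThreadsSketch {

    // Runs one producer and one consumer thread and returns what was consumed
    static List<String> run(int messageCount, Supplier<String> supplier) {
        // Capacity 2 is an assumption: small enough that put() can block when the queue is full
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new CopyOnWriteArrayList<>();
        Consumer<String> consumer = consumed::add;

        // Both threads are plain lambdas, so no class has to implement Runnable
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    queue.put(supplier.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    consumer.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();
        try {
            // Wait for both loops to finish before reading the result
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, () -> "Hello World").size());
    }
}
```

Both loops run the same number of iterations, so every put() is eventually matched by a take() and neither thread is left blocked on the queue.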

Putting it all together

import static java.lang.System.out;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Starts 2 threads, a producer and a consumer
 * <br>
 * Both threads share the same BlockingQueue
 * 
 * @author Djallal Serradji
 *
 */
public class ProducerConsumer {

	private static final int MSG_NBR = 10;

	private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

	public static void main(String[] args) {
		new ProducerConsumer().startEngine();
	}

	public void startEngine() {
		startProducer();
		startConsumer();
	}

	// Producer thread
	private void startProducer() {

		final MyProducer<String> myProducer = new MyProducer<>(queue);
		final Supplier<String> supplier = () -> "Hello World";
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++) {
				myProducer.produce(supplier);
			}
		}).start();
	}

	// Consumer thread
	private void startConsumer() {

		final MyConsumer<String> myConsumer = new MyConsumer<>(queue);
		final Consumer<String> consumer = (s) -> out.println("Consumed message: " + s);
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++)
				myConsumer.consume(consumer);
		}).start();
	}

	static class MyProducer<T> {

		private BlockingQueue<T> queue;

		public MyProducer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Insert the supplied object in the queue
		 * 
		 * @param supplier
		 *            Is responsible for supplying the object that will be put
		 *            in the queue
		 */
		public void produce(Supplier<T> supplier) {
			final T msg = supplier.get();
			try {
				queue.put(msg);
				out.println("Added message: " + msg);
				
				// Simulate a long running process
				MILLISECONDS.sleep(900);
				
			} catch (InterruptedException e) {
				// Restore the interrupt flag before propagating
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
		}
	}

	static class MyConsumer<T> {

		private BlockingQueue<T> queue;

		public MyConsumer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Retrieves an object from the head of the queue and passes it to the
		 * consumer
		 * 
		 * @param consumer
		 *            Contains the logic on what to do with the retrieved object
		 */
		public void consume(Consumer<T> consumer) {
			try {
				consumer.accept(queue.take());
				
				// Simulate a long running process
				MILLISECONDS.sleep(1250);
				
			} catch (InterruptedException e) {
				// Restore the interrupt flag before propagating
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
		}
	}
}

The project can be found on GitHub.

What do you think about this implementation?
Your comments are welcome.

Resources
