A Java 8 variation on the Producer-Consumer pattern

This post is an attempt to use Functional Interfaces/Lambdas brought to us by Java 8 with the Producer-Consumer pattern (the BlockingQueue implementation)
If you want to know more about this pattern see the Resources section

Requirements

  • JavaSE 8

Using Functional interfaces

The idea is to make use of java.util.function.Supplier and java.util.function.Consumer interfaces
The former will be responsible for creating the object and passing it to the producer thread to be put in the queue and the latter for consuming that object when the consumer thread retrieves it from the queue

As a consequence of this approach the logic of creating and consuming the queue’s elements will not reside in the threads but passed to them as functions

Let’s see how all this looks like

The producer thread

	class MyProducer<T> {

		private BlockingQueue<T> queue;

		public MyProducer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Insert the supplied object in the queue
		 * 
		 * @param supplier
		 *            Is responsible for supplying the object that will be put
		 *            in the queue
		 */
		public void produce(Supplier<T> supplier) {
			final T msg = supplier.get();
			try {
				queue.put(msg);
				out.println("Added message: " + msg);
				
				// Simulate a long running process
				MILLISECONDS.sleep(900);
				
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}

Here the produce() method accepts an argument of type Supplier, this functional interface is responsible for creating and returning an object of type T with its get() method

The Consumer thread

class MyConsumer<T> {

	private BlockingQueue<T> queue;

	public MyConsumer(BlockingQueue<T> queue) {
		this.queue = queue;
	}

	/**
	 * Retrieves an object from the head of the queue and passes it to the
	 * consumer
	 * 
	 * @param consumer
	 *            Contains the logic on what to do with the retrieved object
	 */
	public void consume(Consumer<T> consumer) {
		try {
			consumer.accept(queue.take());
				
			// Simulate a long running process
			MILLISECONDS.sleep(1250);
			
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
	}
}

The consume() method receives an argument or type Consumer, this functional interface will contain the logic to consume the retrieved element

Starting the producer thread

	private void startProducer() {

		final MyProducer<String> myProducer = new MyProducer<>(queue);
		final Supplier<String> supplier = () -> "Hello World";
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++) {
				myProducer.produce(supplier);
			}
		}).start();
	}

A simple implementation of the Supplier functional interface, which returns a “Hello World” message is created and passed to the produce() method
The thread is created with Lambda expressions, this way the MyProducer and MyConsumer classes don’t have to implement the Runnable interface
The producer will produce a number of messages hence the loop

Starting the consumer thread

	private void startConsumer() {

		final MyConsumer<String> myConsumer = new MyConsumer<>(queue);
		final Consumer<String> consumer = (s) -> out.println("Consumed message: " + s);
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++)
				myConsumer.consume(consumer);
		}).start();
	}

A simple implementation of the Consumer functional interface, which displays the message is created and passed to the consume() method
The consumer will consume the produced messages inside the loop

Putting it all together

import static java.lang.System.out;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Starts 2 threads, a producer and a consumer
 * <br>
 * Both threads share the same BlockingQueue
 * 
 * @author Djallal Serradji
 *
 */
public class ProducerConsumer {

	private static final int MSG_NBR = 10;

	private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

	public static void main(String[] args) {
		new ProducerConsumer().startEngine();
	}

	public void startEngine() {
		startProducer();
		startConsumer();
	}

	// Producer thread
	private void startProducer() {

		final MyProducer<String> myProducer = new MyProducer<>(queue);
		final Supplier<String> supplier = () -> "Hello World";
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++) {
				myProducer.produce(supplier);
			}
		}).start();
	}

	// Consumer thread
	private void startConsumer() {

		final MyConsumer<String> myConsumer = new MyConsumer<>(queue);
		final Consumer<String> consumer = (s) -> out.println("Consumed message: " + s);
		new Thread(() -> {
			for (int i = 0; i < MSG_NBR; i++)
				myConsumer.consume(consumer);
		}).start();
	}

	static class MyProducer<T> {

		private BlockingQueue<T> queue;

		public MyProducer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Insert the supplied object in the queue
		 * 
		 * @param supplier
		 *            Is responsible for supplying the object that will be put
		 *            in the queue
		 */
		public void produce(Supplier<T> supplier) {
			final T msg = supplier.get();
			try {
				queue.put(msg);
				out.println("Added message: " + msg);
				
				// Simulate a long running process
				MILLISECONDS.sleep(900);
				
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}

	static class MyConsumer<T> {

		private BlockingQueue<T> queue;

		public MyConsumer(BlockingQueue<T> queue) {
			this.queue = queue;
		}

		/**
		 * Retrieves an object from the head of the queue and passes it to the
		 * consumer
		 * 
		 * @param consumer
		 *            Contains the logic on what to do with the retrieved object
		 */
		public void consume(Consumer<T> consumer) {
			try {
				consumer.accept(queue.take());
				
				// Simulate a long running process
				MILLISECONDS.sleep(1250);
				
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}
}

The project can be found on github

What do you think about this implementation ?
Your comments are welcome.

Resources

Advertisements

Injecting external properties in Java EE 6/7 applications

My first post is about a project I started a few weeks ago after reading a good post from Markus Eisele (here) about injecting external properties into a Java EE 6/7 project with CDI enabled.
External means that the properties files are not packaged inside the war/ear file but instead deployed separately and accessible to the application’s code through its classpath, this is Application Server specific and will not be covered here.

How to generate the jar

The project is available on gitub as source code only but it’s easy to generate the jar file

Requirements

  • Java 1.7
  • Maven 3
  • Git (only needed if you want to clone the project)

Build the project

  • Clone the project (you can also download it as a zip file and import it in your IDE)
  • Type the maven command below to generate the jar file and add it to your local maven repository
    mvn clean install
  • Add the jar to your project dependencies, if you use maven add this snippet to your pom.xml
    		<dependency>
    			<groupId>com.github.djallalserradji</groupId>
    			<artifactId>inject.property</artifactId>
    			<version>${version_you_cloned}</version>
    		</dependency>
    

How it works

Let’s say we have a billing class that needs 3 web services, a Billing web service to process billing information received from customers, an email web service to notify the customer that the billing happened and a SMS web service to notify the customer on their mobile phone if they choose to.

public class BillingService {
	
	private String billingEndpoint;
	private String emailEndpoint;
	private String smsEndpoint;

	// Business logic
	// .........
}

And we want to inject the endpoint URLs stored in the external properties file billing.properties

billing.endpoint=http://billing.company.com
email.endpoint=http://email.company.com
sms.endpoint=http://sms.company.com

First annotate the class with @PropertiesFiles like this

@PropertiesFiles({"billing.properties"})
public class BillingService {

The annotation takes a list of files (coma separated)

Then annotate each field which will receive the value of a property with @Property

@PropertiesFiles({"billing.properties"})
public class BillingService {
	
	@Property("billing.endpoint")
	@Inject
	private String billingEndpoint;
	
	@Property("email.endpoint")
	@Inject
	private String emailEndpoint;
	
	@Property("sms.endpoint")
	@Inject
	private String smsEndpoint;
	
	// Business logic
	// .........
}

@Property takes a single parameter that represents the name of the property
Please note that @Inject annotation is mandatory in order for the injection to happen.

That’s it!

Alternatively, if you don’t want to use the class annotation @PropertiesFiles you can list all the properties files in a single configuration file named inject.property.xml (they will be available to all classes), this file must be available in the classpath (either packaged inside the war/ear or externally available) and has this structure:

<propertiesfiles>
	<file>file1.properties</file>
	<file>file2.properties</file>
	<file>...</file>
</propertiesfiles>

Important:

If a class is annotated with @PropertiesFiles the configuration file is ignored

The source code is available on gitub

Thank you for reading.
Your comments are welcome.