This Java Concurrency tutorial helps you understand the characteristics of LinkedBlockingQueue with a detailed code example.

LinkedBlockingQueue is an implementation of the BlockingQueue interface with the following characteristics:

  • Internal data structure: LinkedBlockingQueue uses singly linked nodes. Each node contains the element and a reference to the next node. Elements are inserted at the tail and removed from the head, so both insertion and removal run in constant time.
  • Capacity: if created with the no-argument constructor, LinkedBlockingQueue can store up to Integer.MAX_VALUE elements (in practice it can be considered unbounded). You can specify a fixed capacity by using the constructor LinkedBlockingQueue(int capacity), in which case the queue is bounded.
  • Order: the queue orders elements FIFO (first-in, first-out). The element at the head of the queue has been on the queue the longest time, and the element at the tail has been on the queue the shortest time. New elements are inserted at the tail, and retrieval operations take elements from the head.
  • Operations: it implements the typical operations of a BlockingQueue: put and take, which block until they can proceed, and offer and poll, which return immediately or wait up to a given timeout.
  • Iterator behavior: the iterator is weakly consistent, which means it can be used concurrently with other operations and will never throw ConcurrentModificationException. However, it may or may not reflect modifications made after the iterator is constructed.
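
Before moving on to the full example, here is a minimal single-threaded sketch of the characteristics listed above. The class name QueueBasicsDemo and its helper methods are made up for illustration; only the LinkedBlockingQueue calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the tutorial's producer-consumer code.
class QueueBasicsDemo {

    // Free slots of a queue created with the no-argument constructor.
    static int unboundedCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // The non-blocking offer() returns false when a bounded queue is full.
    static boolean offerWhenFull() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2);
        return queue.offer(3);
    }

    // Elements leave the queue in the same order they entered it (FIFO).
    static List<Integer> fifoOrder() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        queue.offer(10);
        queue.offer(20);
        queue.offer(30);
        List<Integer> removed = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) {   // poll() returns null when empty
            removed.add(head);
        }
        return removed;
    }

    // Removing elements while iterating does not throw
    // ConcurrentModificationException, because the iterator is weakly consistent.
    // A fail-fast iterator such as ArrayList's would typically throw here.
    static int iterateWhileRemoving() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add(1);
        queue.add(2);
        queue.add(3);
        int visited = 0;
        for (Integer n : queue) {
            queue.remove(n);   // structural modification during iteration
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(unboundedCapacity() == Integer.MAX_VALUE); // true
        System.out.println(offerWhenFull());                          // false
        System.out.println(fifoOrder());                              // [10, 20, 30]
        System.out.println("visited: " + iterateWhileRemoving());
    }
}
```

The number of elements the iterator visits is printed without an expected value on purpose: a weakly consistent iterator is allowed, but not required, to reflect changes made after it was created.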
Now, let’s see a complete code example based on producer-consumer processing.

The producer thread continuously creates objects and places them on the queue, and the consumer thread processes objects on the queue as they arrive.

The following code is for the producer class:

import java.util.*;
import java.util.concurrent.*;

/**
 * A producer that puts elements on a BlockingQueue
 * @author www.codejava.net
 */
public class Producer implements Runnable {

	private BlockingQueue<Integer> queue;

	public Producer (BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {

		try {

			for (int i = 0; i < 50; i++) {

				  Integer number = produce();

				  queue.put(number);

				  System.out.println("PRODUCER: created " + number);

			}

			queue.put(-1);       // indicates end of producing

			System.out.println("PRODUCER: STOPPED.");

		} catch (InterruptedException ie) {

			ie.printStackTrace();

		}
	}

	private Integer produce() {
		Random random = new Random();

		Integer number = random.nextInt(100);

		// fake producing time
		try {
			Thread.sleep(random.nextInt(1000));
		} catch (InterruptedException ie) { ie.printStackTrace(); }


		return number;
	}
}

As you can see, this producer puts 50 objects on the queue. The objects are created by the produce() method, which uses the Random class to generate random numbers and to fake a random producing time. Finally, the producer puts the negative number “-1” on the queue to signal that no more objects will be created.

Next, the consumer class looks like this:

import java.util.*;
import java.util.concurrent.*;

/**
 * A consumer that takes elements from a BlockingQueue
 * @author www.codejava.net
 */
public class Consumer implements Runnable {

	private BlockingQueue<Integer> queue;

	public Consumer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {

		try {

			 while (true) {

				Integer number = queue.take();

				if (number == -1) {
					System.out.println("Consumer: STOPPED.");
					break;
				}

				consume(number);
			 }



		} catch (InterruptedException ie) {

			 ie.printStackTrace();
		}
	}

	private void consume(Integer number) {
		// fake consuming time
		Random random = new Random();

		try {
			Thread.sleep(random.nextInt(1000));
		} catch (InterruptedException ie) { ie.printStackTrace(); }

		System.out.println("Consumer: processed " + number);
	}
}

As you can see, this consumer takes elements from the queue until it finds the negative number “-1”, which indicates that the producer has stopped. The consume() method uses the Random class to fake a processing time.

And the following code is the test program:

import java.util.*;
import java.util.concurrent.*;

/**
 * A producer-consumer test program for LinkedBlockingQueue
 * @author www.codejava.net
 */
public class LinkedBlockingQueueTest {

	public static void main(String[] args) {

		BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

		Thread producer = new Thread(new Producer(queue));

		Thread consumer = new Thread(new Consumer(queue));

		producer.start();

		consumer.start();

	}
}


As you can see, this program uses a bounded LinkedBlockingQueue with a capacity of 10 elements. Then it creates and starts two threads: one for the producer and one for the consumer.

Run this program and observe the output. The output is random but typically it looks like this:

PRODUCER: created 80
PRODUCER: created 68
PRODUCER: created 80
PRODUCER: created 95
Consumer: processed 80
Consumer: processed 68
...
PRODUCER: created 16
Consumer: processed 80
PRODUCER: created 19
Consumer: processed 95
PRODUCER: created 36
Consumer: processed 75
PRODUCER: created 71
Consumer: processed 74
...
Consumer: processed 13
Consumer: processed 68
PRODUCER: created 10
PRODUCER: created 95
PRODUCER: created 22
PRODUCER: STOPPED.
Consumer: processed 59
Consumer: processed 43
Consumer: processed 46
Consumer: processed 73
Consumer: processed 10
Consumer: processed 95
Consumer: processed 22
Consumer: STOPPED.

At the beginning of this run, the producer gets ahead of the consumer and elements accumulate on the queue. After that, the producer and consumer work in tandem: whenever the queue is full, put() blocks, so the producer can place a new element on the queue only after the consumer removes one.

And at the end, the producer stops first and the consumer processes the remaining elements in the queue.
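
The blocking behavior described above can be reproduced without random delays. The following is a small sketch, assuming a made-up class BackpressureDemo with a helper method transfer(): a producer thread pushes more elements than the queue can hold, and the calling thread drains them. Because put() blocks while the queue is full and there is a single producer, nothing is lost and the order is preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the tutorial's producer-consumer code.
class BackpressureDemo {

    // Sends the numbers 0..count-1 through a bounded queue and returns
    // them in the order the consuming side received them.
    static List<Integer> transfer(int count, int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);      // blocks while the queue is full
                }
                queue.put(-1);         // same end marker as in the tutorial
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();   // keep the interrupt status
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            while (true) {
                int number = queue.take();   // blocks while the queue is empty
                if (number == -1) {
                    break;
                }
                received.add(number);
            }
            producer.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // Five elements pass through a queue that holds at most two at a time.
        System.out.println(transfer(5, 2));   // [0, 1, 2, 3, 4]
    }
}
```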

You can experiment further with the timed versions of the offer() and poll() methods, which wait only up to a given timeout.

Update the run() method of the Producer class to the following code:

public void run() {
	try {

		for (int i = 0; i < 50; i++) {

			  Integer number = produce();

			  boolean success = queue.offer(number, 100, TimeUnit.MILLISECONDS);

			  if (success) {
				  System.out.println("PRODUCER: created " + number);
			  } else {
				  System.out.println("PRODUCER: gave up");
			  }
		}

		queue.put(-1);       // indicates end of producing

		System.out.println("PRODUCER: STOPPED.");

	} catch (InterruptedException ie) {

		ie.printStackTrace();
	}
}

Here, by using the offer() method: if the queue is full, the producer waits up to the specified time before giving up, instead of waiting until space becomes available as the put() method does.
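
To see the return value of the timed offer() in isolation, here is a minimal single-threaded sketch. The class TimedOfferDemo and its method names are invented for this illustration; the capacity of 1 and the 100 ms timeout are arbitrary choices.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's producer-consumer code.
class TimedOfferDemo {

    // Nobody removes elements, so the timed offer() gives up after the timeout.
    static boolean offerToFullQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.offer(1);   // the queue is now full
        try {
            return queue.offer(2, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;   // treated as "gave up" in this sketch
        }
    }

    // With free space available, the timed offer() succeeds without waiting.
    static boolean offerToEmptyQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        try {
            return queue.offer(2, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue());    // false
        System.out.println(offerToEmptyQueue());   // true
    }
}
```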

And update the run() method of the Consumer class to the following code:

public void run() {

	try {

		 while (true) {

			Integer number = queue.poll(100, TimeUnit.MILLISECONDS);

			if (number != null) {
				if (number == -1) {
					System.out.println("Consumer: STOPPED.");
					break;
				}

				consume(number);
			} else {
				System.out.println("Consumer: gave up");
			}
		 }

	} catch (InterruptedException ie) {
		 ie.printStackTrace();
	}
}

Here, by using the poll() method: if the queue is empty, the consumer waits up to the specified time before giving up, instead of waiting until an element becomes available as the take() method does.
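
The timed poll() signals a timeout by returning null, which is why the consumer above checks for null first. A minimal sketch, assuming a made-up class TimedPollDemo and an arbitrary 100 ms timeout:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's producer-consumer code.
class TimedPollDemo {

    // Nothing is ever added, so the timed poll() returns null after the timeout.
    static Integer pollEmptyQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;   // treated as "gave up" in this sketch
        }
    }

    // An element is already waiting, so the timed poll() returns it at once.
    static Integer pollNonEmptyQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.offer(42);
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmptyQueue());      // null
        System.out.println(pollNonEmptyQueue());   // 42
    }
}
```

Because null means "no element arrived in time", a LinkedBlockingQueue cannot store null elements; adding one throws NullPointerException.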

You can test how these timeout methods work by increasing the producing and consuming times to longer values and observing the results.

 

About the Author:

The author is a certified Java programmer (SCJP and SCWCD). He started programming with Java in the time of Java 1.4 and has been in love with Java ever since. Make friends with him on Facebook and watch his Java videos on YouTube.


