This Java Concurrency tutorial uses code examples to help you understand ArrayBlockingQueue, a concurrent collection.

ArrayBlockingQueue is a BlockingQueue implementation with the following characteristics:

  • Internal data structure: it is based on a circular array to store elements. The first and the last elements of the array are treated as logically adjacent, hence the name circular array.

The queue advances the indices of the head and tail elements whenever an element is added to or removed from the queue. When either index advances past the last position in the array, it restarts from 0.

This mechanism is better than using a regular array, as the queue doesn’t have to shift all the elements whenever the head is removed. However, if you remove an element in the middle (via Iterator.remove), it must shift the elements.

  • Capacity: there’s a limit on the number of elements an ArrayBlockingQueue can hold, as you must specify a capacity when constructing a new object of this class. Hence ArrayBlockingQueue is a bounded BlockingQueue.
  • Order: elements are ordered FIFO (First-In First-Out). The head of the queue is the element that has been in the queue the longest, and new elements are inserted at the tail.
  • Operations: queue insertion and removal are executed in constant time (very fast).
  • Iterator: the iterators returned by ArrayBlockingQueue are weakly consistent, which means an iterator may be used concurrently with other operations. It will never throw ConcurrentModificationException, and it supports the remove operation.
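
To make the circular array idea more concrete, here is a simplified sketch of how the head and tail indices wrap around. It is not the JDK source code: the class CircularArraySketch and its field names are made up for illustration, and it leaves out all locking and blocking.

```java
// Simplified, single-threaded illustration of a circular array.
// This is NOT the JDK implementation: hypothetical names, no locking, no blocking.
class CircularArraySketch {
	private final Object[] items;
	private int takeIndex; // position of the head element
	private int putIndex;  // position where the next element will be stored
	private int count;     // number of elements currently stored

	CircularArraySketch(int capacity) {
		items = new Object[capacity];
	}

	// Stores an element at the tail; returns false if the array is full
	boolean offer(Object element) {
		if (count == items.length) {
			return false;
		}
		items[putIndex] = element;
		putIndex = (putIndex + 1) % items.length; // restart from 0 after the last slot
		count++;
		return true;
	}

	// Removes the head element; returns null if the array is empty
	Object poll() {
		if (count == 0) {
			return null;
		}
		Object head = items[takeIndex];
		items[takeIndex] = null;
		takeIndex = (takeIndex + 1) % items.length; // restart from 0 after the last slot
		count--;
		return head;
	}

	int putIndex() {
		return putIndex;
	}

	int takeIndex() {
		return takeIndex;
	}

	public static void main(String[] args) {
		CircularArraySketch ring = new CircularArraySketch(3);
		ring.offer("A");
		ring.offer("B");
		ring.offer("C"); // the array is full, putIndex has wrapped to 0
		ring.poll();     // removes "A", takeIndex moves to 1
		ring.offer("D"); // reuses slot 0, no element is shifted

		System.out.println("putIndex = " + ring.putIndex());
		System.out.println("takeIndex = " + ring.takeIndex());
		System.out.println(ring.poll() + ", " + ring.poll() + ", " + ring.poll());
	}
}
```

Running main prints putIndex = 1, takeIndex = 1 and then B, C, D: the element D is stored in slot 0 without shifting any other element, and the elements still come out in FIFO order.
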
You also need to pay attention when creating a new ArrayBlockingQueue object, as the class provides three constructors:

ArrayBlockingQueue(int capacity)

ArrayBlockingQueue(int capacity, boolean fair)

ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

Note that the fair parameter specifies the access policy for threads blocked on insertion or removal. Consider this scenario: several threads are blocked because the queue is either full or empty. When space or elements become available, which thread is granted access first, and which is granted access next?

ArrayBlockingQueue lets you choose one of two policies according to the value of the fair parameter:

  • if fair is set to true: threads are granted access in FIFO order, which means the longest-waiting thread gets access first.
  • if fair is set to false: the access order is unspecified.
By default (when using the first constructor), the access order is unspecified.

Now, let’s see how to use ArrayBlockingQueue in detail with code examples.

 

1. Creating a new ArrayBlockingQueue



The following line creates an ArrayBlockingQueue object with a fixed capacity of 100 elements of type String:

BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(100); 
This line creates an ArrayBlockingQueue object with a fixed capacity of 50 Integer elements and a fair access policy:

BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(50, true);
And the following lines create an ArrayBlockingQueue object with a fixed capacity and a fair access policy, initially containing the elements of a given collection:

List<String> list = Arrays.asList("One", "Two", "Three");
BlockingQueue<String> queue3 = new ArrayBlockingQueue<>(50, true, list);
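
As a small sketch of how the third constructor behaves, the following example pre-fills a queue from a list and checks the free space and the head element. The class name ConstructorDemo and its helper methods are made up for illustration. It also shows that the constructor rejects a capacity smaller than the size of the given collection by throwing IllegalArgumentException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class for the three-argument constructor
class ConstructorDemo {

	// Number of free slots left after pre-filling a queue of the given capacity
	static int freeSlotsAfterPrefill(int capacity, List<String> initial) {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, true, initial);
		return queue.remainingCapacity();
	}

	// Head of a queue pre-filled from the list (elements keep the list's order)
	static String headAfterPrefill(List<String> initial) {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(initial.size(), false, initial);
		return queue.peek();
	}

	// Returns true if the constructor rejects a capacity that is too small
	static boolean rejectsTooSmallCapacity(List<String> initial) {
		try {
			new ArrayBlockingQueue<String>(initial.size() - 1, true, initial);
			return false;
		} catch (IllegalArgumentException ex) {
			return true;
		}
	}

	public static void main(String[] args) {
		List<String> list = Arrays.asList("One", "Two", "Three");

		System.out.println("Free slots: " + freeSlotsAfterPrefill(50, list));
		System.out.println("Head: " + headAfterPrefill(list));
		System.out.println("Too small rejected: " + rejectsTooSmallCapacity(list));
	}
}
```

With the list of three elements used above, the queue of capacity 50 has 47 free slots left, and the head is "One" because the elements are added in the order of the list.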
 

2. Inserting an element to the tail of the queue

The following code snippet uses the put() method to insert an element to the tail of the queue:

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

try {
	queue.put("Four");
} catch (InterruptedException ie) {
	ie.printStackTrace();
}
Remember that the put() method blocks if the queue is full, waiting for space to become available. While waiting, it throws InterruptedException if the current thread is interrupted.
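
To see the blocking and the interruption in action, here is a minimal sketch in which a thread tries to put an element on a full queue and is then interrupted by the main thread. The class name BlockingPutDemo and the 200 ms pause are arbitrary choices for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: interrupting a thread that waits in put()
class BlockingPutDemo {

	// Returns true if put() on a full queue ended with InterruptedException
	static boolean interruptBlockedPut() {
		final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
		queue.offer("One"); // the queue is now full
		final boolean[] interrupted = new boolean[1];

		Thread producer = new Thread(() -> {
			try {
				// no space and nobody removes elements, so this call waits
				queue.put("Two");
			} catch (InterruptedException ie) {
				interrupted[0] = true;
			}
		});
		producer.start();

		try {
			Thread.sleep(200);    // give the producer time to reach put()
			producer.interrupt(); // wake it up with an interrupt
			producer.join();      // wait until the producer thread has finished
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
		}
		return interrupted[0];
	}

	public static void main(String[] args) {
		System.out.println("put() was interrupted: " + interruptBlockedPut());
	}
}
```

In real code, a common choice in the catch block is to call Thread.currentThread().interrupt() so that the code further up the call stack can still see that the thread was interrupted.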

In case you want to wait only a specified amount of time when the queue is full, use the offer(element, timeout, timeunit) method. For example:

try {
	long timeout = 1000;
	boolean success = queue.offer("Five", timeout, TimeUnit.MILLISECONDS);

	if (!success) {
		System.out.println("Queue is full and timeout elapsed");
	}

} catch (InterruptedException ie) {
	ie.printStackTrace();
}
In this code, the offer() method returns false if the queue is still full after the current thread has waited for the specified time.
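
Besides put() and the timed offer(), the BlockingQueue interface also defines two insertion methods that never wait: offer(element) returns false immediately if the queue is full, while add(element) throws IllegalStateException in that case. The following sketch compares them (the class name InsertionVariantsDemo is made up for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class comparing the non-blocking insertion methods
class InsertionVariantsDemo {

	// offer(element) does not wait: it reports failure with false
	static boolean offerWhenFull() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
		queue.offer("One");
		return queue.offer("Two");
	}

	// add(element) does not wait either: it reports failure with an exception
	static boolean addThrowsWhenFull() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
		queue.add("One");
		try {
			queue.add("Two");
			return false;
		} catch (IllegalStateException ex) {
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println("offer on full queue returned: " + offerWhenFull());
		System.out.println("add on full queue threw: " + addThrowsWhenFull());
	}
}
```

Which method fits best depends on what the producer should do when the queue is full: wait (put), wait for a limited time (timed offer), give up quietly (offer), or treat it as an error (add).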

 

3. Retrieving and removing an element from the head of the queue

The following code snippet uses the take() method to retrieve and remove an element at the head of the queue, waiting if necessary until an element becomes available in case the queue is empty:

try {
	String head = queue.take();
	System.out.println("Head element: " + head);

} catch (InterruptedException ie) {
	ie.printStackTrace();
}
If the queue is empty and you want to wait only up to a specified time for an element to become available, use the poll(timeout, timeunit) method. Here’s an example:

try {
	long timeout = 1000;
	String head = queue.poll(timeout, TimeUnit.MILLISECONDS);
	System.out.println("Head element: " + head);

} catch (InterruptedException ie) {
	ie.printStackTrace();
}
Note that the poll(timeout, timeunit) method returns null if the timeout expires before an element becomes available, so check the result before using it.
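
The following sketch shows the null result of a timed poll() on an empty queue, and also contrasts peek(), which only looks at the head, with the non-blocking poll(), which removes it. The class name RemovalVariantsDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the removal methods
class RemovalVariantsDemo {

	// Waits up to the given time on an empty queue and returns the result
	static String timedPollOnEmptyQueue(long timeoutMillis) {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
		try {
			return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt(); // restore the interrupt flag
			return null;
		}
	}

	// peek() leaves the head in the queue, poll() removes it
	static int sizeAfterPeekAndPoll() {
		BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
		queue.offer("One");
		queue.offer("Two");

		queue.peek(); // the size stays 2
		queue.poll(); // the size becomes 1
		return queue.size();
	}

	public static void main(String[] args) {
		String head = timedPollOnEmptyQueue(100);

		if (head == null) {
			System.out.println("Queue is empty and timeout elapsed");
		} else {
			System.out.println("Head element: " + head);
		}

		System.out.println("Size after peek and poll: " + sizeAfterPeekAndPoll());
	}
}
```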

 

4. Draining the queue

When the queue is no longer used for insertion and removal operations and you want to get all the remaining elements, use the drainTo(collection) method, which removes all available elements from the queue and adds them to the specified collection. Here’s an example:

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

//...
// insertion and removal operations by threads
//...

List<String> list = new ArrayList<>();

queue.drainTo(list);
Using this method is more efficient than repeatedly polling the queue.
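
The BlockingQueue interface also has an overloaded version, drainTo(collection, maxElements), that moves at most the given number of elements. A minimal sketch of draining the queue in batches could look like this (the class name DrainBatchDemo and the batch size of 3 are arbitrary choices for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: draining a queue in batches
class DrainBatchDemo {

	// Moves at most maxElements from the queue into a new list
	static List<Integer> drainBatch(BlockingQueue<Integer> queue, int maxElements) {
		List<Integer> batch = new ArrayList<>();
		queue.drainTo(batch, maxElements);
		return batch;
	}

	public static void main(String[] args) {
		BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
		for (int i = 1; i <= 5; i++) {
			queue.offer(i);
		}

		System.out.println("First batch: " + drainBatch(queue, 3));
		System.out.println("Left in queue: " + queue.size());
		System.out.println("Second batch: " + drainBatch(queue, 3));
	}
}
```

The first batch contains 1, 2 and 3, two elements are left in the queue, and the second batch contains the remaining 4 and 5.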

 

5. A Producer - Consumer Example using ArrayBlockingQueue

Let’s see a full example that makes use of ArrayBlockingQueue in a file text search program that searches all text files in a directory for a given keyword. The program will print which file contains the keyword and at which line.

The producer class is a thread that recursively lists the content of a given directory for files that match the given extension, and then it puts each file on the queue. The following code is of the producer class:

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * This producer enumerates files of a specific type in a given directory
 * and then puts the files on the queue.
 *
 * @author www.codejava.net
 */
public class DirectoryLister extends Thread {
	private BlockingQueue<File> queue;
	private File directory;
	private String extension;

	public DirectoryLister(BlockingQueue<File> queue,
			File directory, String extension) {
		this.queue = queue;
		this.directory = directory;
		this.extension = extension;
	}

	public void run() {

		try {

			listDirectory(directory);
			queue.put(new File("END"));

		} catch (InterruptedException ie) {
			ie.printStackTrace();
		}
	}

	private void listDirectory(File dir) throws InterruptedException {
		File[] files = dir.listFiles(new FileFilter() {
			public boolean accept(File file) {
				// accept subdirectories too, otherwise the listing cannot recurse into them
				return file.isDirectory() || file.getName().endsWith(extension);
			}
		});

		if (files != null && files.length > 0) {
			for (File aFile : files) {
				if (aFile.isDirectory()) {
					listDirectory(aFile);
				} else {
					queue.put(aFile);
				}
			}
		}
	}
}
Notice that the producer, after listing the directory’s content, puts a special element on the queue:

queue.put(new File("END"));
This acts as a “signal” element that tells the consumers that no more elements will be added to the queue. You can see how the consumer class processes elements from the queue in the following code:

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * This consumer takes a File from the queue, and parses the file
 * to search for the given keyword.
 *
 * @author www.codejava.net
 */
public class FileParser extends Thread {
	private BlockingQueue<File> queue;
	private String keyword;

	public FileParser(BlockingQueue<File> queue, String keyword) {
		this.queue = queue;
		this.keyword = keyword;
	}

	public void run() {
		try {
			while (true) {
				// take() waits until an element is available
				File file = queue.take();

				if (file.getName().equals("END")) {
					// put the signal back so the other consumers see it too
					queue.put(file);
					break;
				}

				try {
					parseFile(file);
				} catch (IOException ex) {
					// report the problem and continue with the next file
					ex.printStackTrace();
				}
			}
		} catch (InterruptedException ie) {
			// restore the interrupt flag and let the thread terminate
			Thread.currentThread().interrupt();
		}
	}

	private void parseFile(File file) throws IOException {
		List<String> lines = Files.readAllLines(file.toPath());
		int lineCount = 0;

		for (String aLine : lines) {
			lineCount++;
			if (aLine.contains(keyword)) {
				String result = "Found in %s at line %d\n";
				System.out.printf(result, file.getAbsolutePath(), lineCount);
				break;
			}
		}

	}
}
First, it takes the element at the head of the queue, waiting if the queue is empty:

File file = queue.take();
If the head is the “signal element”, the consumer puts it back on the queue, exits the while loop, and the thread terminates:

if (file.getName().equals("END")) {
	queue.put(file);
	break;
}
Otherwise, the element is a real file and the consumer parses it. Note that the “signal element” must stay on the queue so that the other consumers can also see it and terminate, hence it is put back after being taken. Because the producer adds the signal last, the queue is empty at that moment and put() does not block. Simply checking the head with peek() before calling take() would not be reliable: peek() returns null when the queue is momentarily empty, and another consumer could remove the head between the two calls.

And here’s code of the main program:

import java.io.*;
import java.util.concurrent.*;

/**
 * This program demonstrates using ArrayBlockingQueue
 * in a producer-consumer application.
 *
 * @author www.codejava.net
 */
public class FileTextSearch {
	public static void main(String[] args) {
		String dirPath = args[0];
		String extension = args[1];
		String keyword = args[2];

		BlockingQueue<File> queue = new ArrayBlockingQueue<>(100);

		DirectoryLister lister = new DirectoryLister(queue, new File(dirPath), extension);
		lister.start();

		for (int i = 0; i < 10; i++) {
			FileParser parser = new FileParser(queue, keyword);
			parser.start();
		}
	}
}
As you can see, the program creates one producer thread and 10 consumer threads, sharing an ArrayBlockingQueue which holds a maximum of 100 elements. Run this program from the command line like this:

java FileTextSearch "D:\JDKSource\java\util\concurrent" ".java" "BlockingQueue"
The program will search all .java files in the D:\JDKSource\java\util\concurrent directory to find the files that contain the text “BlockingQueue”. The output looks something like this:

Found in D:\JDKSource\java\util\concurrent\ArrayBlockingQueue.java at line 82
Found in D:\JDKSource\java\util\concurrent\package-info.java at line 114
Found in D:\JDKSource\java\util\concurrent\ThreadPoolExecutor.java at line 183
As you can see, an ArrayBlockingQueue is shared among multiple threads but we don’t have to write any code for synchronization or locking, as the ArrayBlockingQueue handles concurrent threads itself.
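
If you want to check this claim without touching the file system, here is a miniature, self-contained sketch of the same idea: one producer puts numbers on a shared queue and several consumers add them up, with no synchronized block or explicit lock around the queue. The class name SharedQueueSketch and the value -1 used as the signal element are assumptions for this illustration. It uses one common variant of the signal element technique, in which a consumer that takes the signal puts it back for the other consumers.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical miniature producer-consumer program
class SharedQueueSketch {
	// signal element, assumed never to occur as a real value
	private static final Integer END = -1;

	// One producer puts 1..itemCount, the consumers add the numbers up
	static long sumWithConsumers(final int itemCount, int consumerCount) {
		final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
		final long[] partialSums = new long[consumerCount]; // one slot per consumer

		Thread producer = new Thread(() -> {
			try {
				for (int i = 1; i <= itemCount; i++) {
					queue.put(i);
				}
				queue.put(END); // no more elements will follow
			} catch (InterruptedException ie) {
				Thread.currentThread().interrupt();
			}
		});

		Thread[] consumers = new Thread[consumerCount];
		for (int c = 0; c < consumerCount; c++) {
			final int slot = c;
			consumers[c] = new Thread(() -> {
				try {
					while (true) {
						Integer value = queue.take();
						if (value.equals(END)) {
							queue.put(value); // leave the signal for the other consumers
							break;
						}
						partialSums[slot] += value;
					}
				} catch (InterruptedException ie) {
					Thread.currentThread().interrupt();
				}
			});
		}

		producer.start();
		for (Thread consumer : consumers) {
			consumer.start();
		}

		try {
			producer.join();
			for (Thread consumer : consumers) {
				consumer.join(); // after join, the consumer's partial sum is visible
			}
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
		}

		long total = 0;
		for (long partial : partialSums) {
			total += partial;
		}
		return total;
	}

	public static void main(String[] args) {
		System.out.println("Sum: " + sumWithConsumers(100, 4));
	}
}
```

Every number is taken by exactly one consumer, so the total is always the sum of 1 to 100, which is 5050, no matter how the work is split among the four consumer threads.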

Now, try to run this program on your computer with different inputs and check the results.

 


About the Author:

The author is a certified Java programmer (SCJP and SCWCD). He started programming with Java in the days of Java 1.4 and has been in love with Java ever since.


