Jun 01, 2021
A queue is a FIFO (first in, first out) data structure. The BlockingQueue covered in this article is also a queue, one that is designed for thread-safe use.
The full name of BlockingQueue is java.util.concurrent.BlockingQueue. It is a thread-safe queue interface: multiple threads can insert data into the queue and take data out of it concurrently without thread-safety problems.
BlockingQueue is typically used so that producer threads store data into a queue and consumer threads take data out of the queue.
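To make the hand-off concrete, here is a minimal sketch with one producer thread and one consumer. The class name HandoffSketch, the method transfer, and the capacity of 2 are assumptions chosen for illustration; they do not come from the article's own example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's example code.
public class HandoffSketch {

    // Starts one producer thread, consumes on the calling thread,
    // and returns the values in the order they were taken.
    public static List<Integer> transfer(int count) {
        // A tiny capacity so the producer has to wait for the consumer
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a single producer and a single consumer the values come out in insertion order, which is the FIFO behavior described above. Neither thread needs any explicit locking.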
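BlockingQueue provides four different types of methods for inserting, removing, and examining data. They differ in what happens when the operation cannot be carried out immediately:

Operation | Throws exception | Returns special value | Blocks | Times out
Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit)
Remove | remove() | poll() | take() | poll(timeout, timeunit)
Examine | element() | peek() | (none) | (none)

The special value is true/false for offer(o), and null for poll() and peek() when the queue is empty.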
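The difference between these method types is easiest to see on a queue that is already full or still empty. The following is a small sketch rather than a complete reference. The class name MethodFamiliesSketch and both method names are made up for this illustration. Note that it uses the no-argument remove(), which is the variant that throws on an empty queue.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's example code.
public class MethodFamiliesSketch {

    // Reports what each insert method does when the queue is already full.
    public static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("a"); // capacity is 1, so the queue is now full
        StringBuilder result = new StringBuilder();
        try {
            queue.add("b");
        } catch (IllegalStateException e) {
            result.append("add: exception; ");
        }
        result.append("offer: ").append(queue.offer("b")).append("; ");
        try {
            result.append("timed offer: ")
                  .append(queue.offer("b", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // put("b") is left out on purpose: nothing removes "a",
        // so the call would block forever.
        return result.toString();
    }

    // Reports what each remove/examine method does when the queue is empty.
    public static String readFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder result = new StringBuilder();
        try {
            queue.remove();
        } catch (NoSuchElementException e) {
            result.append("remove: exception; ");
        }
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            result.append("element: exception; ");
        }
        result.append("poll: ").append(queue.poll()).append("; ");
        result.append("peek: ").append(queue.peek()).append("; ");
        try {
            result.append("timed poll: ")
                  .append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // take() is left out on purpose: it would block until an element arrives.
        return result.toString();
    }

    public static void main(String[] args) {
        // prints "add: exception; offer: false; timed offer: false"
        System.out.println(insertIntoFullQueue());
        // prints "remove: exception; element: exception; poll: null; peek: null; timed poll: null"
        System.out.println(readFromEmptyQueue());
    }
}
```

The blocking methods put and take are the ones a producer/consumer setup normally relies on. The timed variants are useful when a thread must be able to give up and check some other condition.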
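BlockingQueue is just an interface. In actual development you work with one of its implementation classes, such as ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, and SynchronousQueue.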
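Because all of these classes share one interface, the same code can run against different implementations while the ordering and capacity rules change. The sketch below compares a few JDK implementations. The class name ImplementationsSketch and its helper methods are assumptions made for this illustration. DelayQueue is left out because it needs elements that implement Delayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the article's example code.
public class ImplementationsSketch {

    // Inserts "c", "a", "b" and returns the order in which poll() hands them back.
    public static List<String> drain(BlockingQueue<String> queue) {
        queue.add("c");
        queue.add("a");
        queue.add("b");
        List<String> order = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            order.add(next);
        }
        return order;
    }

    // A SynchronousQueue has no capacity: offer() only succeeds if a
    // consumer is already waiting, which is not the case here.
    public static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) {
        System.out.println(drain(new ArrayBlockingQueue<>(3)));   // prints [c, a, b]
        System.out.println(drain(new LinkedBlockingQueue<>()));   // prints [c, a, b]
        System.out.println(drain(new PriorityBlockingQueue<>())); // prints [a, b, c]
        System.out.println(offerWithoutConsumer());               // prints false
    }
}
```

ArrayBlockingQueue and LinkedBlockingQueue keep insertion order, PriorityBlockingQueue returns elements in their natural order, and SynchronousQueue hands each element directly to a waiting consumer.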
Here is an example that uses ArrayBlockingQueue, a concrete implementation class of the BlockingQueue interface.
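The example implements a multithreaded producer and consumer model with ArrayBlockingQueue. The core points are as follows:
1. ArrayBlockingQueue serves as the data container for producers and consumers
2. ExecutorService starts 3 threads: 2 producers and 1 consumer
3. The total amount of data to produce is specified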
ArrayBlockingQueueProducer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer thread: puts tasks into the container until the specified total has been produced.
 */
public class ArrayBlockingQueueProducer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class);

    // The container
    private final ArrayBlockingQueue<String> queue;
    // Number of tasks still to be produced, shared by all producers
    private final AtomicInteger numberOfElementsToProduce;

    public ArrayBlockingQueueProducer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Claim the next task number atomically so that two producers
                // can never push the counter below 0
                int number = numberOfElementsToProduce.getAndUpdate(x -> x > 0 ? x - 1 : x);
                if (number <= 0) {
                    // Nothing left to produce, so the producer thread exits
                    break;
                }
                // Put the task into the queue; put() blocks while the queue is full
                String task = String.format("task_%s", number);
                queue.put(task);
                logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread pool can see it
            Thread.currentThread().interrupt();
            logger.error(this.getClass().getName().concat(". has error"), e);
        }
    }
}
ArrayBlockingQueueConsumer
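import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Consumer thread: takes tasks out of the container until the specified total has been consumed.
 */
public class ArrayBlockingQueueConsumer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class);

    private final ArrayBlockingQueue<String> queue;
    private final AtomicInteger numberOfElementsToProduce;

    public ArrayBlockingQueueConsumer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Take a task from the queue and process it, waiting up to one second for one to arrive
                String task = queue.poll(1, TimeUnit.SECONDS);
                if (task != null) {
                    logger.info("thread {} consume task {}", Thread.currentThread().getName(), task);
                    continue;
                }
                // Timed out: exit only when the producers have nothing left to produce
                // and the queue is empty. A queue that is empty for a moment while
                // producers are still running must not stop the consumer.
                if (numberOfElementsToProduce.get() <= 0 && queue.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread pool can see it
            Thread.currentThread().interrupt();
            logger.error(this.getClass().getName().concat(". has error"), e);
        }
    }
}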
Test BlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 1. Use ArrayBlockingQueue as the data container for producers and consumers <br>
 * 2. Start 3 threads through ExecutorService: 2 producers and 1 consumer <br>
 * 3. Specify the total amount of data
 */
public class TestBlockingQueue {

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
        /*BlockingQueue delayQueue = new DelayQueue();
        BlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(10);
        BlockingQueue<String> priorityBlockingQueue = new PriorityBlockingQueue<>(10);
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();*/
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // Produce at most 5 items
        AtomicInteger numberOfElementsToProduce = new AtomicInteger(5);
        // 2 producer threads
        executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
        executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
        // 1 consumer thread
        executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce));
        executorService.shutdown();
        // Wait until all three tasks have finished, checking once per second.
        // awaitTermination is a JDK stand-in for the project-specific helper
        // WaitUtils.waitUntil(() -> executorService.isTerminated(), 1000L).
        while (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
            // still running, keep waiting
        }
    }
}
The output is as follows:
13:54:17.884 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_5
13:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_5
13:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_4
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_4
13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_2
13:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_3
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_3
13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_1
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_2
13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
That concludes this look at the JUC BlockingQueue interface and its ArrayBlockingQueue implementation class.