Coding With Fun

THE BLOCKINGQUEUE INTERFACE OF JUC AND THE ARRAYBLOCKINGQUEUE IMPLEMENTATION CLASS IN DETAIL


Jun 01, 2021 Article blog




A queue is a FIFO (first in, first out) data structure. The BlockingQueue covered in this article is also a queue, with an emphasis on thread safety.

BlockingQueue's full name is java.util.concurrent.BlockingQueue. It is a thread-safe queue interface: multiple threads can insert data into the queue and take data out of it concurrently without thread-safety problems.

Examples of producers and consumers

BlockingQueue is typically used with producer threads that store data in a queue and consumer threads that take data out of the queue, as follows:


  1. The producer thread keeps inserting data into the queue until the queue is full, at which point the producer thread blocks
  2. The consumer thread keeps taking data out of the queue until the queue is empty, at which point the consumer thread blocks
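
The blocking behavior described above can be sketched in a few lines. This is a minimal illustration rather than the article's own example: the class name BlockingDemo and the method transfer are made up for this sketch, and it uses a single producer and a single consumer so that the FIFO order is easy to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article
public class BlockingDemo {

    // Moves `count` integers from a producer thread to the calling thread
    // through a queue that holds at most `capacity` elements.
    static List<Integer> transfer(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // Capacity 2 is smaller than the 5 elements produced, so the
        // producer has to wait for the consumer along the way.
        System.out.println(transfer(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

Neither thread needs explicit locks or wait/notify calls; put and take handle the waiting internally.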


BlockingQueue methods

BlockingQueue provides four different types of methods for inserting data, removing data, and examining data, as follows:

  1. Throws an exception if the operation cannot be completed immediately
  2. Returns a special value immediately instead of throwing (true/false for inserts, null for removals and examinations)
  3. Blocks the current thread while the queue is full (insert) or empty (remove)
  4. Blocks the current thread while the queue is full/empty, but gives up after a timeout

             Throws exception   Special value   Blocks    Times out
  Insert     add(o)             offer(o)        put(o)    offer(o, timeout, timeunit)
  Remove     remove()           poll()          take()    poll(timeout, timeunit)
  Examine    element()          peek()          n/a       n/a
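
To make the four method types concrete, here is a small sketch that calls them on a full queue and on an empty queue. The class and method names (MethodTypesDemo, insertIntoFullQueue, and so on) are invented for this illustration, and the 50 ms timeout is an arbitrary choice. The blocking methods put and take are left out on purpose, because on a full or empty queue they would wait until another thread made progress.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
public class MethodTypesDemo {

    // Tries the insert methods on a queue that is already full.
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // fills the only slot

        String addResult;
        try {
            queue.add("second");
            addResult = "ok";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException"; // type 1: throws
        }
        boolean offered = queue.offer("second"); // type 2: returns false at once
        boolean timedOffer;
        try {
            // type 4: waits up to 50 ms for free space, then gives up
            timedOffer = queue.offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedOffer = false;
        }
        return addResult + " " + offered + " " + timedOffer;
    }

    // Tries the remove methods on an empty queue.
    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        String removeResult;
        try {
            removeResult = queue.remove();
        } catch (NoSuchElementException e) {
            removeResult = "NoSuchElementException"; // type 1: throws
        }
        String polled = queue.poll(); // type 2: returns null at once
        String timedPoll;
        try {
            // type 4: waits up to 50 ms for an element, then gives up
            timedPoll = queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedPoll = null;
        }
        return removeResult + " " + polled + " " + timedPoll;
    }

    // Tries the examine methods on an empty queue.
    static String examineEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        String elementResult;
        try {
            elementResult = queue.element();
        } catch (NoSuchElementException e) {
            elementResult = "NoSuchElementException"; // type 1: throws
        }
        return elementResult + " " + queue.peek(); // type 2: peek returns null
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());  // IllegalStateException false false
        System.out.println(removeFromEmptyQueue()); // NoSuchElementException null null
        System.out.println(examineEmptyQueue());    // NoSuchElementException null
    }
}
```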

BlockingQueue implementation classes

BlockingQueue is only an interface. In actual development, one of the following implementation classes is used.

  1. ArrayBlockingQueue
  2. DelayQueue
  3. LinkedBlockingQueue
  4. PriorityBlockingQueue
  5. SynchronousQueue
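
The classes listed above differ mainly in capacity and ordering. The following sketch, with the made-up class name QueueKindsDemo, creates one instance of each and probes a few properties through the standard BlockingQueue API. It is only a quick comparison under default constructor arguments, not a full description of each class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original article
public class QueueKindsDemo {

    // Remaining capacity reported by a freshly created instance of each class.
    static int[] remainingCapacities() {
        BlockingQueue<String> array = new ArrayBlockingQueue<>(10);     // bounded, backed by an array
        BlockingQueue<String> linked = new LinkedBlockingQueue<>();     // unbounded unless a capacity is given
        BlockingQueue<String> priority = new PriorityBlockingQueue<>(); // unbounded
        BlockingQueue<Delayed> delay = new DelayQueue<>();              // unbounded, elements must implement Delayed
        BlockingQueue<String> synchronous = new SynchronousQueue<>();   // holds no elements at all
        return new int[] {
            array.remainingCapacity(),
            linked.remainingCapacity(),
            priority.remainingCapacity(),
            delay.remainingCapacity(),
            synchronous.remainingCapacity()
        };
    }

    // PriorityBlockingQueue hands elements out in priority order, not insertion order.
    static List<Integer> priorityOrder() {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            result.add(next);
        }
        return result;
    }

    // SynchronousQueue accepts an element only if a consumer is already waiting for it.
    static boolean offerWithoutConsumer() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    public static void main(String[] args) {
        int[] c = remainingCapacities();
        System.out.println(c[0]);                      // 10
        System.out.println(c[1] == Integer.MAX_VALUE); // true
        System.out.println(c[4]);                      // 0
        System.out.println(priorityOrder());           // [1, 2, 3]
        System.out.println(offerWithoutConsumer());    // false
    }
}
```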

Use of ArrayBlockingQueue


The following example uses ArrayBlockingQueue, a concrete implementation class of the BlockingQueue interface, to implement a multithreaded producer and consumer model.

The core content is as follows:

  1. Use ArrayBlockingQueue as a data container for producers and consumers
  2. Start 3 threads, 2 producers, 1 consumer with ExecutorService
  3. Specify the total amount of data to produce


Producer thread

ArrayBlockingQueueProducer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * The producer thread puts the specified total number of tasks into the container
 *
 */
public class ArrayBlockingQueueProducer implements Runnable {


    private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class);


    // Container
    private ArrayBlockingQueue<String> queue;
    // Number of tasks still to be produced
    private AtomicInteger numberOfElementsToProduce;


    public ArrayBlockingQueueProducer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
    }


    @Override
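    public void run() {
        try {
            while (true) {
                // Atomically claim the next task number. Checking get() first and
                // decrementing afterwards would let two producers claim the last task.
                int taskNumber = numberOfElementsToProduce.getAndUpdate(x -> x > 0 ? x - 1 : x);

                // No tasks left to produce: the producer thread exits
                if (taskNumber <= 0) {
                    break;
                }

                // Put the task into the queue; blocks while the queue is full
                String task = String.format("task_%s", taskNumber);
                queue.put(task);
                logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread exit
            Thread.currentThread().interrupt();
            logger.error(this.getClass().getName().concat(". has error"), e);
        }
    }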
}

Consumer thread

ArrayBlockingQueueConsumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * The consumer thread consumes the specified total number of tasks from the container
 *
 */
public class ArrayBlockingQueueConsumer implements Runnable {


    private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class);


    private ArrayBlockingQueue<String> queue;
    private AtomicInteger numberOfElementsToProduce;


    public ArrayBlockingQueueConsumer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
    }


    @Override
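    public void run() {
        try {
            while (true) {
                // Take a task from the queue and execute it. Waiting at most 100 ms
                // keeps the thread from blocking forever once the producers are done.
                String task = queue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (task != null) {
                    logger.info("thread {} consume task {}", Thread.currentThread().getName(), task);
                    continue;
                }

                // Nothing arrived within the timeout: the consumer thread exits once
                // every task has been claimed by a producer and the queue is empty
                if (numberOfElementsToProduce.get() <= 0 && queue.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread exit
            Thread.currentThread().interrupt();
            logger.error(this.getClass().getName().concat(". has error"), e);
        }
    }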
}

Test BlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 1. Use ArrayBlockingQueue as the data container for producers and consumers <br>
 * 2. Start 3 threads with ExecutorService: 2 producers and 1 consumer <br>
 * 3. Specify the total amount of data
 */
public class TestBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
        // Other BlockingQueue implementations, for reference:
        /*BlockingQueue delayQueue = new DelayQueue();
        BlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(10);
        BlockingQueue<String> priorityBlockingQueue = new PriorityBlockingQueue<>(10);
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();*/

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // Produce at most 5 items
        AtomicInteger numberOfElementsToProduce = new AtomicInteger(5);

        // 2 producer threads
        executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
        executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce));
        // 1 consumer thread
        executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce));

        executorService.shutdown();
        // Wait until all submitted tasks have finished, checking once per second.
        // This uses the standard awaitTermination in place of the original
        // WaitUtils.waitUntil helper, whose source is not shown in the article.
        while (!executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
            // still running, keep waiting
        }
    }
}

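The output is as follows. The interleaving of threads varies from run to run, so a consume line can be logged before the matching produce line: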

13:54:17.884 [pool-1-thread-3] INFO  c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_5
13:54:17.884 [pool-1-thread-1] INFO  c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_5
13:54:17.884 [pool-1-thread-2] INFO  c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_4
13:54:17.887 [pool-1-thread-3] INFO  c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_4
13:54:17.887 [pool-1-thread-2] INFO  c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_2
13:54:17.887 [pool-1-thread-1] INFO  c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_3
13:54:17.887 [pool-1-thread-3] INFO  c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_3
13:54:17.887 [pool-1-thread-2] INFO  c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_1
13:54:17.887 [pool-1-thread-3] INFO  c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_2
13:54:17.887 [pool-1-thread-3] INFO  c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
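
A common alternative for telling the consumer that production has finished is to put a sentinel element, often called a poison pill, into the queue after the last real task. The sketch below is a simplified variant under that assumption and is not the approach used in the article: the class name PoisonPillDemo and the sentinel value "__STOP__" are hypothetical, and it uses one producer (the calling thread) and one consumer to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article
public class PoisonPillDemo {

    // Hypothetical sentinel; it must never be used as a real task name
    static final String POISON_PILL = "__STOP__";

    // Produces task_N .. task_1, then the sentinel, and returns what the consumer handled.
    static List<String> run(int taskCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String task = queue.take(); // blocks while the queue is empty
                    if (POISON_PILL.equals(task)) {
                        return; // the producer has signalled the end
                    }
                    consumed.add(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = taskCount; i > 0; i--) {
                queue.put("task_" + i); // blocks while the queue is full
            }
            queue.put(POISON_PILL); // always the last element
            consumer.join();        // after join, reading `consumed` is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [task_3, task_2, task_1]
    }
}
```

With several producers, the sentinel would have to be added only after all of them have finished, and with several consumers one sentinel per consumer is needed.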


That concludes this look at the JUC BlockingQueue interface and its ArrayBlockingQueue implementation class.