您的当前位置:首页正文

高并发下阻塞队列的选择

2024-11-09 来源:个人技术集锦

高并发下阻塞队列的选择

一、队列

简单的说,采用该结构的集合,对元素的存取有如下的特点:

  • 先进先出(即,存进去的元素,要在它前面的元素依次取出后,才能取出该元素)
  • 队列的入口、出口各占一侧

数据结构演示网站:

https://www.cs.usfca.edu/~galles/visualization/QueueArray.html

Queue接口

队列在Java中有对应的接口Queue

  • add:添加元素并返回true,如果队列满抛出异常
  • offer:添加元素并返回true,如果队列满返回false
  • remove:删除队首元素并返回元素,队列为空抛出异常
  • poll:删除队首元素并返回元素,队列为空返回null
  • element:返回队首元素,不移除,队列为空抛出异常
  • peek:返回队首元素,不移除,队列为空返回null

二、阻塞队列

阻塞队列(BlockingQueue)是java.util.concurrent包下的阻塞队列接口,BlockingQueue提供了线程安全的队列访问方式:向阻塞队列中插入数据时,如果队列满了,线程将会阻塞等待,直到队列有空闲,从阻塞队列中取数据时,如果队列为空,线程将会阻塞等待,直到队列非空。

BlockingQueue接口

方法抛出异常返回特定值阻塞阻塞指定时间
入队add(e)offer(e)put(e)offer(e,time,unit)
出队remove()poll()take()poll(time,unit)
获取队首元素element()peek()不支持不支持

常用方法详细说明

  • 入队
    • put(e):插入元素时,如果队列已满,进入阻塞,直到队列有空闲
    • offer(e):插入元素时,如果队列已满,返回false,不会进入阻塞
    • offer(e,time,unit):插入元素时,如果队列已满,进入阻塞,超过阻塞时间返回false
  • 出队
    • poll():获取元素时,如果队列为空,返回null,不进入阻塞
    • poll(time,unit):获取元素时,如果队列为空,进入阻塞,超过阻塞时间返回null
    • take():获取元素时,如果队列为空,进入阻塞,直到队列插入数据

应用场景

  • 线程池
    • 线程池中的任务队列通常是一个阻塞队列,当任务数超出了线程池的容量时,新提交的任务就会被存入任务队列中进行等待,线程池中的工作线程从任务队列中获取出任务进行执行,如果队列为空,那么工作线程就会被阻塞,一直等到队列中有新任务被提交。
  • 生产者-消费者
    • 生产者-消费者模型下,生产者负责向队列中添加元素,消费者负责从队列中消费元素,阻塞队列可以解决生产者和消费者的并发问题。

    • 生产者插入数据时,如果队列中满了,就会进行阻塞等待,直到消费者消费了元素

    • 消费者消费元素时,如果队列中为空,就会进行阻塞等待,直到生产者生产了元素

  • 消息队列
    • 消息队列使用了阻塞队列来存储消息,生产者生成出消息存入队列中,消费者从队列中消费消息,消息队列可以实现异步通信,提高了系统的吞吐量和响应性能,并且还可以将不同的组件解耦,提高了系统的可维护性以及可扩展性
  • 缓存系统
    • 缓存系统使用阻塞队列存储缓存数据,当缓存数据被更新时,它会被存入队列中,其他线程可以从队列中获取更新后的缓存数据进行使用,使用阻塞队列可以避免并发更新缓存数据时的冲突
  • 并发任务处理
    • 并发处理时,可以将待处理的任务存入队列中,多个工作线程可以从队列中获取任务进行处理,使用阻塞队列可以避免多个线程同时处理同一个任务的问题,并且可以把任务的提交和任务的执行进行解耦。

阻塞队列在实际场景中可以帮我们解决并发问题

三、JUC包下的阻塞队列

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列
PriorityBlockingQueue支持根据优先级排序的无界阻塞队列
DelayQueue基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列,支持延迟获取元素
SynchronousQueue不存储元素的队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列

四、ArrayBlockingQueue

ArrayBlockingQueue内部是用数组存储元素的,初始化时需要指定容量的大小

ArrayBlockingQueue基本使用

/**
 * 有界阻塞队列基本使用
 *
 * @throws InterruptedException
 */
public static void queueHandler00() throws InterruptedException {

    BlockingQueue<Object> objects = new ArrayBlockingQueue<>(1024);

    objects.put("程云");

    Object take = objects.take();

    System.out.println(take);

}

在生产者-消费者模型中,生产者生产数据的速度和消费者消费数据的速度匹配下,可以使用ArrayBlockingQueue,如果生产的速度比消费的速度快,会导致队列满,出现生产线程的阻塞。

/**
 * 有界阻塞队列 - 生产者与消费者模式
 */
public static void queueHandler01() {

    BlockingQueue<String> queue = new ArrayBlockingQueue(5);

    // 生产者生产数据
    Runnable producer = () -> {
        while (true) {
            try {
                queue.put("程云");
                System.out.println("生产者生产了一个元素,队列中元素个数:" + queue.size());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    new Thread(producer).start();

    // 消费者消费数据
    Runnable consumer = () -> {
        while (true) {
            try {
                String take = queue.take();
                System.out.println("消费者消费了一个元素,队列中元素个数:" + queue.size());
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    new Thread(consumer).start();
}

流量控制

/**
 * 流量控制
 */
public static void queueHandler02() {

    BlockingQueue queue = new ArrayBlockingQueue(100);

    new Thread(new Runnable() {
        @Override
        public void run() {
            // 处理请求
            while (true) {
                // 获取队列中的请求
                Object poll = queue.poll();
                if (poll != null) {
                    // 处理请求
                    System.out.println("处理请求:" + poll + ",队列元素个数:" + queue.size());

                    try {
                        // 模拟处理时间
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }
    }).start();

    // 模拟200次请求
    for (int i = 0; i < 200; i++) {

        // 重试机制
        // 判断队列是否已满
        if (queue.size() >= 100) {
            try {
                // 等待一段时间重试
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 将请求任务添加到队列中
        queue.offer(new Object());
    }


}

ArrayBlockingQueue底层

ArrayBlockingQueue使用ReentrantLock实现了线程的安全,入队和出队操作的是同一个锁对象,那么只能有一个线程进行入队或出队,因此生产者和消费者无法并行操作,在高并发下会成为性能瓶颈。

ArrayBlockingQueue双指针

使用双指针可以避免数组的复制操作,如果使用的是单指针,当删除元素时,后面所有的元素都需要向前移动,这样会导致时间复杂度为O(n),但是使用双指针的话,我们只需要将takeIndex指向下一个元素,不需要将其前面的元素向前移动,当插入元素时,我们只需要将元素插入到putIndex指向的位置,不需要将其后面所有的元素向后移动,这样时间复杂度都是O(1)级别,提高了队列的性能。

五、LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,该阻塞队列的大小默认是Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue被称为无界队列,表示几乎没有界限,并且队列还可以随着元素的添加进行动态的增长,但是如果满了,就会抛出OOM异常,因此为了避免出现异常情况,建议手动设置一个大小。

LinkedBlockingQueue基本使用

package com.cy.example.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @Description TODO
 * @Author 程云
 * @Date 2024/8/24 17:54
 * @Version 1.0
 */
public class LinkedBlockingQueueClass {


    public static void main(String[] args) throws InterruptedException {


        BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();

        BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(1024);

        // 插入元素
        queue2.put("程云");

        // 队列长度
        System.out.println(queue2.size());

        // 获取元素
        System.out.println(queue2.take());
    }


}

LinkedBlockingQueue基本原理

LinkedBlockingQueue底层由单链表实现,只能从head取元素,从tail添加元素,并且采用了两把锁的锁分离技术实习了入队出队互不阻塞,添加元素和获取元素都有独立的锁,LinkedBlockingQueue是读写分离的,读写操作可以并行执行

六、LinkedBlockingQueue和ArrayBlockingQueue区别

  • 队列大小不同
    • ArrayBlockingQueue是有界队列,必须指定大小,LinkedBlockingQueue是无界队列,默认有大小,如果指定大小,也可成为有界队列,在无界下,如果添加速度大于移除速度,可能会出现内存溢出等问题
  • 数据存储容器不同
    • ArrayBlockingQueue的数据存储容器是数组,LinkedBlockingQueue的数据存储容器是链表

    • ArrayBlockingQueue数组存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,LinkedBlockingQueue会生成一个Node对象,这可能会在长时间内需要高并发的处理大数据时,对于GC可能存在影响

  • 锁不同
    • ArrayBlockingQueue队列中的锁是没有分离,添加移除操作都是一个锁,LinkedBlockingQueue队列的锁是分离的,提高队列的吞吐量,在高并发下生产者和消费者可以并行执行,提高并发性能

七、DelayQueue

DelayQueue是一个支持延迟获取元素的阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口,在创建元素时可以指定获取元素的延迟时间,只能在指定时间内获取元素。

延迟队列不是先进先出,而是根据延迟时间的长短进行排序,下一个即将执行的任务会排到队列的最前面

DelayQueue存入的元素必须实现Delay接口,Delay接口继承了Comparable接口,因此拥有了排序的能力

package com.cy.example.blockingqueue;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Description TODO
 * @Author 程云
 * @Date 2024/8/25 9:19
 * @Version 1.0
 */
public class DelayQueueClass {
    public static void main(String[] args) {
        DelayQueue<Delayed> delayeds = new DelayQueue<>();
    }
}

package java.util.concurrent;

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     * 返回与此对象关联的剩余延迟,在给定时间单位。
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

DelayQueue应用场景

  • 商城订单超时关闭
  • 异步短信通知功能
  • 关闭空闲连接
  • 缓存过期清楚
  • 任务超时处理

订单延迟消费

package com.cy.example.blockingqueue;

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Description 延迟订单业务
 * @Author 程云
 * @Date 2024/8/25 9:38
 * @Version 1.0
 */
public class Order implements Delayed {

    private String orderId;

    private ZonedDateTime expireTime;

    

    public Order() {
    }

    public Order(String orderId, ZonedDateTime expireTime) {
        this.orderId = orderId;
        this.expireTime = expireTime;
    }


    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public ZonedDateTime getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(ZonedDateTime expireTime) {
        this.expireTime = expireTime;
    }

    /**
     * 计算出订单延迟剩余的时间
     *
     * @param unit 时间单位
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // 过期时间转换时间戳 - 当前系统的时间戳
        long convert = unit.convert(expireTime.toInstant().toEpochMilli() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

        return convert;
    }

    /**
     * 排序时间
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
            return -1;
        } else {
            return 0;
        }
    }


    public static void main(String[] args) throws InterruptedException {


        DelayQueue<Order> delayeds = new DelayQueue<>();
        delayeds.put(new Order("order1001",ZonedDateTime.now(ZoneId.of("UTC")).plus(5,ChronoUnit.SECONDS)));
        delayeds.put(new Order("order1002",ZonedDateTime.now(ZoneId.of("UTC")).plus(2,ChronoUnit.SECONDS)));
        delayeds.put(new Order("order1003",ZonedDateTime.now(ZoneId.of("UTC")).plus(4,ChronoUnit.SECONDS)));

        while (!delayeds.isEmpty()){
            Order order = delayeds.take();
            System.out.println("处理订单:"+order.getOrderId());
        }
    }
}

DelayQueue底层原理

  • DelayQueue内部使用了PriorityQueue队列来维护元素的排序,元素根据延迟时间来进行排序,延迟时间最短的元素排在队列的头部
  • 使用take或poll方法获取元素时,如果队列为空,进入阻塞,直到元素达到指定延迟时间
  • 元素的延迟时间通过getDelay方法返回,如果返回值小于等于0,表示该元素已经到期,可以从队列中取出

八、如何选择合适的阻塞队列

  • 功能
    • 是否需要阻塞队列帮我们排序,比如优先级排序、延迟执行等,如果有这个需要,就选择类似于PriorityBlockingQueue之类的有排序能力的阻塞队列
  • 容量
    • 是否有存储的要求,还是只需要直接传递,在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的 默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
  • 扩容
    • 是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
  • 内存结构
    • 我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
  • 性能
    • 从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑SynchronousQueue。

线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列

Executors类下的线程池类型:

  • FixedThreadPool(SingleThreadExecutor同理)
    • 选择的是LinkedBlockingQueue
  • CachedThreadPool
    • 选择的是SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)
    内存的结构角度去考虑这个问题。
  • 性能
    • 从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑SynchronousQueue。

线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列

Executors类下的线程池类型:

  • FixedThreadPool(SingleThreadExecutor同理)
    • 选择的是LinkedBlockingQueue
  • CachedThreadPool
    • 选择的是SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)
    • 选择的是延迟队列
Top