简单的说,采用该结构的集合,对元素的存取有如下的特点:
前面
的元素依次取出后,才能取出该元素)数据结构演示网站:
https://www.cs.usfca.edu/~galles/visualization/QueueArray.html
Queue接口
队列在Java中有对应的接口Queue
阻塞队列(BlockingQueue)是java.util.concurrent包下的阻塞队列接口,BlockingQueue提供了线程安全的队列访问方式:向阻塞队列中插入数据时,如果队列满了,线程将会阻塞等待,直到队列有空闲,从阻塞队列中取数据时,如果队列为空,线程将会阻塞等待,直到队列非空。
BlockingQueue接口
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞指定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(time,unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
常用方法详细说明
应用场景
生产者-消费者模型下,生产者负责向队列中添加元素,消费者负责从队列中消费元素,阻塞队列可以解决生产者和消费者的并发问题。
生产者插入数据时,如果队列中满了,就会进行阻塞等待,直到消费者消费了元素
消费者消费元素时,如果队列中为空,就会进行阻塞等待,直到生产者生产了元素
阻塞队列在实际场景中可以帮我们解决并发问题
队列 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 |
PriorityBlockingQueue | 支持根据优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列,支持延迟获取元素 |
SynchronousQueue | 不存储元素的队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
ArrayBlockingQueue内部是用数组存储元素的,初始化时需要指定容量的大小
ArrayBlockingQueue基本使用
/**
* 有界阻塞队列基本使用
*
* @throws InterruptedException
*/
public static void queueHandler00() throws InterruptedException {
BlockingQueue<Object> objects = new ArrayBlockingQueue<>(1024);
objects.put("程云");
Object take = objects.take();
System.out.println(take);
}
在生产者-消费者模型中,生产者生产数据的速度和消费者消费数据的速度匹配下,可以使用ArrayBlockingQueue,如果生产的速度比消费的速度快,会导致队列满,出现生产线程的阻塞。
/**
* 有界阻塞队列 - 生产者与消费者模式
*/
public static void queueHandler01() {
BlockingQueue<String> queue = new ArrayBlockingQueue(5);
// 生产者生产数据
Runnable producer = () -> {
while (true) {
try {
queue.put("程云");
System.out.println("生产者生产了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(producer).start();
// 消费者消费数据
Runnable consumer = () -> {
while (true) {
try {
String take = queue.take();
System.out.println("消费者消费了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(consumer).start();
}
流量控制
/**
* 流量控制
*/
public static void queueHandler02() {
BlockingQueue queue = new ArrayBlockingQueue(100);
new Thread(new Runnable() {
@Override
public void run() {
// 处理请求
while (true) {
// 获取队列中的请求
Object poll = queue.poll();
if (poll != null) {
// 处理请求
System.out.println("处理请求:" + poll + ",队列元素个数:" + queue.size());
try {
// 模拟处理时间
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}).start();
// 模拟200次请求
for (int i = 0; i < 200; i++) {
// 重试机制
// 判断队列是否已满
if (queue.size() >= 100) {
try {
// 等待一段时间重试
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将请求任务添加到队列中
queue.offer(new Object());
}
}
ArrayBlockingQueue底层
ArrayBlockingQueue使用ReentrantLock实现了线程的安全,入队和出队操作的是同一个锁对象,那么只能有一个线程进行入队或出队,因此生产者和消费者无法并行操作,在高并发下会成为性能瓶颈。
ArrayBlockingQueue双指针
使用双指针可以避免数组的复制操作,如果使用的是单指针,当删除元素时,后面所有的元素都需要向前移动,这样会导致时间复杂度为O(n),但是使用双指针的话,我们只需要将takeIndex指向下一个元素,不需要将其前面的元素向前移动,当插入元素时,我们只需要将元素插入到putIndex指向的位置,不需要将其后面所有的元素向后移动,这样时间复杂度都是O(1)级别,提高了队列的性能。
LinkedBlockingQueue是一个基于链表实现的阻塞队列,该阻塞队列的大小默认是Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue被称为无界队列,表示几乎没有界限,并且队列还可以随着元素的添加进行动态的增长,但是如果满了,就会抛出OOM异常,因此为了避免出现异常情况,建议手动设置一个大小。
LinkedBlockingQueue基本使用
package com.cy.example.blockingqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @Description TODO
* @Author 程云
* @Date 2024/8/24 17:54
* @Version 1.0
*/
public class LinkedBlockingQueueClass {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(1024);
// 插入元素
queue2.put("程云");
// 队列长度
System.out.println(queue2.size());
// 获取元素
System.out.println(queue2.take());
}
}
LinkedBlockingQueue基本原理
LinkedBlockingQueue底层由单链表实现,只能从head取元素,从tail添加元素,并且采用了两把锁的锁分离技术实习了入队出队互不阻塞,添加元素和获取元素都有独立的锁,LinkedBlockingQueue是读写分离的,读写操作可以并行执行
ArrayBlockingQueue的数据存储容器是数组,LinkedBlockingQueue的数据存储容器是链表
ArrayBlockingQueue数组存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,LinkedBlockingQueue会生成一个Node对象,这可能会在长时间内需要高并发的处理大数据时,对于GC可能存在影响
DelayQueue是一个支持延迟获取元素的阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口,在创建元素时可以指定获取元素的延迟时间,只能在指定时间内获取元素。
延迟队列不是先进先出,而是根据延迟时间的长短进行排序,下一个即将执行的任务会排到队列的最前面
DelayQueue存入的元素必须实现Delay接口,Delay接口继承了Comparable接口,因此拥有了排序的能力
package com.cy.example.blockingqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Description TODO
* @Author 程云
* @Date 2024/8/25 9:19
* @Version 1.0
*/
public class DelayQueueClass {
public static void main(String[] args) {
DelayQueue<Delayed> delayeds = new DelayQueue<>();
}
}
package java.util.concurrent;
/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* <p>An implementation of this interface must define a
* {@code compareTo} method that provides an ordering consistent with
* its {@code getDelay} method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
* 返回与此对象关联的剩余延迟,在给定时间单位。
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
DelayQueue应用场景
订单延迟消费
package com.cy.example.blockingqueue;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Description 延迟订单业务
* @Author 程云
* @Date 2024/8/25 9:38
* @Version 1.0
*/
public class Order implements Delayed {
private String orderId;
private ZonedDateTime expireTime;
public Order() {
}
public Order(String orderId, ZonedDateTime expireTime) {
this.orderId = orderId;
this.expireTime = expireTime;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public ZonedDateTime getExpireTime() {
return expireTime;
}
public void setExpireTime(ZonedDateTime expireTime) {
this.expireTime = expireTime;
}
/**
* 计算出订单延迟剩余的时间
*
* @param unit 时间单位
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 过期时间转换时间戳 - 当前系统的时间戳
long convert = unit.convert(expireTime.toInstant().toEpochMilli() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return convert;
}
/**
* 排序时间
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<Order> delayeds = new DelayQueue<>();
delayeds.put(new Order("order1001",ZonedDateTime.now(ZoneId.of("UTC")).plus(5,ChronoUnit.SECONDS)));
delayeds.put(new Order("order1002",ZonedDateTime.now(ZoneId.of("UTC")).plus(2,ChronoUnit.SECONDS)));
delayeds.put(new Order("order1003",ZonedDateTime.now(ZoneId.of("UTC")).plus(4,ChronoUnit.SECONDS)));
while (!delayeds.isEmpty()){
Order order = delayeds.take();
System.out.println("处理订单:"+order.getOrderId());
}
}
}
DelayQueue底层原理
线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列
Executors类下的线程池类型:
线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,选择合适的阻塞队列
Executors类下的线程池类型: