网站建设的成功经验,信息图表网站,小地方的旅游网站怎么建设,海南响应式网站建设方案上文我们了解了多线程案例中的单例模式#xff0c;此文我们来探讨多线程案例之阻塞队列吧 1. 阻塞队列是什么#xff1f;
阻塞队列是⼀种特殊的队列.也遵守先进先出的原则. 阻塞队列是⼀种线程安全的数据结构,并且具有以下特性:
当队列满的时候,继续⼊队列就会… 上文我们了解了多线程案例中的单例模式此文我们来探讨多线程案例之阻塞队列吧 1. 阻塞队列是什么
阻塞队列是⼀种特殊的队列.也遵守先进先出的原则. 阻塞队列是⼀种线程安全的数据结构,并且具有以下特性:
当队列满的时候,继续⼊队列就会阻塞,直到有其他线程从队列中取⾛元素.当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插⼊元素.
阻塞队列的⼀个典型应用场景就是生产者消费者模型.这是⼀种非常典型的开发模型
那么什么是生产者消费者模型呢
1.1 生产者消费者模型
1. 生产者消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题
生产者和消费者彼此之间不直接通讯而是通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找⽣产者要数据而是直接从阻塞队列⾥取.
应用场景 一对夫妻开包子店早上夫妻二人包包子女的负责擀包子皮擀好的包子皮放在桌板上类似于阻塞队列男的负责包包子其中擀包子皮的就是生产者包包子的就是消费者。擀包子皮的人不在意谁消耗他生产出来的包子皮包包子的人也不在意是谁生产的包子皮能用就可以
2. 阻塞队列就相当于⼀个缓冲区平衡了⽣产者和消费者的处理能⼒. (削峰填⾕
应用场景 在每年的双十一、双十二购物节服务器同⼀时刻可能会收到大量的支付请求如果同时处理这些支付请求服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到⼀个阻塞队列中(类似于上面提到的放包子皮的桌板消费者不需要按照生产者的请求速度来完成可以按照自己的速度来完成就保证了不会服务器崩掉), 然后再由消费者线程慢慢的来处理每个⽀付请求.这样做可以有效进⾏ “削峰”, 防⽌服务器被突然到来的⼀波请求直接冲垮.
3. 阻塞队列可以实现异步操作
1.2 标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的即可.
BlockingQueue 是⼀个接口. 真正实现的类是 LinkedBlockingQueue.put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性. 1.3 消息队列的自我实现
首先我们采用的是循环队列的方式实现消息队列 定义一个数组用于存放消息 定义一个头尾指针标记消息队列的队头和队尾 定义一个size用于标记有效数据个数 //定义一个消息队列数组
private Integer [] informationnull;
//定义一个头指针
private int head0;
//尾指针
private int tail0;
//定义一个size用于标记有效数据个数
private int size0;此时我们数组不给定大小我们采用构造方法在初始化时给定数组大小此时和JDK实现的雷同
//构造方法用于初始化消息队列的容量大小
public MyBlockQueue(Integer capacity) {if(capacity0){throw new RuntimeException(队列容量必须大于0.);}informationnew Integer[capacity];
}在队尾插入元素 为了实现消息队列的效果就要与普通队列不一样 在插入元素时 若消息队列满了就等待等到消息队列有空位时就会被唤醒 当插入元素时就唤醒出元素的的操作 其中wait()方法、notifyAll()需要搭配synchronize关键字使用 public void put(int value) throws InterruptedException {//没有位置就要等待if(sizeinformation.length){synchronized (this){this.wait();}}//有位置直接插入元素无需等待if(sizeinformation.length){information[tail]value;tail;size;if(tailinformation.length){tail0;}synchronized (this){this.notifyAll();}}}在队头出元素 为了实现消息队列的效果当消息队列中没有元素时就需要等待直到有元素进来再被唤醒 当出元素时有位置空余出来就可以唤醒要插入的操作 //取出元素public int take() throws InterruptedException {//队列为空需要等待if(size0){synchronized (this){this.wait();}}//有元素直接取出即可Integer valueinformation[head];head;size--;if(headinformation.length){head0;}synchronized (this){this.notifyAll();}return value;}synchronize关键字实现了原子性、在效果上实现了内存可见性但是没有保证有序性所以我们为涉及修改的变量加上volatile关键字 //定义一个消息队列数组private volatile Integer [] informationnull;//定义一个头指针private volatile int head0;//尾指针private volatile int tail0;//定义一个size用于标记有效数据个数private volatile int size0;由于在插入和删除的操作中涉及多个变量的修改我们可以扩大synchronize的范围
此时我们完整的代码如下
public class MyBlockQueue {//定义一个消息队列数组private volatile Integer [] informationnull;//定义一个头指针private volatile int head0;//尾指针private volatile int tail0;//定义一个size用于标记有效数据个数private volatile int size0;//构造方法用于初始化消息队列的容量大小public MyBlockQueue(Integer capacity) {if(capacity0){throw new RuntimeException(队列容量必须大于0.);}informationnew Integer[capacity];}//插入元素public void put(int value) throws InterruptedException {synchronized (this){//没有位置就要等待if(sizeinformation.length){this.wait();}//有位置直接插入元素无需等待if(sizeinformation.length){information[tail]value;tail;size;if(tailinformation.length){tail0;}synchronized (this){this.notifyAll();}}}}//取出元素public int take() throws InterruptedException {synchronized (this){//队列为空需要等待if(size0){synchronized (this){this.wait();}}//有元素直接取出即可Integer valueinformation[head];head;size--;if(headinformation.length){head0;}this.notifyAll();return value;}}
} 此时引发一个新问题假设此时有多个线程要进行put操作但只有一个空余位置会有什么问题吗 所以我们将if判断改成while判断并且在JDK提供的put方法中也是用while
1.4 消息队列代码
public class MyBlockQueue {//定义一个消息队列数组private volatile Integer [] informationnull;//定义一个头指针private volatile int head0;//尾指针private volatile int tail0;//定义一个size用于标记有效数据个数private volatile int size0;//构造方法用于初始化消息队列的容量大小public MyBlockQueue(Integer capacity) {if(capacity0){throw new RuntimeException(队列容量必须大于0.);}informationnew Integer[capacity];}//插入元素public void put(int value) throws InterruptedException {synchronized (this){//此处最好使⽤ while.//否则notifyAll 的时候, 该线程从wait 中被唤醒, 但是紧接着并未抢占到锁.//当锁被抢占的时候, 可能⼜已经队列满了while (sizeinformation.length){this.wait();}information[tail]value;tail;size;if (tailinformation.length){tail0;}this.notifyAll();}}//取出元素public int take() throws InterruptedException {synchronized (this){//队列为空需要等待while (size0){this.wait();}//有元素直接取出即可Integer valueinformation[head];head;size--;if(headinformation.length){head0;}this.notifyAll();return value;}}
}希望得到你的支持谢谢