博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发阻塞队列
阅读量:4919 次
发布时间:2019-06-11

本文共 4844 字,大约阅读时间需要 16 分钟。

Java 并发编程利用 Condition 来实现阻塞队列
You are here:   - 
文章 发布于 2017年06月26日  阅读 944
 
什么是阻塞队列 BlockingQueue队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,常用于生产-消费场景。BlockingQueue 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:当阻塞队列进行插入...

什么是阻塞队列 BlockingQueue

队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,常用于生产-消费场景。

BlockingQueue 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下: 

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue,Java.util.concurrent 包下具有以下 BlockingQueue 接口的实现类:

  • ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了。

  • DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

  • LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

  • PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

  • SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

下面用 BlockQueue 技术来实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/** 定义一个盘子类,可以放鸡蛋和取鸡蛋 */  
public 
class 
BigPlate {  
   
    
/** 装鸡蛋的盘子,大小为5 */  
    
private 
BlockingQueue<Object> eggs = 
new 
ArrayBlockingQueue<Object>(
5
);  
       
    
/** 放鸡蛋 */  
    
public 
void 
putEgg(Object egg) {  
        
try 
{  
            
eggs.put(egg);
// 向盘子末尾放一个鸡蛋,如果盘子满了,当前线程阻塞  
        
catch 
(InterruptedException e) {  
            
e.printStackTrace();  
        
}  
   
        
// 下面输出有时不准确,因为与put操作不是一个原子操作  
        
System.out.println(
"放入鸡蛋"
);  
    
}  
       
    
/** 取鸡蛋 */  
    
public 
Object getEgg() {  
        
Object egg = 
null
;  
        
try 
{  
            
egg = eggs.take();
// 从盘子开始取一个鸡蛋,如果盘子空了,当前线程阻塞  
        
catch 
(InterruptedException e) {  
            
e.printStackTrace();  
        
}  
   
        
// 下面输出有时不准确,因为与take操作不是一个原子操作  
        
System.out.println(
"拿到鸡蛋"
);  
        
return 
egg;  
    
}  
       
    
/** 放鸡蛋线程 */  
    
static 
class 
AddThread 
extends 
Thread {  
        
private 
BigPlate plate;  
        
private 
Object egg = 
new 
Object();  
   
        
public 
AddThread(BigPlate plate) {  
            
this
.plate = plate;  
        
}  
   
        
public 
void 
run() {  
            
plate.putEgg(egg);  
        
}  
    
}  
   
    
/** 取鸡蛋线程 */  
    
static 
class 
GetThread 
extends 
Thread {  
        
private 
BigPlate plate;  
   
        
public 
GetThread(BigPlate plate) {  
            
this
.plate = plate;  
        
}  
   
        
public 
void 
run() {  
            
plate.getEgg();  
        
}  
    
}  
       
    
public 
static 
void 
main(String[] args) {  
        
BigPlate plate = 
new 
BigPlate();  
        
// 先启动10个放鸡蛋线程  
        
for
(
int 
i = 
0
; i < 
10
; i++) {  
            
new 
Thread(
new 
AddThread(plate)).start();  
        
}  
        
// 再启动10个取鸡蛋线程  
        
for
(
int 
i = 
0
; i < 
10
; i++) {  
            
new 
Thread(
new 
GetThread(plate)).start();  
        
}  
    
}  
}

 

利用 Condition 来实现阻塞队列

Java 1.5 之后新增了显式锁的接口 java.util.concurrent.locks.Lock 接口,同样提供了显式的条件接口 Condition,并对条件队列进行了增强。

Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须使用 synchronized 这个内置的monitor锁,而 Condition 使用的是 RenentranceLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别。

下面就用 Condition 技术来实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class 
Buffer {
    
final 
Lock lock = 
new 
ReentrantLock(); 
//定义一个锁
    
final 
Condition notFull = lock.newCondition(); 
//定义阻塞队列满了的Condition
    
final 
Condition notEmpty = lock.newCondition();
//定义阻塞队列空了的Condition
 
    
final 
Object[] items = 
new 
Object[
10
]; 
//为了下面模拟,设置阻塞队列的大小为10,不要设太大
 
    
int 
putptr, takeptr, count; 
//数组下标,用来标定位置的
 
    
//往队列中存数据
    
public 
void 
put(Object x) 
throws 
InterruptedException {
        
lock.lock(); 
//上锁
        
try 
{
            
while 
(count == items.length) {
                
System.out.println(Thread.currentThread().getName() + 
" 被阻塞了,暂时无法存数据!"
);
                
notFull.await();    
//如果队列满了,那么阻塞存数据这个线程,等待被唤醒
            
}
            
//如果没满,按顺序往数组中存
            
items[putptr] = x;
            
if 
(++putptr == items.length) 
//这是到达数组末端的判断,如果到了,再回到始端
                
putptr = 
0
;
            
++count;    
//消息数量
            
System.out.println(Thread.currentThread().getName() + 
" 存好了值: " 
+ x);
            
notEmpty.signal(); 
//好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
        
finally 
{
            
lock.unlock(); 
//放锁
        
}
    
}
 
    
//从队列中取数据
    
public 
Object take() 
throws 
InterruptedException {
        
lock.lock(); 
//上锁
        
try 
{
            
while 
(count == 
0
) {
                
System.out.println(Thread.currentThread().getName() + 
" 被阻塞了,暂时无法取数据!"
);
                
notEmpty.await();  
//如果队列是空,那么阻塞取数据这个线程,等待被唤醒
            
}
            
//如果没空,按顺序从数组中取
            
Object x = items[takeptr];
            
if 
(++takeptr == items.length) 
//判断是否到达末端,如果到了,再回到始端
                
takeptr = 
0
;
            
--count; 
//消息数量
            
System.out.println(Thread.currentThread().getName() + 
" 取出了值: " 
+ x);
            
notFull.signal(); 
//好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
            
return 
x;
        
finally 
{
            
lock.unlock(); 
//放锁
        
}
    
}
}

转载于:https://www.cnblogs.com/huojg-21442/p/7249841.html

你可能感兴趣的文章
iOS-回收键盘的几种方法
查看>>
knockoutJS学习笔记09:使用mapping插件
查看>>
API开发之接口安全(二)-----sign校验
查看>>
bzoj 1047 单调队列
查看>>
Windows Phone开发之路(11) 方向处理之动态布局
查看>>
数据分析笔试题
查看>>
Random在高并发下的缺陷以及JUC对其的优化
查看>>
C# 获取文件路径,读取项目中某程序集下文件
查看>>
static关键字
查看>>
Java面向对象之接口interface 入门实例
查看>>
想成为web开发大神?那你应该从拥有良好的代码规范走起!(JavaScript篇 - 第一篇)...
查看>>
node 删除和复制文件或文件夹
查看>>
便捷开发之mybatis逆向工程
查看>>
前端移动端开发总结(Vue)
查看>>
实现一个EventEmitter类,这个类包含以下方法: on/ once/fire/off
查看>>
Javascript 词法分析
查看>>
ConcurrentHashMap1.7和1.8对比
查看>>
分布式系统整理
查看>>
RocketMQ整理
查看>>
Spring框架整理
查看>>