0%

用 Java 中的 wait 和 notify 实现简单的生产者消费者模型

问题定义

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程/多线程同步问题的经典案例。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

参考:[维基百科] 生产者消费者问题

Java 简单实现:用 synchronized 锁和 wait、notify方法

实现要点:

  • 用一个 LinkedList 来存放物品(为方便实现,我们用 Integer 表示物品)
  • 用一个 capacity 标记缓存区的大小
  • 用 wait、notify 机制来控制缓存区满时不能继续生产,缓存区为空时不能消费

用 ProductManager 控制生产和消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import java.util.LinkedList;

public class ProducerConsumerWaitNotifyDemo {

public static void main(String[] args) throws InterruptedException {
final ProductManager productManager = new ProductManager(10);

// 创建一个生产者线程
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
productManager.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

// 创建一个消费者线程
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
productManager.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

// 开始生产者和消费者线程
producerThread.start();
consumerThread.start();

// 生产者线程先结束,消费者线程后结束
producerThread.join();
consumerThread.join();
}

/**
* 这个类包含一个 LinkedList 用来存放物品,一个 produce 方法用来生产物品,一个 consume 方法用来消费物品。
*/
static class ProductManager {

// 缓存区,用来存放物品
private LinkedList<Integer> productList = new LinkedList<>();
// 缓存区大小
private int capacity;

public ProductManager(int cap) {
this.capacity = cap;
}

/**
* 用来被生产者调用进行生产
*/
public void produce() throws InterruptedException {
int productNum = 0;
while (true) {
synchronized (this) {
// 如果缓存区已满,线程进入等待状态
while (productList.size() == capacity) {
wait();
}

// 生产一个物品
System.out.println("Producer " + Thread.currentThread().getName()
+ " produces product-" + productNum);
productList.add(productNum++);

// 通知消费者线程可以进行消费了
notify();

// 休息一会儿,方便观察结果
Thread.sleep(500);
}
}
}

/**
* 用来被消费者调用进行消费
*/
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
// 如果缓存区为空,线程进入等待状态
while (productList.isEmpty()) {
wait();
}

// 消费一个物品
int product = productList.removeFirst();
System.out.println("Consumer " + Thread.currentThread().getName()
+ " consume product-" + product);

// 通知生产者线程可以进行生产了
notify();

// 休息一会儿,方便观察结果
Thread.sleep(300);
}
}
}
}
}

参考:[GeeksforGeeks] Producer-Consumer solution using threads in Java

分别创建 Producer 和 Consumer 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import java.util.LinkedList;

public class ProducerConsumerWaitNotifyDemo2 {

// 缓存区,用来存放物品
private static LinkedList<Integer> PRODUCT_LIST = new LinkedList<>();
// 缓存区大小
private static int CAPACITY = 10;

public static void main(String[] args) throws InterruptedException {

// 创建一个生产者线程
Thread producerThread = new Thread(new Producer(PRODUCT_LIST, CAPACITY));
// 创建一个消费者线程
Thread consumerThread = new Thread(new Consumer(PRODUCT_LIST));

// 开始生产者和消费者线程
producerThread.start();
consumerThread.start();

// 生产者线程先结束,消费者线程后结束
producerThread.join();
consumerThread.join();
}

static class Producer implements Runnable {
private LinkedList<Integer> productList;
private int capacity;

public Producer(LinkedList<Integer> list, int cap) {
this.productList = list;
this.capacity = cap;
}

@Override
public void run() {
int productNum = 0;
while (true) {
synchronized (productList) {
// 如果缓存区已满,线程进入等待状态
while (productList.size() == capacity) {
try {
productList.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 生产一个物品
System.out.println("Producer " + Thread.currentThread().getName()
+ " produces product-" + productNum);
productList.add(productNum++);

// 通知消费者线程可以进行消费了
productList.notify();

// 休息一会儿,方便观察结果
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

static class Consumer implements Runnable {
private LinkedList<Integer> productList;

public Consumer(LinkedList<Integer> list) {
this.productList = list;
}

@Override
public void run() {
while (true) {
synchronized (productList) {
// 如果缓存区为空,线程进入等待状态
while (productList.isEmpty()) {
try {
productList.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 消费一个物品
Integer product = productList.removeFirst();
System.out.println("Consumer " + Thread.currentThread().getName()
+ " consume product-" + product);

// 通知生产者线程可以进行生产了
productList.notify();

// 休息一会儿,方便观察结果
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}