问题定义
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程/多线程同步问题的经典案例。
生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
参考:[维基百科] 生产者消费者问题
Java 简单实现:用 synchronized 锁和 wait、notify方法
实现要点:
- 用一个 LinkedList 来存放物品(为方便实现,我们用 Integer 表示物品)
- 用一个 capacity 标记缓存区的大小
- 用 wait、notify 机制来控制缓存区满时不能继续生产,缓存区为空时不能消费
用 ProductManager 控制生产和消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| import java.util.LinkedList;
public class ProducerConsumerWaitNotifyDemo {
public static void main(String[] args) throws InterruptedException { final ProductManager productManager = new ProductManager(10);
Thread producerThread = new Thread(new Runnable() { @Override public void run() { try { productManager.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } });
Thread consumerThread = new Thread(new Runnable() { @Override public void run() { try { productManager.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } });
producerThread.start(); consumerThread.start();
producerThread.join(); consumerThread.join(); }
static class ProductManager {
private LinkedList<Integer> productList = new LinkedList<>(); private int capacity;
public ProductManager(int cap) { this.capacity = cap; }
public void produce() throws InterruptedException { int productNum = 0; while (true) { synchronized (this) { while (productList.size() == capacity) { wait(); }
System.out.println("Producer " + Thread.currentThread().getName() + " produces product-" + productNum); productList.add(productNum++);
notify();
Thread.sleep(500); } } }
public void consume() throws InterruptedException { while (true) { synchronized (this) { while (productList.isEmpty()) { wait(); }
int product = productList.removeFirst(); System.out.println("Consumer " + Thread.currentThread().getName() + " consume product-" + product);
notify();
Thread.sleep(300); } } } } }
|
参考:[GeeksforGeeks] Producer-Consumer solution using threads in Java
分别创建 Producer 和 Consumer 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| import java.util.LinkedList;
public class ProducerConsumerWaitNotifyDemo2 {
private static LinkedList<Integer> PRODUCT_LIST = new LinkedList<>(); private static int CAPACITY = 10;
public static void main(String[] args) throws InterruptedException {
Thread producerThread = new Thread(new Producer(PRODUCT_LIST, CAPACITY)); Thread consumerThread = new Thread(new Consumer(PRODUCT_LIST));
producerThread.start(); consumerThread.start();
producerThread.join(); consumerThread.join(); }
static class Producer implements Runnable { private LinkedList<Integer> productList; private int capacity;
public Producer(LinkedList<Integer> list, int cap) { this.productList = list; this.capacity = cap; }
@Override public void run() { int productNum = 0; while (true) { synchronized (productList) { while (productList.size() == capacity) { try { productList.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
System.out.println("Producer " + Thread.currentThread().getName() + " produces product-" + productNum); productList.add(productNum++);
productList.notify();
try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
static class Consumer implements Runnable { private LinkedList<Integer> productList;
public Consumer(LinkedList<Integer> list) { this.productList = list; }
@Override public void run() { while (true) { synchronized (productList) { while (productList.isEmpty()) { try { productList.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
Integer product = productList.removeFirst(); System.out.println("Consumer " + Thread.currentThread().getName() + " consume product-" + product);
productList.notify();
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
|