/ Java  

Java Multithreading 9: Producer and consumer problem

In this blog, producer/consumer problem will be discussed. The contents include:

  1. Producer/Consumer Model
  2. Producer/Consumer implementation

Producer/Consumer Model

The producer/consumer problem is a very typical multi-threaded problem. The objects involved include producer, consumer, warehouse and product. The relationship between them is as follows:

  1. The producer only produces when the warehouse is not full, and stops production when the warehouse is full.
  2. Consumers can only consume when there is a product in the warehouse, and wait when the warehouse is empty.
  3. When the consumer finds that there is no product in the warehouse for consumption, he will notify the producer to produce it.
  4. When producing consumable products, producers should notify consumers who are waiting to consume.

Producer/Consumer implementation

Let’s look at how to implement it using wait() and notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class Warehouse {
// the capacity of the warehouse
private final int capacity;
// number of items in the warehouse
private int items;

public Warehouse(int capacity) {
this.capacity = capacity;
this.items = 0;
}

// producer produce products
public synchronized void produce(int val) {
try {
int left = val;

while (left > 0) {
// warehouse is full, wait items to be consumed
while (items >= capacity) {
wait();
}

// get how many items to be produced
int numberToBeProduced = (items + left) > capacity ? (capacity - items) : left;

items += numberToBeProduced;
left -= numberToBeProduced;

System.out.printf("%s produce(%3d) --> left=%3d, numberToBeProduced=%3d, size=%3d\n", Thread.currentThread().getName(), val, left, numberToBeProduced, items);

// notify consumer to consume the product
notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// consumer consume products
public synchronized void consume(int val) {
try {
int left = val;

while (left > 0) {
// warehouse is empty, wait items to be produced
while (items <= 0) {
wait();
}

// get how many items to be produced
int numberToBeConsumed = items < left ? items : left;

items -= numberToBeConsumed;
left -= numberToBeConsumed;

System.out.printf("%s consume(%3d) <-- left=%3d, numberToBeConsumed=%3d, size=%3d\n", Thread.currentThread().getName(), val, left, numberToBeConsumed, items);

notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String toString() {
return "capacity:" + capacity + ", actual size:" + items;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class Producer {
private final Warehouse warehouse;

public Producer(Warehouse warehouse) {
this.warehouse = warehouse;
}

// new thread to produce products
public void produce(final int val) {
new Thread(() -> warehouse.produce(val)).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class Consumer {
private final Warehouse warehouse;

public Consumer(Warehouse warehouse) {
this.warehouse = warehouse;
}

// new thread to consume products
public void consume(final int val) {
new Thread(() -> warehouse.consume(val)).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main {
public static void main(String[] args) {
Warehouse warehouse = new Warehouse(100);

Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse);

producer.produce(60);
producer.produce(120);
consumer.consume(90);
consumer.consume(150);
producer.produce(110);
}
}

Results:

1
2
3
4
5
6
7
8
9
Thread-0 produce( 60) --> left=  0, numberToBeProduced= 60, size= 60
Thread-4 produce(110) --> left= 70, numberToBeProduced= 40, size=100
Thread-3 consume(150) <-- left= 50, numberToBeConsumed=100, size= 0
Thread-1 produce(120) --> left= 20, numberToBeProduced=100, size=100
Thread-3 consume(150) <-- left= 0, numberToBeConsumed= 50, size= 50
Thread-1 produce(120) --> left= 0, numberToBeProduced= 20, size= 70
Thread-2 consume( 90) <-- left= 20, numberToBeConsumed= 70, size= 0
Thread-4 produce(110) --> left= 0, numberToBeProduced= 70, size= 70
Thread-2 consume( 90) <-- left= 0, numberToBeConsumed= 20, size= 50
  1. Producer is a producer class, which is associated with a warehouse. When the producer() method of the producer is called, it will create a new thread and produce the product in the warehouse.
  2. Consumer is a consumer class, which is associated with a warehouse. When the consume() method of the consumer is called, it will create a new thread and consume the products in the warehouse.
  3. Warehouse is a warehouse class, which records capacity of the warehouse and current number of products in the warehouse.

The produce() and consume() methods of the warehouse are both synchronized methods. Entering the synchronized method means that this thread has acquired the synchronization lock of the warehouse object. This means that only one producer and consumer thread can run at a time.

For the production method produce(): when the warehouse is full, the producer thread waits, and it needs to wait for the consumer to consume the product before the producer thread can produce. After the producer thread produces the product, it will wake up all threads wait on the synchronized lock through notifyAll(), including consumer threads, which we call notify consumers to consume.

For the consumption method consume(): when the warehouse is empty, the consumer thread waits and needs to wait for the producer to produce the product before the consumer thread can consume. After the consumer thread consumes the product, it will wake up all threads the synchronization lock through notifyAll(), including producer threads, which we call notify producers to produce.