您的位置:

java生产者消费者,java生产者消费者模式

本文目录一览:

JAVA怎么编多个生产者多个消费者代码啊

public class ProduceConsumerDemo {

public static void main(String[] args) {

// 1.创建资源

Resource resource = new Resource();

// 2.创建两个任务

Producer producer = new Producer(resource);

Consumer consumer = new Consumer(resource);

// 3.创建线程

/*

* 多生产多消费产生的问题:重复生产、重复消费

*/

Thread thread0 = new Thread(producer);

Thread thread1 = new Thread(producer);

thread0.setName("生产者(NO0)");

thread1.setName("生产者(NO1)");

Thread thread2 = new Thread(consumer);

Thread thread3 = new Thread(consumer);

thread2.setName("消费者(NO2)");

thread3.setName("消费者(NO3)");

thread0.start();

thread1.start();

thread2.start();

thread3.start();

}

}

class Resource {

private String name;

private int count = 1;

// 定义标记

private boolean flag;

// 提供给商品赋值的方法

public synchronized void setName(String name) {// thread0, thread1在这里运行

while (flag)// 判断标记为true,执行wait等待,为false则生产

/*

* 这里使用while,而不使用if的理由如下:

* thread0有可能第二次也抢到锁的执行权,判断为真,则有面包不生产,所以接下来执行等待,此时thread0在线程池中。

* 接下来活的线程有3个(除了thread0),这三个线程都有可能获取到执行权.

* 假设thread1获得了执行权,判断为真,则有面包不生产,执行等待。此时thread1又进入到了线程池中。

* 接下来有两个活的线程thread2和thread3。 假设thread2又抢到了执行权,所以程序转到了消费get处……

*/

try {

this.wait();//这里wait语句必须包含在try/catch块中,抛出异常。

} catch (InterruptedException e) {

e.printStackTrace();

}

this.name = name + count;// 第一个面包

count++;// 2

System.out.println(Thread.currentThread().getName() + this.name);// thread0线程生产了面包1

// 生产完毕,将标记改成true.

flag = true;// thread0第一次生产完面包以后,将标记改为真,表示有面包了

// 唤醒消费者(这里使用notifyAll而不使用notify的原因在下面)

this.notifyAll();// 第一次在这里是空唤醒,没有意义

}

/*

* 通过同步,解决了没生产就消费的问题

* 生产完以后,生产者释放了this锁,此时,生产者和消费者同时去抢锁,又是生产者抢到了锁,所以就出现了一直生产的情况。

* 与“生产一个就消费一个的需求不符合” 等待唤醒机制 wait();该方法可以使线程处于冻结状态,并将线程临时存储到线程池

* notify();唤醒指定线程池中的任意一个线程。 notifyAll();唤醒指定线程池中的所有线程

* 这些方法必须使用在同步函数中,因为他们用来操作同步锁上的线程上的状态的。

* 在使用这些方法时候,必须标识他们所属于的锁,标识方式就是锁对象.wait(); 锁对象.notify(); 锁对象.notifyAll();

* 相同锁的notify()可以获取相同锁的wait();

*/

public synchronized void getName() {// thread2,thread3在这里运行

while (!flag)

/*

* ……接着上面的程序执行分析 thread2拿到锁获取执行权之后,判断!flag为假,则不等待,直接消费面包1,输出一次.

* 消费完成之后将flag改为假 接下来又唤醒了thread0或者thread1生产者中的一个

* 假设又唤醒了thread0线程,现在活的线程有thread0,thread2,thread3三个线程

* 假设接下来thread2又抢到了执行权,判断!flag为真,没面包了,停止消费,所以thread2执行等待.

* 此时活着的线程有thread0和thread3。

* 假设thread3得到了执行权,拿到锁之后进来执行等待,此时活着的线程只有thread0.

* 所以thread0只能抢到执行权之后,生产面包2,将标记改为true告诉消费者有面包可以消费了。

* 接下来执行notify唤醒,此时唤醒休眠中的3个线程中的任何一个都有可能。

* 如果唤醒了消费者thread2或者thread3中的任何一个,程序都是正常。如果此时唤醒thread1则不正常。

* 如果唤醒了thread1,此时活着的线程有thread0和thread1两个线程。

* 假设thread0又获得了执行权,判读为真有面包,则又一次执行等待。

* 接下来只有thread1线程有执行权(此时没有判断标记直接生产了,出错了),所以又生产了面包3。 在这个过程中,面包2没有被消费。

* 这就是连续生产和消费容易出现的问题。

* 原因:被唤醒的线程没有判断标记就开始执行了,导致了重复的生产和消费发生。

* 解决:被唤醒的线程必须判断标记,使用while循环标记,而不使用if判断的理由。

* 但是接下来会出现死锁,原因在于:

* 上面的程序中thread0在执行notify的时候唤醒了thread1,而此时thread2和thread3两个消费者线程都处于等待状态

* thread1在执行while判断语句之后判断为真,则执行等待,此时所有的线程都处于冻结等待状态了。

* 原因:本方线程在执行唤醒的时候又一次唤醒了本方线程,而本方线程循环判断标记又继续等待,而导致所有的线程都等待。

* 解决:本方线程唤醒对方线程, 可以使用notifyAll()方法

*  唤醒之后,既有本方,又有对方,但是本方线程判断标记之后,会继续等待,这样就有对方线程在执行。

*/

try {

this.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName()  + this.name);

// 将标记改为false

flag = false;

// 唤醒生产者

this.notify();

}

}

// 生产者

class Producer implements Runnable {

private Resource resource;

public Producer(Resource resource) {

this.resource = resource;

}

public void run() {

while (true) {

resource.setName("面包");

}

}

}

// 消费者

class Consumer implements Runnable {

private Resource resource;

public Consumer(Resource resource) {

this.resource = resource;

}

@Override

public void run() {

while (true) {

resource.getName();

}

}

}

JAVA模拟生产者与消费者实例

使用的生产者和消费者模型具有如下特点:

(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。

(2)消费者只消费指定生产者的产品。

(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。

(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。

Windows

用来实现同步和互斥的实体。在Windows

中,常见的同步对象有:信号量(Semaphore)、

互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分

为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的

上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程

中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。

1.用锁操作原语实现互斥

为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:

①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。

②执行临界区程序。

③开锁。将锁打开,退出临界区。

2.信号量及WAIT,SIGNAL操作原语

信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。

WAIT(S):顺序执行以下两个动作:

①信号量的值减1,即S=S-1;

②如果S≥0,则该进程继续执行;

如果

S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。

SIGNAL(S):顺序执行以下两个动作

①S值加

1,即

S=S+1;

②如果S)0,则该进程继续运行;

如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。

在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。

从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。

---------------

/**

*

生产者

*

*/

public

class

Producer

implements

Runnable{

private

Semaphore

mutex,full,empty;

private

Buffer

buf;

String

name;

public

Producer(String

name,Semaphore

mutex,Semaphore

full,Semaphore

empty,Buffer

buf){

this.mutex

=

mutex;

this.full

=

full;

this.empty

=

empty;

this.buf

=

buf;

this.name

=

name;

}

public

void

run(){

while(true){

empty.p();

mutex.p();

System.out.println(name+"

inserts

a

new

product

into

"+buf.nextEmptyIndex);

buf.nextEmptyIndex

=

(buf.nextEmptyIndex+1)%buf.size;

mutex.v();

full.v();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

}

---------------

/**

*

消费者

*

*/

public

class

Customer

implements

Runnable{

private

Semaphore

mutex,full,empty;

private

Buffer

buf;

String

name;

public

Customer(String

name,Semaphore

mutex,Semaphore

full,Semaphore

empty,Buffer

buf){

this.mutex

=

mutex;

this.full

=

full;

this.empty

=

empty;

this.buf

=

buf;

this.name

=

name;

}

public

void

run(){

while(true){

full.p();

mutex.p();

System.out.println(name+"

gets

a

product

from

"+buf.nextFullIndex);

buf.nextFullIndex

=

(buf.nextFullIndex+1)%buf.size;

mutex.v();

empty.v();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

}

-------------------------

/**

*

缓冲区

*

*/

public

class

Buffer{

public

Buffer(int

size,int

nextEmpty,int

nextFull){

this.nextEmptyIndex

=

nextEmpty;

this.nextFullIndex

=

nextFull;

this.size

=

size;

}

public

int

size;

public

int

nextEmptyIndex;

public

int

nextFullIndex;

}

-----------------

/**

*

此类用来模拟信号量

*

*/

public

class

Semaphore{

private

int

semValue;

public

Semaphore(int

semValue){

this.semValue

=

semValue;

}

public

synchronized

void

p(){

semValue--;

if(semValue0){

try

{

this.wait();

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

}

public

synchronized

void

v(){

semValue++;

if(semValue=0){

this.notify();

}

}

}

------------------------

public

class

Test

extends

Thread

{

public

static

void

main(String[]

args)

{

Buffer

bf=new

Buffer(10,0,0);

Semaphore

mutex=new

Semaphore(1);

Semaphore

full=new

Semaphore(0);

Semaphore

empty=new

Semaphore(10);

//new

Thread(new

Producer("p001",mutex,full,empty,bf)).start();

Producer

p=new

Producer("p001",mutex,full,empty,bf);

new

Thread(new

Producer("p002",mutex,full,empty,bf)).start();

new

Thread(new

Producer("p003",mutex,full,empty,bf)).start();

new

Thread(new

Producer("p004",mutex,full,empty,bf)).start();

new

Thread(new

Producer("p005",mutex,full,empty,bf)).start();

try{

sleep(3000);

}

catch(Exception

ex)

{

ex.printStackTrace();

}

new

Thread(new

Customer("c001",mutex,full,empty,bf)).start();

new

Thread(new

Customer("c002",mutex,full,empty,bf)).start();

new

Thread(new

Customer("c003",mutex,full,empty,bf)).start();

new

Thread(new

Customer("c004",mutex,full,empty,bf)).start();

new

Thread(new

Customer("c005",mutex,full,empty,bf)).start();

}

}

--------------------------------------------

BlockingQueue 使用(生产者-消费者)

java.util.concurrent包中的Java BlockingQueue接口表示一个线程安全的队列,可以放入并获取实例。

在这篇文章中,我会告诉你如何使用这个BlockingQueue。

本文将不讨论如何在Java中实现BlockingQueue。如果您对此感兴趣,在我的偏理论的 Java并发教程 中有一个关于阻塞队列的文章。

BlockingQueue通常用于使线程产生对象,而另一线程则使用该对象。这是一张阐明这一原理的图表。

生产线程将持续生产新对象并将它们插入队列,直到队列达到它可以包含的上限。换句话说,这是极限。如果阻塞队列达到其上限,则会在尝试插入新对象时阻塞生产线程。在消耗线程将对象带出队列之前,它一直处于阻塞状态。

消费线程不断将对象从阻塞队列中取出,并对其进行处理。如果消费线程试图将对象从空队列中取出,则消费线程将被阻塞,直到生成的线程将对象放入队列。

BlockingQueue有4种不同的方法来插入、删除和检查队列中的元素。每一组方法的行为都是不同的,以防被请求的操作不能立即执行。下面是这些方法的一个表:

这四种不同的行为方式意思

无法将null插入到BlockingQueue中。如果尝试插入null,则BlockingQueue将引发NullPointerException。

也可以访问BlockingQueue中的所有元素,而不仅仅是开始和结束处的元素。例如,假设你需要处理队列中的一个对象,但你的应用程序决定不处理它。然后你可以调用remove(o)删除队列中的特定对象。但是,这并不是非常有效,所以除非你真的需要,否则不应该使用这些Collection方法

由于BlockingQueue是一个接口,因此您需要使用它的一个实现来使用它。java.util.concurrent包具有以下BlockingQueue接口(在Java 6中)的实现:

这是一个Java BlockingQueue示例。该示例使用BlockingQueue接口的ArrayBlockingQueue实现。

首先,BlockingQueueExample类在不同的线程中启动生产者和消费者。生产者将字符串插入共享BlockingQueue中,消费者将它们取出。

这是生产者类。注意它在每个put()调用之间的使用sleep。这将导致消费者在等待队列中的对象时阻塞。

这是消费者类。

它只是从队列中取出对象,并将它们打印到System.out。

[原文]( )