您的位置:

Java多线程实现生产者消费者模式

一、生产者消费者模式概述

生产者消费者模式是多线程编程中的经典问题之一,它的核心在于解决生产者与消费者的同步、通信和互斥问题。在这个问题中,一个或多个线程充当生产者的角色,生成一些产品,而另外一个或多个线程则充当消费者的角色,消费这些产品。

在这种情况下,生产者和消费者必须要匹配并能够相互通信,而他们之间的同步有时应该受到限制,这才可能保证程序的正常运行。如果没有正确地实现同步,可能会导致生产出来的产品被重复消费或者消费者尝试消费还未生产出来的产品。

二、Java中的生产者消费者模式

在Java中,我们可以通过多线程技术实现生产者消费者模式。使用多线程技术可以更有效地将任务分配给不同的线程,使程序更高效地运行。

三、生产者消费者模式的实现

1. 生产者消费者模式的基本实现

下面是一个基本的Java程序,演示了如何使用多线程来实现生产者消费者模式:

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>();
        int maxSize = 10;
        Thread producer = new Producer(queue, maxSize, "Producer");
        Thread consumer = new Consumer(queue, "Consumer");
        producer.start();
        consumer.start();
    }
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int maxSize;
    private String name;

    public Producer(Queue<Integer> queue, int maxSize, String name) {
        this.queue = queue;
        this.maxSize = maxSize;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, " + name + " is waiting ...");
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                int number = (int) (Math.random() * 100);
                System.out.println(name + " produced " + number);
                queue.add(number);
                queue.notifyAll();
            }
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private String name;

    public Consumer(Queue<Integer> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    try {
                        System.out.println("Queue is empty, " + name + " is waiting ...");
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                int number = queue.poll();
                System.out.println(name + " consumed " + number);
                queue.notifyAll();
            }
        }
    }
}

在这个示例中,我们定义了一个Queue对象(用于存储数字),一个Producer对象和一个Consumer对象。Producer和Consumer都是Thread的子类。

在Producer中,我们使用while循环来一直生成数字,并加到queue中。如果队列已经满了,线程则等待。在Consumer中,我们使用while循环一直从队列中取出数字并消费。如果队列为空,线程则等待。

2. 使用BlockingQueue实现生产者消费者模式

Java中提供了BlockingQueue来实现生产者消费者模式。使用BlockingQueue可以避免显式地使用wait()和notify()方法,因为BlockingQueue内部已经实现了这些操作。

下面是使用BlockingQueue实现生产者消费者模式的示例代码:

public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Producer(queue, "Producer");
        Thread consumer = new Consumer(queue, "Consumer");
        producer.start();
        consumer.start();
    }
}

class Producer extends Thread {
    private BlockingQueue<Integer> queue;
    private String name;

    public Producer(BlockingQueue<Integer> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                int number = (int) (Math.random() * 100);
                queue.put(number);
                System.out.println(name + " produced " + number);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer extends Thread {
    private BlockingQueue<Integer> queue;
    private String name;

    public Consumer(BlockingQueue<Integer> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                int number = queue.take();
                System.out.println(name + " consumed " + number);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在这个示例中,我们使用BlockingQueue来实现生产者消费者模式。我们创建了一个ArrayBlockingQueue,这是一个由数组支持的有界阻塞队列。当队列为空时,take()操作会阻塞线程。当队列满时,put()操作也会阻塞线程。

3. 使用ReentrantLock和Condition实现生产者消费者模式

另外一种实现方式是使用ReentrantLock和Condition。在Java 5中引入的新特性,ReentrantLock是一种可重入的互斥锁,它替代了synchronized关键字,Condition是一个条件对象,它替代了wait()和notify()方法。使用ReentrantLock和Condition可以实现更细粒度的加锁操作,从而更好地控制线程并发。

下面是ReentrantLock和Condition实现生产者消费者模式的示例代码:

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>();
        int maxSize = 10;
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread producer = new Producer(queue, maxSize, "Producer", lock, condition);
        Thread consumer = new Consumer(queue, "Consumer", lock, condition);
        producer.start();
        consumer.start();
    }
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int maxSize;
    private String name;
    private ReentrantLock lock;
    private Condition condition;

    public Producer(Queue<Integer> queue, int maxSize, String name, ReentrantLock lock, Condition condition) {
        this.queue = queue;
        this.maxSize = maxSize;
        this.name = name;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (queue.size() == maxSize) {
                    System.out.println("Queue is full, " + name + " is waiting ...");
                    condition.await();
                }
                int number = (int) (Math.random() * 100);
                System.out.println(name + " produced " + number);
                queue.add(number);
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private String name;
    private ReentrantLock lock;
    private Condition condition;

    public Consumer(Queue<Integer> queue, String name, ReentrantLock lock, Condition condition) {
        this.queue = queue;
        this.name = name;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    System.out.println("Queue is empty, " + name + " is waiting ...");
                    condition.await();
                }
                int number = queue.poll();
                System.out.println(name + " consumed " + number);
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

在这个示例中,我们创建了一个Queue对象、一个ReentrantLock对象和一个Condition对象。在Producer和Consumer中都使用了lock()方法获取锁。如果队列已经满了(在Producer中)或为空(在Consumer中),线程会调用await()方法等待。当新产品被生产出来或消费者取走一个产品时,线程会调用signalAll()方法进行唤醒。

四、总结

以上是Java中实现生产者消费者模式的三种方法,包括基本实现、BlockingQueue实现和ReentrantLock和Condition实现。每种方法都有自己的优点和适用场景,根据具体情况选择合适的方法进行实现。