问题描述
我想执行相当简单的任务。
有两个队列(容量均有限):BlockingQueue<String> source 和 BlockingQueue<String> destination。
线程有两种类型:第一种是 Producer producer,它生成一条消息并将其存入 BlockingQueue<String> source。
第二种是 Replacer replacer,它从 source 中取出一条消息,对其进行转换后插入 BlockingQueue<String> destination。
我有两个问题:
我不确定我是否已正确实现以下要求:如果源不为空且目标未满,则将消息从源传输到目标。
程序执行结束后,仍有一个名为“Signal Dispatcher”的线程在运行。应该如何正确终止它?我的程序无法正常退出。
以下是相关实体的实现:
源/目标队列的实现。
/**
 * Bounded blocking queue whose state is guarded by the instance monitor.
 *
 * <p>{@code offer} blocks while the queue is full and {@code poll} blocks while it is empty.
 * Every method that touches the element counter or the backing storage is {@code synchronized}
 * on this instance, and waiting threads are woken with {@code notifyAll()} after each change.
 *
 * <p>NOTE(review): elements are kept in a {@code PriorityQueue} (presumably
 * {@code java.util.PriorityQueue}), so {@code poll} would hand them out in priority order
 * rather than insertion order - confirm this is intended.
 *
 * @param <E> type of the stored elements
 */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing storage; only accessed while holding the instance monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold. */
    private final int capacity;
    /** Number of elements currently stored; only accessed while holding the instance monitor. */
    private int currentNumber;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements; also passed to the backing queue as its
     *     initial capacity
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds an element, waiting for free space while the queue is full.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored before the method returns so the caller can still observe it.
     *
     * @param element element to store
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt instead of discarding it.
                    interrupted = true;
                }
            }
            // Store first, count second: a failing add must not leave the counter out of step.
            storage.add(element);
            currentNumber++;
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns an element, waiting while the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored before the method returns.
     *
     * @return the element taken from the backing storage
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            E polledElement = storage.poll();
            currentNumber--;
            notifyAll();
            return polledElement;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the configured capacity.
     *
     * <p>NOTE(review): this is the capacity, not the current element count - confirm that this
     * matches what the project's {@code BlockingQueue} interface expects from {@code size()}.
     *
     * @return the capacity passed to the constructor
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * Tells whether the queue has reached its capacity.
     *
     * @return {@code true} when the element count is at least the capacity
     */
    public synchronized boolean isFull() {
        // ">=" rather than ">": with ">" the queue accepted capacity + 1 elements.
        return currentNumber >= capacity;
    }

    /**
     * Tells whether the queue holds no elements.
     *
     * @return {@code true} when the element count is zero
     */
    public synchronized boolean isEmpty() {
        return currentNumber == 0;
    }
}
生产者(Producer)的实现
/**
 * Runnable that fills the source queue with generated messages.
 *
 * <p>Each pass checks whether the queue reports itself as full; the task finishes the first
 * time it does, otherwise one more message built from the thread name is offered.
 */
public class Producer implements Runnable {
    /** Queue that receives the generated messages. */
    BlockingQueue<String> source;
    /** Name embedded in every generated message. */
    String threadName;

    /**
     * @param source queue to fill
     * @param threadName name used when generating messages
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.threadName = threadName;
        this.source = source;
    }

    /** Offers generated messages until the queue is observed to be full. */
    @Override
    public void run() {
        for (;;) {
            if (source.isFull()) {
                return;
            }
            String message = Utilities.generateMessage(threadName);
            source.offer(message);
        }
    }
}
消费者(Replacer)的实现
/**
 * Runnable that moves messages from the source queue to the destination queue, transforming
 * each one on the way.
 *
 * <p>The task keeps going only while the destination is not full and the source is not empty.
 */
public class Replacer implements Runnable {
    /** Queue the messages are taken from. */
    BlockingQueue<String> source;
    /** Queue the transformed messages are put into. */
    BlockingQueue<String> destination;
    /** Name embedded in every transformed message. */
    String threadName;

    /**
     * @param source queue to take messages from
     * @param destination queue to put transformed messages into
     * @param threadName name used when transforming messages
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.threadName = threadName;
        this.destination = destination;
        this.source = source;
    }

    /** Takes one message from the source, transforms it and offers it to the destination. */
    public synchronized void replace() {
        String taken = source.poll();
        String transformed = Utilities.transformMessage(threadName, taken);
        destination.offer(transformed);
    }

    /** Returns {@code false} once the destination is full or the source is empty. */
    private boolean isRunning() {
        return !(destination.isFull() || source.isEmpty());
    }

    /** Transfers messages one at a time for as long as {@code isRunning()} holds. */
    @Override
    public void run() {
        while (true) {
            if (!isRunning()) {
                return;
            }
            replace();
        }
    }
}
以及辅助工具类
/**
 * Shared constants, message helpers and the routine that starts the worker threads.
 */
public class Utilities {
    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    /**
     * Builds the "transferred" form of a message.
     *
     * @param threadName name of the transferring thread
     * @param messageToTransform message to transform; it is split on single spaces and only
     *     its last piece is kept
     * @return the new message text
     */
    public static String transformMessage(String threadName, String messageToTransform) {
        String[] pieces = messageToTransform.split(" ");
        String lastPiece = pieces[pieces.length - 1];
        return "Thread #" + threadName + " transferred message " + lastPiece;
    }

    /**
     * Builds a fresh message that mentions the generating thread twice.
     *
     * @param threadName name of the generating thread
     * @return the message text
     */
    public static String generateMessage(String threadName) {
        StringBuilder text = new StringBuilder();
        text.append("Thread #").append(threadName);
        text.append(" generated message #").append(threadName);
        return text.toString();
    }

    /**
     * Starts the requested number of daemon threads, named {@code threadName + 1},
     * {@code threadName + 2}, and so on.
     *
     * @param threadName common name prefix of the threads
     * @param numberOfThreadsToSpawn how many threads to start
     * @param source source queue handed to every worker
     * @param destination destination queue; when {@code null} the workers are producers,
     *     otherwise they are replacers
     */
    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {
        boolean producersOnly = (destination == null);
        for (int index = 1; index < numberOfThreadsToSpawn + 1; index++) {
            String name = threadName + index;
            Runnable task;
            if (producersOnly) {
                task = new Producer(source, name);
            } else {
                task = new Replacer(source, destination, name);
            }
            Thread worker = new Thread(task);
            worker.setName(name);
            worker.setDaemon(true);
            worker.start();
        }
    }
}
主类:
/**
 * Entry point: wires two bounded queues together with producer and replacer daemon threads
 * and prints messages taken from the destination queue.
 */
public class Main {
    /**
     * Starts the workers and prints up to {@code Utilities.NUMBER_OF_MESSAGES_TO_READ} messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        // Create, configure and start PRODUCER threads.
        Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);
        // Create, configure and start REPLACER threads.
        Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);
        // Read NUMBER_OF_MESSAGES_TO_READ from destination.
        // Counting from 0 gives exactly NUMBER_OF_MESSAGES_TO_READ iterations at most; the
        // previous "i = 1; i < N" bounds stopped one message short.
        // NOTE(review): the emptiness check runs while the worker threads are still starting,
        // so the loop may end before the full count has been read - confirm this is acceptable.
        for (int read = 0; (read < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty(); read++) {
            System.out.println(destination.poll());
        }
    }
}
1楼
以下是可以正常工作的代码。
/**
 * Class {@code BlockingQueueImpl} is the implementation of the Blocking Queue.
 * This class provides thread-safe operations
 * {@code public void offer(E element)} and {@code public E poll()}.
 *
 * <p>Both operations are {@code synchronized} on this instance: {@code offer} waits while the
 * queue is full, {@code poll} waits while it is empty, and each wakes the waiting threads with
 * {@code notifyAll()} after changing the queue.
 *
 * <p>NOTE(review): elements are kept in a {@code PriorityQueue} (presumably
 * {@code java.util.PriorityQueue}), so {@code poll} would hand them out in priority order
 * rather than insertion order - confirm this is intended.
 *
 * @param <E> type of the stored elements
 */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing storage; only accessed while holding the instance monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold. */
    private final int capacity;
    /** Number of elements currently stored; only accessed while holding the instance monitor. */
    private int currentNumber;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements; also passed to the backing queue as its
     *     initial capacity
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds an element, waiting for free space while the queue is full.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored before the method returns so the caller can still observe it.
     *
     * @param element element to store
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt instead of discarding it.
                    interrupted = true;
                }
            }
            storage.add(element);
            currentNumber++;
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns an element, waiting while the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored before the method returns.
     *
     * @return the element taken from the backing storage
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            E polledElement = storage.poll();
            currentNumber--;
            notifyAll();
            return polledElement;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the configured capacity.
     *
     * <p>NOTE(review): this is the capacity, not the current element count - confirm that this
     * matches what the project's {@code BlockingQueue} interface expects from {@code size()}.
     *
     * @return the capacity passed to the constructor
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * Tells whether the queue has reached its capacity.
     *
     * @return {@code true} when the element count is at least the capacity
     */
    public synchronized boolean isFull() {
        return currentNumber >= capacity;
    }

    /**
     * Tells whether the queue holds no elements.
     *
     * @return {@code true} when the element count is zero
     */
    public synchronized boolean isEmpty() {
        return currentNumber == 0;
    }
}
/**
 * Runnable that fills the source queue with generated messages.
 *
 * <p>Each pass checks whether the queue reports itself as full; the task finishes the first
 * time it does, otherwise one more message built from the thread name is offered.
 */
public class Producer implements Runnable {
    /** Queue that receives the generated messages. */
    BlockingQueue<String> source;
    /** Name embedded in every generated message. */
    String threadName;

    /**
     * @param source queue to fill
     * @param threadName name used when generating messages
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.threadName = threadName;
        this.source = source;
    }

    /** Offers generated messages until the queue is observed to be full. */
    @Override
    public void run() {
        for (;;) {
            if (source.isFull()) {
                return;
            }
            String message = Utilities.generateMessage(threadName);
            source.offer(message);
        }
    }
}
/**
 * Runnable that moves messages from the source queue to the destination queue, transforming
 * each one on the way.
 */
public class Replacer implements Runnable {
    /** Queue the messages are taken from. */
    BlockingQueue<String> source;
    /** Queue the transformed messages are put into. */
    BlockingQueue<String> destination;
    /** Name embedded in every transformed message. */
    String threadName;

    /**
     * @param source queue to take messages from
     * @param destination queue to put transformed messages into
     * @param threadName name used when transforming messages
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.threadName = threadName;
        this.destination = destination;
        this.source = source;
    }

    /** Takes one message from the source, transforms it and offers it to the destination. */
    public synchronized void replace() {
        String taken = source.poll();
        String transformed = Utilities.transformMessage(threadName, taken);
        destination.offer(transformed);
    }

    // Execution continues only while the destination is not full and the source is not empty.
    private boolean isRunning() {
        return !(destination.isFull() || source.isEmpty());
    }

    /** Transfers messages one at a time for as long as {@code isRunning()} holds. */
    @Override
    public void run() {
        while (true) {
            if (!isRunning()) {
                return;
            }
            replace();
        }
    }
}
/**
 * Shared constants, message helpers and the routine that starts the worker threads.
 */
public class Utilities {
    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    /**
     * Builds the "transferred" form of a message.
     *
     * @param threadName name of the transferring thread
     * @param messageToTransform message to transform; it is split on single spaces and only
     *     its last piece is kept
     * @return the new message text
     */
    public static String transformMessage(String threadName, String messageToTransform) {
        String[] pieces = messageToTransform.split(" ");
        String lastPiece = pieces[pieces.length - 1];
        return "Thread #" + threadName + " transferred message " + lastPiece;
    }

    /**
     * Builds a fresh message that mentions the generating thread twice.
     *
     * @param threadName name of the generating thread
     * @return the message text
     */
    public static String generateMessage(String threadName) {
        StringBuilder text = new StringBuilder();
        text.append("Thread #").append(threadName);
        text.append(" generated message #").append(threadName);
        return text.toString();
    }

    /**
     * Starts the requested number of daemon threads, named {@code threadName + 1},
     * {@code threadName + 2}, and so on.
     *
     * @param threadName common name prefix of the threads
     * @param numberOfThreadsToSpawn how many threads to start
     * @param source source queue handed to every worker
     * @param destination destination queue; when {@code null} the workers are producers,
     *     otherwise they are replacers
     */
    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {
        boolean producersOnly = (destination == null);
        for (int index = 1; index < numberOfThreadsToSpawn + 1; index++) {
            String name = threadName + index;
            Runnable task;
            if (producersOnly) {
                task = new Producer(source, name);
            } else {
                task = new Replacer(source, destination, name);
            }
            Thread worker = new Thread(task);
            worker.setName(name);
            worker.setDaemon(true);
            worker.start();
        }
    }
}