java 다중 루틴의 생존자와 소비자 (java 프로그래밍 사상)
41735 단어 java 프로그래밍 사상
package Produce;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Meal {
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
@Override
public String toString() {
return "Meal " + orderNum;
}
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null)
wait(); // chef meal
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();// chef
}
}
} catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) {
restaurant = r;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait();// meal
}
if (++count == 10) {
System.out.println("Out of food,closing");
restaurant.exec.shutdownNow();// Interrupt
return;// return Order up , return
}
System.out.println("Order up! ");
synchronized (restaurant.waitPerson) {
// notifyAll() waitPerson
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(500);// shutdownNow
}
} catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
2.java를 사용합니다.util.concurrent.locks.동기화 작업
Lock 및 Condition 객체는 더 어려운 다중 스레드 문제에서만 필요합니다.
package Produce;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true;// Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false;// Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while (waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException {
lock.lock();
try {
while (waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) {
car = c;
}
@Override
public void run() {
try {
while (!Thread.interrupted())
// while (true) //
{
System.out.println("Wax On!");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) {
car = c;
}
@Override
public void run() {
try {
while (!Thread.interrupted())
// while (true)
{
car.waitForWaxing();
System.out.println("Wax Off!");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}
public class WaxOMatic2 {
public static void main(String[] args) throws InterruptedException {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
3.동기화 대기열을 통해 구현
package Produce;
public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {
}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!")
+ ")";
}
@Override
public void run() {
while (countDown-- > 0) {
System.out.println(status());
Thread.yield();//
}
}
}
package Produce;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {
rockets = queue;
}
public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run();
}
} catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
public class TestBlockingQueues {
static void getkey() {
try {
// Compensate for Windows/Linux difference in the
// length of the result produced by the Enter key:
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (java.io.IOException e) {
throw new RuntimeException(e);
}
}
static void getkey(String message) {
System.out.println(message);
getkey();
}
static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for (int i = 0; i < 5; i++) {
runner.add(new LiftOff(5));
}
getkey("Press'Enter'(" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}
public static void main(String[] args) {
test("LinkedBlockingQueue", new LinkedBlockingDeque<LiftOff>());//
test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// , ,
test("SynchronousQueue", new SynchronousQueue<LiftOff>());
}
}
4. 일반 Toast의 대기열 인스턴스
package Produce;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
class Toast {
public enum Status {
DRY, BUTTERED, JAMMED
}
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {
id = idn;
}
public void butter() {
status = Status.BUTTERED;
}
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingDeque<Toast> {
}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {
toastQueue = tq;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
Toast t = new Toast(count++);
System.out.println(t);
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
class Butterer implements Runnable {
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
class Eater implements Runnable {
private ToastQueue finishedqQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedqQueue = finished;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = finishedqQueue.take();
if (t.getId() != counter++
|| t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error: " + t);
System.exit(1);
} else {
System.out.println("Chomp! " + t);
}
}
} catch (InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
5.수입 수출 파이프, 기능 유사 생산자 소비자
package Produce;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getpPipedWriter() {
return out;
}
@Override
public void run() {
try {
while (true) {
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}
} catch (IOException e) {
System.out.println(e + " Sender write exception");
} catch (InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getpPipedWriter());
}
@Override
public void run() {
try {
while (true) {
System.out.println("Read:" + (char) in.read() + ". ");
}
} catch (IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
}