`

java线程:Wait-And-Notification机制

 
阅读更多

 

Java的每一个对象除了有一个相关的monitor以外(用做synchronized lock),还有一个相关的wait set,用以存放处于WAITING状态的线程

 

  • wait set是线程的集合
  • 当Java对象创建的时候,其wait set是空的。对于wait set操作(将线程加入或移出wait set)都是原子操作
  • 对于wait set的操作(加入或移出),而且只能通过Object.wait,Object.notify,Object.notifyAll这三个操作来进行。当线程执行到Object.wait指令后,就会进入到wait set集合中;而执行到Object.notify,Object.notifyAll指令后,则通知处于wait set中的线程,条件满足了。

 

一、背后的理念

 

  • 一个线程需要某个条件继续执行,并且认为别的线程会创造这个条件。于是这个线程就呆在wait set里面等待
  • 当别的线程创造了这个条件,则通知等待条件的线程

 

二、相关的方法

 

void wait()

 

 

等待条件发生。

 

该方法必须在synchronized方法或synchronized块中呼叫

void wait(long timeout)

 

 

等待条件发生。但和wait()方法不同之处在于,如果在timeout(毫秒)的时间内,通知没有到来,则自己略过条件等待,移出wait set,继续执行。

 

该方法必须在synchronized方法或synchronized块中呼叫

void wait(long timeout,int nanos)

 

 

等待条件发生。观念和wait(long timeout)是一样的。唯一的差别timeout的时间,到了纳秒级别。

 

该方法必须在synchronized方法或synchronized块中呼叫

void notify()

 

 

通知位于wait set中的某一个线程,条件已经发生了。

 

该方法必须在synchronized方法或synchronized块中呼叫

void notifyAll()

 

 

通知位于wait set中的所有线程,条件已经发生了。

 

该方法必须在synchronized方法或synchronized块中呼叫

 

 

三、生产者和消费者模型的实现

第一种:

 

public class ProducerConsumerV6 {

    private List<Integer> list = null;

    private Random random = null;

    private Object lock;

    public ProducerConsumerV6() {
        list = new ArrayList<Integer>();
        random = new Random();
        lock = new Object();
    }

    public void produce() throws InterruptedException {
        synchronized (lock) {
            if (list.size() == 0) {// 这个对象的wait set里面有consumer线程
                lock.notifyAll();// notify() can work
            }
            System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t");
            int size = list.size();
            for (int i = 0; i < 10 - size; i++) {
                list.add(random.nextInt(1000));
            }
            System.out.println(list);
        }
    }

    /**
     * 这里有个问题,当producer产生的食物总量,比consumer的数量少的时候 则有可能发生发生一场状况
     * 
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        synchronized (lock) {
            while (list.size() == 0) {// 测试竞争条件
                // 注意这里必须用while,当一个consumer线程醒来的时候,还必须
                // 继续检查list是否为空,如果是则继续等待。否则就继续下一条指
                // 令执行
                lock.wait();
            }
            System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t");
            list.remove(0);
            System.out.println(list);
        }
    }

    public static void main(String[] args) throws Exception {
        ProducerConsumerV6 pc = new ProducerConsumerV6();
        for (int i = 0; i < 50; i++) {
            Thread thread = new Thread(pc.new Consumer());
            thread.setName("Consumer " + i);
            thread.start();
        }

        for (int i = 0; i < 2; i++) {
            Thread thread = new Thread(pc.new Producer());
            thread.setName("Producer " + i);
            thread.start();
        }
    }

    public class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    produce();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    public class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }
}
 

第二种:

 

public class ProducerConsumerV7 {

    private List<Integer> list = null;

    private Random random = null;

    public ProducerConsumerV7() {
        list = new ArrayList<Integer>();
        random = new Random();
    }

    public synchronized void produce() throws InterruptedException {
        if (list.size() == 0) {//这个对象的wait set里面有consumer线程
            notifyAll();//notify() can work
        }
        System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t");
        int size = list.size();
        for (int i = 0; i < 10 - size; i++) {
            list.add(random.nextInt(1000));
        }
        System.out.println(list);
    }
    
    /**
     * 这里有个问题,当producer产生的食物总量,比consumer的数量少的时候
     * 则有可能发生发生一场状况
     * @throws InterruptedException
     */
    public synchronized void consume() throws InterruptedException {
        while(list.size() == 0) {//测试竞争条件
             //注意这里必须用while,当一个consumer线程醒来的时候,还必须
             //继续检查list是否为空,如果是则继续等待。否则就继续下一条指
             //令执行
            wait();  
        }
        System.out.print(Thread.currentThread().getName() + "\t\t\t" + list + "\t\t\t");
        list.remove(0);
        System.out.println(list);
    }

    public static void main(String[] args) throws Exception {
        ProducerConsumerV7 pc = new ProducerConsumerV7();
        for (int i = 0; i < 50; i++) {
            Thread thread = new Thread(pc.new Consumer());
            thread.setName("Consumer " + i);
            thread.start();
        }

        for (int i = 0; i <2; i++) {
            Thread thread = new Thread(pc.new Producer());
            thread.setName("Producer " + i);
            thread.start();
        }
    }

    public class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    produce();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    public class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }
}
 

四,其他

 

  • Object.wait方法和Thread.sleep方法的不同之处在于,Object.wait方法需要获得对象的synchronized 锁――即对象的Monitor。Object.wait方法执行时,线程就会进入对象的wait set中,synchronized锁就会被自动释放掉(由JVM内部完成这个操作)。一旦接受到通知,线程从Object.wait中返回之前,必须首先获得synchronized锁。所以Java线程Wait-And-Notification机制,必须有赖于synchronized锁。所以上面表格中所列的几个方法必须在synchronized方法或synchronized块中执行。
  • 当线程从wait方法中唤醒以后,这时已经获取了synchronized锁,就会接着wait指令后面的指令继续执行。
  • notify() 和 notifyAll()方法类似,区别在于notifyAll会“唤醒”处于wait set中的所有的线程,只是这些线程仍需竞争synchronized lock。只有获取了synchronized lock的线程才能继续执行。(似乎这些线程的状态发生了迁移,从WAITING态迁移到了BLOCKED状态),而notify只是唤醒一个线程,但具体是哪个线程由JVM决定,我们自己对此无能为力。

 

参考: 

 

  • http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.2
  • 《Java Threads 3rd Edtion》

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics