# 线程间协作

读书笔记:黄文海.Java多线程编程实战指南(核心篇)(Java多线程编程实战系列)(Kindle位置1032).电子工业出版社.Kindle版本.

面向对象的世界中类不是孤立的,一个类往往需要借助其他类才能完成一个计算。同样,多线程世界中的线程并不是孤立的,一个线程往往需要其他线程的协作才能够完成其待执行的任务。

# 等待与通知——wait/notify

在多线程中,一个线程因其执行目标动作所需的保护条件未满足而被暂停的过程就被称为等待(Wait)。一个线程更新了系统的状态,使得其他线程所需的保护条件得以满足的时候唤醒那些被暂停的线程的过程就被称为通知(Notify)。

# 作用与用法


WAIT

实现伪代码如下:

// 在 调用 wait 方法 前 获得 相应 对象 的 内部 锁 
synchronized(someObject) { 
    while(保护条件不成立){ 
        // 调用Object.wait()暂停当前线程 
        someObject. wait(); 
    } // 代码执行到这里说明保护条件已经满足 
    // 执行目标动作 
    doAction(); 
}

Object.wait()的作用是使其执行线程被暂停(其生命周期状态变更为WAITING),该方法可用来实现等待;Object.notify()的作用是唤醒一个被暂停的线程,调用该方法可实现通知。相应地,Object.wait()的执行线程就被称为等待线程;Object.notify()的执行线程就被称为通知线程。由于Object类是Java中任何对象的父类,因此使用Java中的任何对象都能够实现等待与通知。

一个线程只有在持有一个对象的内部锁的情况下才能够调用该对象的wait方法,因此Object.wait()调用总是放在相应对象所引导的临界区之中。

wait()会以原子操作的方式使其执行线程(当前线程)暂停并使该线程释放其持有的的内部锁。其他线程在该线程所需的保护条件成立的时候执行相应的notify方法,即可唤醒持有该内部锁上任意一个等待线程。被唤醒的等待线程在其占用处理器继续运行的时候,需要再次申请对应的内部锁。申请臣工后继续执行wait()中剩余的指令,直到wait方法返回。

等待线程在其被唤醒、继续运行到其再次持有相应对象的内部锁的时间内,其他线程可能抢先获得相应的内部锁并更新了相关共享变量而导致该线程所需的保护条件又再次不成立,因此wait()调用返回之后我们需要再次判断此时保护条件是否成立。所以,对保护条件的判断以及wait()调用应该放在循环语句之中,以确保目标动作只有在保护条件成立的情况下才能够执行!

Object.wait()暂停当前线程时释放的锁只是与该wait方法所属对象的内部锁。当前线程所持有的其他内部锁、显式锁并不会因此而被释放。


NOTIFY

实现伪代码如下:

synchronized(someObject){ 
    // 更新等待线程的保护条件涉及的共享变量 
    updateSharedState()// 唤醒其他线程 
        someObject.notify(); 
}

一个线程只有在持有一个对象的内部锁的情况下才能够执行该对象的notify方法,因此Object.notify()调用总是放在相应对象内部锁所引导的临界区之中。

Object.notify()的执行线程持有的相应对象的内部锁只有在Object.notify()调用所在的临界区代码执行结束后才会被释放,而Object.notify()本身并不会将这个内部锁释放。

调用Object.notify()所唤醒的线程仅是相应对象上的一个任意等待线程,所以这个被唤醒的线程可能不是我们真正想要唤醒的那个线程。因此,有时候需要借助Object.notify()的兄弟——Object.notifyAll(),它可以唤醒相应对象上的所有等待线程。由于等待线程和通知线程在其实现等待和通知的时候必须是调用同一个对象的wait方法、notify方法,而这两个方法都要求其执行线程必须持有该方法所属对象的内部锁,因此等待线程和通知线程是同步在同一对象之上的两种线程。

# 案例实战

分布式系统有个告警功能模块AlarmAgent类,AlarmAgent内部会维护两个工作者线程:一个工作者线程负责与告警服务器建立网络连接(Socket连接),我们称该线程为网络连接线程;另外一个工作者线程负责定时检测告警代理与告警服务器的网络连接状况,我们称该线程为心跳线程。告警模块还专门维护了一个告警发送线程,该工作者线程通过调用AlarmAgent.sendAlarm(String)将该模块接收到的告警消息上报给告警服务器。

由于告警发送线程执行AlarmAgent.sendAlarm(String)的时候AlarmAgent与告警服务器的网络连接可能尚未建立完毕,或者中途由于一些故障(比如告警服务器宕机)连接已经中断,因此该线程需要等待AlarmAgent与告警服务器的连接完毕或者恢复连接之后才能上报告警消息,否则会导致告警上报失败。我们可以使用wait/notify实现一套等待/通知的机制:告警发送线程在上报告警消息前必须等待,直到AlarmAgent与告警服务器的连接成功建立或者恢复;心跳线程在检测到网络连接恢复之后通知告警发送线程

AlarmAgent源码

/**
 * 告警代理
 *
 * @author Viscent Huang
 */
public class AlarmAgent {
  // 保存该类的唯一实例
  private final static AlarmAgent INSTANCE = new AlarmAgent();
  // 是否连接上告警服务器
  private boolean connectedToServer = false;
  // 心跳线程,用于检测告警代理与告警服务器的网络连接是否正常
  private final HeartbeartThread heartbeatThread = new HeartbeartThread();

  private AlarmAgent() {
    // 什么也不做
  }

  public static AlarmAgent getInstance() {
    return INSTANCE;
  }

  public void init() {
    connectToServer();
    heartbeatThread.setDaemon(true);
    heartbeatThread.start();
  }

  private void connectToServer() {
    // 创建并启动网络连接线程,在该线程中与告警服务器建立连接
    new Thread() {
      @Override
      public void run() {
        doConnect();
      }
    }.start();
  }

  private void doConnect() {
    // 模拟实际操作耗时
    Tools.randomPause(100);
    synchronized (this) {
      connectedToServer = true;
      // 连接已经建立完毕,通知以唤醒告警发送线程
      notify();
    }
  }

  public void sendAlarm(String message) throws InterruptedException {
    synchronized (this) {
      // 使当前线程等待直到告警代理与告警服务器的连接建立完毕或者恢复
      while (!connectedToServer) {
        Debug.info("Alarm agent was not connected to server.");
        wait();
      }
      // 真正将告警消息上报到告警服务器
      doSendAlarm(message);
    }
  }

  private void doSendAlarm(String message) {
    // ...
    Debug.info("Alarm sent:%s", message);
  }

  // 心跳线程
  class HeartbeartThread extends Thread {
    @Override
    public void run() {
      try {
        // 留一定的时间给网络连接线程与告警服务器建立连接
        Thread.sleep(1000);
        while (true) {
          if (checkConnection()) {
            connectedToServer = true;
          } else {
            connectedToServer = false;
            Debug.info("Alarm agent was disconnected from server.");

            // 检测到连接中断,重新建立连接
            connectToServer();
          }
          Thread.sleep(2000);
        }
      } catch (InterruptedException e) {
        // 什么也不做;
      }
    }

    // 检测与告警服务器的网络连接情况
    private boolean checkConnection() {
      boolean isConnected = true;
      final Random random = new Random();

      // 模拟随机性的网络断链
      int rand = random.nextInt(1000);
      if (rand <= 500) {
        isConnected = false;
      }
      return isConnected;
    }
  }
}

Object.wait()的执行线程会一直处于WAITING状态,直到通知线程唤醒该线程并且保护条件成立。因此,Object.wait()所实现的等待是无限等待。Object.wait()方法还有个版本,其声明如下:

public final void wait(long timeout) throws InterruptedException

Object.wait(long)允许我们指定一个超时时间(单位为毫秒)。如果被暂停的等待线程在这个时间内没有被其他线程唤醒,那么Java虚拟机会自动唤醒该线程。不过Object.wait(long)既无返回值也不会抛出特定的异常,以便区分其返回是由于其他线程通知了当前线程还是由于等待超时。因此,使用Object.wait(long)的时候我们需要一些额外的处理。

public class TimeoutWaitExample {
  private static final Object lock = new Object();
  private static boolean ready = false;
  protected static final Random random = new Random();

  public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread() {
      @Override
      public void run() {
        for (;;) {
          synchronized (lock) {
            ready = random.nextInt(100) < 5 ? true : false;
            if (ready) {
              lock.notify();
            }
          }
          // 使当前线程暂停一段(随机)时间
          Tools.randomPause(500);
        }// for循环结束
      }
    };
    t.setDaemon(true);
    t.start();
    waiter(1000);
  }

  public static void waiter(final long timeOut) throws InterruptedException {
    if (timeOut < 0) {
      throw new IllegalArgumentException();
    }

    long start = System.currentTimeMillis();
    long waitTime;
    long now;
    synchronized (lock) {
      while (!ready) {
        now = System.currentTimeMillis();
        // 计算剩余等待时间
        waitTime = timeOut - (now - start);
        Debug.info("Remaining time to wait:%sms", waitTime);
        if (waitTime <= 0) {
          // 等待超时退出
          break;
        }
        lock.wait(waitTime);
      }// while循环结束

      if (ready) {
        // 执行目标动作
        guardedAction();
      } else {
        // 等待超时,保护条件未成立
        Debug.error("Wait timed out,unable to execution target action!");
      }
    }// 同步块结束
  }

  private static void guardedAction() {
    Debug.info("Take some action.");
    // ...
  }
}

每次调用Object.wait(long)之前,我们总是先根据系统当前时间(now)和等待方法被调用的时间(start)计算出剩余的等待时间(waitTime),然后以该时间为参数去调用Object.wait(long)。并且,在执行目标动作前我们会再次判断保护条件(ready==true)是否成立,此时保护条件若仍然不成立,则说明循环语句中的Object.wait(long)的返回是由等待超时导致的。

# 开销及问题

  • 过早唤醒(Wakeuptoosoon)问题。设一组等待/通知线程同步在对象someObject之上,初始状态下所有保护条件均不成立。接着,线程N1更新了共享变量state1使得保护条件1得以成立,此时为了唤醒使用该保护条件的所有等待线程(线程W1和线程W2),N1执行了someObject.notifyAll()。由于someObject.notifyAll()唤醒的是someObject上的所有等待线程,因此这时线程W2也会被唤醒。然而,W2所使用保的护条件2此时并没有成立,这就使得该线程被唤醒之后仍然需要继续等待。这种等待线程在其所需的保护条件并未成立的情况下被唤醒的现象就被称为过早唤醒(Wakeuptoosoon)。过早唤醒使得那些本来无须被唤醒的等待线程也被唤醒了,从而造成资源浪费。

    过早唤醒问题可以利用JDK1.5引入的java.util.concurrent.locks.Condition接口来解决。

image-20210923212558144

  • 信号丢失(MissedSignal)问题。如果等待线程在执行Object.wait()前没有先判断保护条件是否已然成立,那么有可能出现这种情形——通知线程在该等待线程进入临界区之前就已经更新了相关共享变量,使得相应的保护条件成立并进行了通知,但是此时等待线程还没有被暂停,自然也就无所谓唤醒了。这就可能造成等待线程直接执行Object.wait()而被暂停的时候,该线程由于没有其他线程进行通知而一直处于等待状态。这种现象就相当于等待线程错过了一个本来“发送”给它的“信号”,因此被称为信号丢失(MissedSignal)。只要将对保护条件的判断和Object.wait()调用放在一个循环语句之中就可以避免上述场景的信号丢失。信号丢失的另外一个表现是在应该调用Object.notifyAll()的地方却调用了Object.notify()。比如,对于使用同一个保护条件的多个等待线程,如果通知线程在侦测到这个保护条件成立后调用的是Object.notify(),那么这些等待线程最多只有一个线程能够被唤醒,甚至一个也没有被唤醒——被唤醒的线程是Object.notify()所属对象上使用其他保护条件的一个等待线程!也就是说,尽管通知线程在调用Object.notify()前可能考虑(判断)了某个特定的保护条件是否成立,但是Object.notify()本身在其唤醒线程时是不考虑任何保护条件的!这就可能使得通知线程执行Object.notify()进行的通知对于使用相应保护条件的等待线程来说丢失了。这种情形下,避免信号丢失的一个方法是在必要的时候使用Object.notifyAll()来通知。

  • 欺骗性唤醒(SpuriousWakeup)问题。等待线程也可能在没有其他任何线程执行Object.notify()/notifyAll()的情况下被唤醒。这种现象被称为欺骗性唤醒(SpuriousWakeup)。由于欺骗性唤醒的作用,等待线程被唤醒的时候该线程所需的保护条件可能仍然未成立,因为此时没有任何线程对相关共享变量进行过更新。可见,欺骗性唤醒也会导致过早唤醒。欺骗性唤醒虽然在实践中出现的概率非常低,但是由于操作系统是允许这种现象产生的,因此Java平台同样也允许这种现象的存在。欺骗性唤醒是Java平台对操作系统妥协的一种结果。只要我们将对保护条件的判断和Object.wait()调用行放在一个循环语句之中,欺骗性唤醒就不会对我们造成实际的影响。欺骗性唤醒和信号丢失问题的规避方法前文已经提及:将等待线程对保护条件的判断、Object.wait()的调用放在相应对象所引导的临界区中的一个循环语句之中即可。

  • 上下文切换问题。wait/notify的使用可能导致较多的上下文切换。

    • 等待线程执行Object.wait()至少会导致该线程对相应对象内部锁的两次申请与释放。通知线程在执行Object.notify()/notifyAll()时需要持有相应对象的内部锁,因此Object.notify()/notifyAll()调用会导致一次锁的申请。而锁的申请与释放可能导致上下文切换。

    • 等待线程从被暂停到唤醒这个过程本身就会导致上下文切换。

    • 被唤醒的等待线程在继续运行时需要再次申请相应对象的内部锁,此时等待线程可能需要和相应对象的入口集中的其他线程以及其他新来的活跃线程(即申请相应的内部锁且处于RUNNABLE状态的线程)争用相应的内部锁,而这又可能导致上下文切换。

    • 过早唤醒问题也会导致额外的上下文切换,这是因为被过早唤醒的线程仍然需要继续等待,即再次经历被暂停和唤醒的过程。

以下方法有助于避免或者减少wait/notify导致过多的上下文切换。

  • 在保证程序正确性的前提下,使用Object.notify()替代Object.notifyAll()。Object.notify()调用不会导致过早唤醒,因此减少了相应的上下文切换开销。

  • 通知线程在执行完Object.notify()/notifyAll()之后尽快释放相应的内部锁。这样可以避免被唤醒的线程在Object.wait()调用返回前再次申请相应内部锁时,由于该锁尚未被通知线程释放而导致该线程被暂停(以等待再次获得锁的机会)。

# notify和notifyAll的选用