多线程
推荐阅读:嘟嘟独立博客
推荐阅读:Java多线程锁机制
推荐阅读:40个Java多线程问题总结
问:进程和线程的区别是什么?
进程是资源分配的基本单位,而线程是CPU调度的基本单位。
一个进程可以有多个线程,多个线程共享进程的资源,线程又叫做轻量级进程。
问:线程的几种状态
(1) 新建(new):新创建了一个线程对象。
(2) 可运行(runnable):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法,该状态的线程便位于可运行线程池中,等待被线程调度选中,获取cpu的使用权。
(3) 运行(running):可运行状态(runnable)的线程获得了cpu时间片(timeslice),执行程序代码。
(4) 阻塞(block):阻塞状态是指线程因为某种原因放弃了cpu使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice转到运行(running)状态。阻塞的情况分三种:
等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。
同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。
其他阻塞: 运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。
当sleep()状态超时、 join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。
(5) 死亡(dead):线程run()、 main()方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。
例1:此题有疑问,感觉下面这句话是不对的。
Java线程调度算法是平台独立的。
问:创建线程有几种不同的方式?你喜欢哪一种?为什么?
(1) 继承Thread类,重写run()方法
public class ThreadTest1 {
public static void main(String[] args){
MyThread thread=new MyThread("T1");
thread.start();
System.out.println("The main thread is running.");
}
}
class MyThread extends Thread{
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("The thread "+this.name+" is running.");
}
}
(2) 实现Runnable接口,重写run()方法
public class ThreadTest2 {
public static void main(String[] args){
Thread thread=new Thread(new MyThread2("T2"));
thread.start();
System.out.println("The main thread is running.");
}
}
class MyThread2 implements Runnable{
private String name;
public MyThread2(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("The thread "+this.name+" is running.");
}
}
(3) 使用ExecutorService、Callable、Future实现有返回结果的多线程,实现Callable接口时,该接口中的call方法可以在线程执行结束时产生一个返回值。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws Exception {
List<Future<Integer>> list = new ArrayList<>();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
list.add(service.submit(new Task((int) (Math.random() * 100))));
}
int sum = 0;
for (Future<Integer> future : list) {
sum += future.get();
}
System.out.println(sum);
}
}
class Task implements Callable<Integer> {
private int upperBounds;
public Task(int upperBounds) {
this.upperBounds = upperBounds;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= upperBounds; i++) {
sum += i;
}
return sum;
}
}
实现Runnable接口这种方式更受欢迎。在应用设计中线程类已经继承了别的类的情况下,需要多继承,只能实现接口(而Java不支持多继承,却支持实现多个接口),这样就不再需要继承Thread类,避免单继承的局限。同时,线程池也是非常高效的,很容易实现和使用。
问:start()和run()
无论是通过继承Thread类还是实现Runnable接口来创建线程,都必须调用start()方法启动线程。线程启动后,线程进入就绪状态,当CPU分配时间给它时,它才开始运行,并不是一调用start()方法线程就立即运行。
run()方法是线程关联的需要执行的代码。直接调用run()方法,就相当于是普通的方法调用,会在主线程中直接运行,此时没有开启一个线程。
问:Java四种线程池的使用
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是”池化资源”技术产生的原因。
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程而不用自行创建,使用完毕后不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。要配置一个线程池是比较复杂的,尤其是在对线程池的原理不是很清楚的情况下,Java 5后的工具类Executors提供了一些静态工厂方法,可以生成一些常用的线程池。
合理的使用线程池,可以带来很多好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
应用程序可以使用Executor/Callable/Future框架来创建线程池。
(1) ExecutorService java.util.concurrent.Executors.newSingleThreadExecutor()
创建一个单线程的线程池,这个线程池只有一个线程执行任务,也就相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NewSingleThreadExecutor {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
// 顺序执行各个任务,依次输出结果
(2) ExecutorService java.util.concurrent.Executors.newFixedThreadPool(int nThreads)
创建一个含有固定数量线程的线程池,可控制最大并发任务数,超出的任务会在队列中等待。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NewFixedThreadPool {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
// 因为线程池大小为3,每个线程输出index后sleep 2秒,所以每隔两秒打印3个数字
(3) ExecutorService java.util.concurrent.Executors.newCachedThreadPool()
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程;当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
(4) ScheduledExecutorService java.util.concurrent.Executors.newScheduledThreadPool(int corePoolSize)
创建一个含有固定数量线程的线程池,支持延时任务或周期性任务的执行。
执行延时任务的示例代码:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class NewScheduledThreadPool1 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
// 表示延迟3秒执行任务
执行定期性任务的示例代码:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class NewScheduledThreadPool2 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
// 表示延迟1秒后,每3秒执行一次任务
问:同步和异步
同步:上一段代码没完成,下一段必须等到上一段代码完成后才可以执行。例如:两个线程共享数据,要求一个线程读取另一个线程写的数据,此时必须要求两个线程同步执行。
异步:上一段代码没完成,下一段不必等到上一段代码完成就可以执行。例如:当程序在某一个对象上调用了一个需要很长时间才能执行完的方法,如果我们不希望让程序等待方法的返回,此时可以使用异步编程。
问:synchronized关键字
推荐阅读:嘟嘟独立博客
在Java中,每一个对象都拥有一个锁标记,也称为监视器。一旦一个方法或一个代码块被synchronized修饰,那么这个部分就放入了监视器的监视区域,确保在同一时刻最多只有一个线程执行该部分代码,线程在获取锁之前不允许执行该部分的代码。
java实现进程之间的同步执行采用的机制是:监视器
当两个并发线程访问同一个对象的synchronized方法或代码块时,两个线程间是互斥的,在同一时刻只能有一个线程得到执行,另一个线程被阻塞,因为在执行synchronized方法或代码块的线程会锁定当前对象,只有在当前线程执行完这些代码并释放该对象的锁时,下一个线程才能锁定并执行该对象的synchronized方法或代码块。
(1) 当一个线程访问对象的一个synchronized方法或代码块时,另一个线程仍然可以访问该对象的非synchronized方法或代码块。(两个线程使用的是同一个对象)
(2) 当一个线程访问对象的一个synchronized方法或代码块时,其他线程对该对象的所有其它synchronized方法或代码块的访问将被阻塞(同上,两个线程使用的是同一个对象)。
例2:
当一个线程进入一个对象的一个synchronized方法后,其它线程是否可进入此对象的其它方法?
答:
(1) 其它线程可以进入该对象的非synchronized方法;
(2) 如果这个synchronized方法内部调用了wait(),则其它线程可以进入此对象的其他synchronized方法;
如果这个synchronized方法内部没有调用wait(),则其它线程不可以进入该对象的其他synchronized方法。
如果其他方法是static方法(当然它不属于对象,而是属于类),它用的同步锁是当前类的字节码,与非静态的方法不能同步,其它线程可以进入这些方法。
例3:看评论
public class HelloSogou{
public static synchronized void main(String[] a){
Thread t=new Thread(){
public void run(){
Sogou();
}
};
t.run();
System.out.print("Hello");
}
static synchronized void Sogou(){
System.out.print("Sogou");
}
}
上面JAVA程序的输出是:SogouHello
(3) 如果父类中的某个方法使用了synchronized关键字,而子类中覆盖了这个方法,默认情况下子类中的这个方法并不是同步的,必须显示的在子类的这个方法中加上synchronized关键字之后该方法才同步。当然,也可以在子类中调用父类中相应的方法,这样虽然子类中的方法并不是同步的,但子类调用了父类中的同步方法,也就相当子类方法也同步了。
例4:判断正误
构造方法不需要同步化。(正确)
一个子类不可以覆盖掉父类的同步方法。(错误)
问:同步方法和同步代码块的区别是什么?
Java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(增删改查),将会导致数据的不准确,相互之间产生冲突。因此加入了同步锁,以避免在该线程没有结束前,其他线程操作该资源变量,从而保证了变量的唯一性,准确性。
Java中每个对象都有一把锁,线程可以通过synchronized关键字来获取对象上的锁。
(1) 同步方法(粗粒度锁):由synchronized关键字修饰的方法
修饰一般方法:public synchronized void method(){…},获取的是当前被调用对象的锁
修饰静态方法:public static synchronized void method(){…},获取当前类的字节码对象上的锁
举例:如果一个线程执行一个对象的非static的synchronized方法,另外一个线程执行这个对象所属类的static的synchronized方法,此时不会发生互斥现象,因为访问static的synchronized方法占用的是类的字节码对象锁,而访问非static的synchronized方法占用的是对象锁,所以不存在互斥现象。
public class SychronizedTest {
public static void main(String[] args){
SynObj obj=new SynObj();
Thread01 thread01=new Thread01("Thread01", obj);
Thread01 thread02=new Thread01("Thread02", obj);
Thread02 thread03=new Thread02("Thread03");
Thread02 thread04=new Thread02("Thread04");
thread01.start();
thread02.start();
thread03.start();
thread04.start();
}
}
class Thread01 extends Thread{
private String threadName;
private SynObj obj;
public Thread01(String threadName, SynObj obj) {
this.threadName=threadName;
this.obj = obj;
}
@Override
public void run() {
this.obj.run1(this.threadName);
}
}
class Thread02 extends Thread{
private String threadName;
public Thread02(String threadName) {
this.threadName=threadName;
}
@Override
public void run() {
SynObj.run2(this.threadName);
}
}
class SynObj{
// 同步方法
public synchronized void run1(String threadName){
for(int i=0; i<3; i++){
try {
Thread.sleep(1000);
System.out.println(threadName+" run1 i="+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 同步static方法
public static synchronized void run2(String threadName){
for(int i=0; i<3; i++){
try {
Thread.sleep(1000);
System.out.println(threadName+" run2 i="+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Thread01和Thread02之间存在互斥关系,Thread03和Thread03之间存在互斥关系,但是Thread01/Thread02和Thread03/Thread04之间不存在互斥关系。
一种可能的输出:
Thread01 run1 i=0
Thread03 run2 i=0
Thread03 run2 i=1
Thread01 run1 i=1
Thread03 run2 i=2
Thread01 run1 i=2
Thread02 run1 i=0
Thread04 run2 i=0
Thread04 run2 i=1
Thread02 run1 i=1
Thread02 run1 i=2
Thread04 run2 i=2
(2) 同步代码块(细粒度锁):: 由synchronized关键字修饰的语句块,同步代码块可以选择以什么语句块来加锁,比同步方法要更细颗粒度,我们可以选择只同步会发生同步问题的部分代码而不是整个方法
synchronized(obj) {…},同步代码块可以指定获取某个对象上的锁。
public class SychronizedTest {
public static void main(String[] args){
SynObj synObj1=new SynObj();
SynObj synObj2=new SynObj();
Obj obj1=new Obj();
Obj obj2=new Obj();
Thread01 thread01=new Thread01("Thread01", synObj1);
Thread01 thread02=new Thread01("Thread02", synObj1);
Thread01 thread03=new Thread01("Thread03", synObj2);
Thread02 thread04=new Thread02("Thread04", synObj1, obj1);
Thread02 thread05=new Thread02("Thread05", synObj1, obj1);
Thread02 thread06=new Thread02("Thread06", synObj2, obj2);
thread01.start();
thread02.start();
thread03.start();
thread04.start();
thread05.start();
thread06.start();
}
}
class Thread01 extends Thread{
private String threadName;
private SynObj obj;
public Thread01(String threadName, SynObj obj) {
this.threadName=threadName;
this.obj = obj;
}
@Override
public void run() {
this.obj.run1(this.threadName);
}
}
class Thread02 extends Thread{
private String threadName;
private SynObj synObj;
private Obj obj;
public Thread02(String threadName, SynObj synObj, Obj obj) {
this.threadName=threadName;
this.synObj=synObj;
this.obj=obj;
}
@Override
public void run() {
this.synObj.run2(this.threadName, this.obj);
}
}
class SynObj{
public void run1(String threadName){
// 同步代码块,锁住的是对象本身
synchronized(this){
for(int i=0; i<3; i++){
try {
Thread.sleep(1000);
System.out.println(threadName+" run1 i="+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run2(String threadName, Obj obj){
// 同步代码块,锁住的是对象obj
synchronized(obj){
for(int i=0; i<3; i++){
try {
Thread.sleep(1000);
System.out.println(threadName+" run1 i="+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Obj{}
Thread01和Thread02之间存在互斥关系,但是Thread01/Thread02和Thread03之间不存在互斥关系。
注释掉thread04,thread05和thread06的启动代码
//thread04.start();
//thread05.start();
//thread06.start();
一种可能的输出:
Thread01 run1 i=0
Thread03 run1 i=0
Thread01 run1 i=1
Thread03 run1 i=1
Thread01 run1 i=2
Thread03 run1 i=2
Thread02 run1 i=0
Thread02 run1 i=1
Thread02 run1 i=2
Thread04和Thread05之间存在互斥关系,但是Thread04/Thread05和Thread06之间不存在互斥关系。
注释掉thread01,thread02和thread03的启动代码
//thread01.start();
//thread02.start();
//thread03.start();
一种可能的输出:
Thread04 run1 i=0
Thread06 run1 i=0
Thread06 run1 i=1
Thread04 run1 i=1
Thread04 run1 i=2
Thread06 run1 i=2
Thread05 run1 i=0
Thread05 run1 i=1
Thread05 run1 i=2
将SynObj的run2方法的被锁对象换成obj.getClass()之后,此时Thread04、Thread05和Thread06之间存在互斥关系。因此Synchronized后面括号括起来的部分为synchronized(className.class),作用的对象是这个类的所有对象
public void run2(String threadName, Obj obj){
// 同步代码块,锁住的是对象obj
synchronized(obj.getClass()){
for(int i=0; i<3; i++){
try {
Thread.sleep(1000);
System.out.println(threadName+" run1 i="+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
一种可能的输出为:
Thread04 run1 i=0
Thread04 run1 i=1
Thread04 run1 i=2
Thread06 run1 i=0
Thread06 run1 i=1
Thread06 run1 i=2
Thread05 run1 i=0
Thread05 run1 i=1
Thread05 run1 i=2
问:synchronized和java.util.concurrent.locks.Lock的异同
Lock是Java 5以后引入的新的API。
相同点:Lock能完成synchronized所实现的所有功能
不同点:
- Lock有比synchronized更精确的线程语义和更好的性能。
- synchronized会自动释放锁,而Lock一定要求程序员手工释放,并且必须在finally从句中释放。
- Lock还有更强大的功能,例如,它的tryLock()方法可以以非阻塞方式去拿锁。
例1:请写出程序,实现如下功能:子线程循环10次,接着主线程循环100,接着又回到子线程循环10次,接着再回到主线程又循环100,如此循环50次。
方法一:使用synchronized
public class Test {
public static boolean isMain=false;
public static void main(String[] args){
Object object=new Object();
Thread thread=new Thread(new Runnable(){
@Override
public void run() {
for(int i=0; i<50; i++){
synchronized(object){
if(isMain){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(int k=0; k<10; k++){
System.out.println(Thread.currentThread().getName()+", i="+i+", k="+k);
}
isMain=true;
object.notify();
}
}
}
});
thread.start();
for(int i=0; i<50; i++){
synchronized (object){
if(!isMain){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(int j=0; j<100; j++){
System.out.println(Thread.currentThread().getName()+", i="+i+", j="+j);
}
isMain=false;
object.notify();
}
}
}
}
方法二:使用并发库
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static boolean isMain = false;
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
lock.lock();
try {
if (isMain) {
condition.await();
}
for (int k = 0; k < 10; k++) {
System.out.println(Thread.currentThread().getName() + ", i=" + i + ", k=" + k);
}
isMain = true;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
});
thread.start();
for (int i = 0; i < 50; i++) {
lock.lock();
try {
if (!isMain) {
condition.await();
}
for (int j = 0; j < 100; j++) {
System.out.println(Thread.currentThread().getName() + ", i=" + i + ", j=" + j);
}
isMain = false;
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
问:Java支持并发编程的同步机制
JDK提供的用于并发编程的同步器有:Semaphore、CyclicBarrier、CountDownLatch
CyclicBarrier让一组线程等待其他线程;CountDownLatch让一组线程等待某个事件发生
Callable类的call()方法可以返回值和抛出异常
线程调用start()方法后进行就绪状态,等待获取CPU的使用权
CopyOnWriteArrayList适合于读操作远远大于写操作的场景里,比如缓存。
ReadWriteLock 当写操作时,其他线程无法读取或写入数据,而当读操作时,其它线程无法写入数据,但却可以读取数据。适用于读取远远大于写入的操作。
ConcurrentHashMap是一个线程安全的HashTable,它的主要功能是提供了一组和HashTable功能相同但是线程安全的方法。
ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。
问:wait、notify、notifyAll
- wait()
(1) 该方法用来让当前线程进入休眠状态,直到被唤醒或被中断为止。
(2) 在调用wait()之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。如果调用wait()时,线程没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类,因此,不需要try-catch结构。
(3) 进入wait()方法后,当前线程释放锁。在从 wait()返回前,线程与其他线程竞争重新获得锁。
- notify()
(1) 该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用notify()时线程没有持有适当的锁,也会抛出IllegalMonitorStateException。
(2) 该方法用来唤醒那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,该方法并不能确切的唤醒某一个等待的线程,线程调度器任意挑选出其中一个wait()状态的线程来发出通知,并使它等待获取该对象的对象锁(notify后,发出通知的当前线程不会马上释放该对象锁,wait所在的线程并不能马上获取该对象锁,要等到程序退出synchronized代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁),但不惊动其他同样在等待被该对象notify的线程们。
(3) 当第一个获得了该对象锁的wait线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify 语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,会继续阻塞在wait状态,直到这个对象发出一个notify或notifyAll。这里需要注意:它们等待的是被notify或notifyAll,而不是锁。这与下面的 notifyAll()方法执行后的情况不同。
- notifyAll()
该方法与 notify ()方法的工作方式相同,重要的一点差异是:
notifyAll 使所有原来在该对象上 wait 的线程统统退出 wait 的状态(即全部被唤醒,不再等待notify或notifyAll,但由于此时还没有获取到该对象锁,因此还不能继续往下执行),开始等待获取该对象上的锁,一旦该对象锁被释放(notifyAll线程退出调用了notifyAll的synchronized代码块的时候),他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。
总结:如果线程调用了对象的 wait()方法,那么线程便会处于该对象的等待池中,等待池中的线程不会去竞争该对象的锁。当有线程调用了对象的 notifyAll()方法(唤醒所有 wait 线程)或 notify()方法(只随机唤醒一个 wait 线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中,唯有线程再次调用wait()方法,它才会重新回到等待池中。而竞争到对象锁的线程则继续往下执行,直到执行完了 synchronized 代码块,它会释放掉该对象锁,这时锁池中的线程会继续竞争该对象锁。
案例:当前线程调用对象的notify后,当前线程不会马上释放该对象锁,wait所在的线程并不能马上获取该对象锁,要等到程序退出synchronized代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁。
public static void main(String[]args)throws Exception {
final Object obj = new Object();
Thread t1 = new Thread() {
public void run() {
synchronized (obj) {
try {
obj.wait();
System.out.println("Thread 1 wake up.");
} catch (InterruptedException e) {
}
}
}
};
t1.start();
Thread.sleep(1000);//We assume thread 1 must start up within 1 sec.
Thread t2 = new Thread() {
public void run() {
synchronized (obj) {
obj.notifyAll();
System.out.println("Thread 2 sent notify.");
}
}
};
t2.start();
}
output:
Thread 2 sent notify.
Thread 1 wake up
例1:找出代码中错误的地方。
void waitForSignal(){
Object obj = new Object();
synchronized(Thread.currentThread()){
obj.wait();
obj.notify();
}
}
第一个错误是wait()方法要以try/catch包覆,或是掷出InterruptedException才行
第二个错误是wait或者notify方法被调用的对象必须与synchronized中的对象一致,否则会有IllegalMonitorStateException
可用来实现线程间通知和唤醒的方式:
Object.wait/notify/notifyAll
Condition.await/signal/signalAll
问:sleep()方法和wait()方法的区别
sleep():该方法让线程休眠指定的时间,让出CPU,给其他线程执行的机会。当这个时间达到之后,线程会再次醒来,进入就绪状态。它是Thread类一个静态方法,调用此方法需要捕捉InterruptedException异常。
wait():该方法用来让当前线程进入休眠状态,直到被唤醒或被中断为止。
(1) sleep()是Thread类的方法,wait()是Object类的方法
(2) wait(),notify()和notifyAll()只能在同步方法或同步代码块中使用,而sleep()可以在任何地方使用
(3) 调用sleep()方法后,线程进入睡眠状态,但不会释放对象锁,休眠时间结束后会自动进入就绪状态;调用wait()方法后,线程会释放对象锁,进入此对象的等待池(wait pool)中,让出CPU,给其他线程执行的机会,直到其他线程调用对象的notify()方法(或notifyAll()方法)时才能唤醒等待池中的线程进入等锁池(lock pool),准备获得对象锁,如果线程重新获得对象的锁就可以进入就绪状态。
例1:sleep()和wait()的对比
问:sleep()方法和yield()方法的区别
调用线程yield方法会让当前线程交出CPU权限,让CPU去执行其他的线程。它跟sleep方法类似,同样不会释放锁。但是yield不能控制交出CPU的具体时间,另外,yield方法只能让拥有相同优先级的线程有获取CPU执行时间的机会。
注意,调用yield方法并不会让线程进入阻塞状态,而是让线程重回就绪状态,它只需要等待重新获取CPU执行时间,这一点是和sleep方法不一样的。
sleep()方法和yield()方法的区别:
(1) sleep()方法给其他线程运行机会时不考虑线程的优先级,因此会给低优先级的线程以运行的机会;yield()方法只会给相同优先级或更高优先级的线程以运行的机会;
(2) 线程执行sleep()方法后转入阻塞(blocked)状态,而执行yield()方法后转入就绪(ready)状态;
(3) sleep()方法声明抛出InterruptedException,而yield()方法没有声明任何异常;
(4) sleep()方法比yield()方法(跟操作系统CPU调度相关)具有更好的可移植性。
问:suspend()和resume()方法
Java.Thread的方法resume()负责重新开始被以下哪个方法中断的线程的执行()?
A、stop B、sleep C、wait D、suspend
正确答案:D
suspend()和resume()方法:两个方法配套使用,suspend()使线程进入阻塞状态,并且不会自动恢复,必须其对应的resume()被调用,才能使得线程重新进入可执行状态。
问:join()方法
方法join()的作用是让调用join()方法的线程等待被调用线程结束,再继续往下执行。
问:共享变量在线程间的可见性
要实现共享变量的可见性,必须要保证亮点:
- 被线程修改后的共享变量的值能够被及时从该线程的工作内存中刷新至主内存中
- 其他线程能够及时把共享变量的最新值从主内存中更新到自己的工作内存中
问:重排序
为了提高程序的性能,编译器或处理器会优化程序的代码,使得代码的书写顺序和实际执行的顺序有所不同,这便是指令重排序。通常存在以下三种重排序:
- 编译器优化的重排序(编译器优化)
- 指令级并行重排序(处理器优化)
- 内存系统的重排序(处理器优化)
问:as-if-serial语义
无论编译器或处理器如何对代码重排序,程序执行的结果应该与代码顺序执行的结果一致。(Java编译器和处理器会保证Java在单线程下遵循as-if-serial语义)
问:Java在语言层面实现共享变量在线程间的可见性的方式
(1) 导致共享变量在线程间不可见的原因:
- 线程的交叉执行
- 重排序
- 共享变量更新后的值在工作内存和主内存之间没有得到及时更新
(2) Java在语言层面实现共享变量在线程间的可见性的方式
synchronized和volatile
(3) synchronized实现共享变量在线程间的可见性
JMM对synchronized的两条规定:(注意:加锁和解锁使用的需要是同一把锁)
- 线程解锁前,必须把共享变量的最新值刷新到主内存中
- 线程加锁时,将清空工作内存中共享变量的值,从而在使用共享变量时需要从主内存中重新读取共享变量的最新值
线程解锁前对共享变量的修改在下次加锁时对其他线程可见。
问:volatile关键字
推荐阅读:嘟嘟独立博客
volatile是java中的一个关键字,可以用来修饰被不同线程访问和修改的变量。
出于运行速率的考虑,Java编译器会把经常经常访问的变量放到缓存(严格讲应该是工作内存)中,读取变量则从缓存中读。但是在多线程编程中,内存中的值和缓存中的值可能会出现不一致。volatile用于限定变量只能从内存中读取,保证对所有线程而言,值都是一致的。volatile变量在执行写操作时,会在写操作之后加入一条store屏障指令,强制缓存中变量的值立刻被写入到内存中;volatile变量在之心读操作时,会在读操作之前就爱如一条load屏障指令,强制将变量的值从内存中更新到缓存中。但是volatile不能保证原子性,也就不能保证线程安全。
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:
1)可见性:保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
2)禁止进行指令重排序:volatile只提供了保证访问该变量时,每次都是从内存中读取最新值,并不会使用寄存器缓存该值,每次都会从内存中读取。
volatile实现可见性
线程写volatile变量的过程:
(1) 改变线程工作内存中volatile变量副本的值
(2) 将改变后的副本的值从工作内存刷新到主内存
线程读volatile变量的过程:
(1) 从主内存中读取volatile变量的最新值到线程的工作内存
(2) 从工作内存中读取volatile变量的副本
volatile不能保证volatile变量相关操作的原子性
volatile的使用场景
要在多线程中安全地使用volatile变量,必须同时满足两个条件:
(1) 对变量的写入操作不依赖于其当前值
- 满足:boolean变量,记录温度变化的变量等
- 不满足:number++,count=count*5等
(2) 该变量不包含在具有其他volatile变量的不变式中
- 不满足:不变式low<up,其中up和low为volatile变量
问:保证原子性的方法
(1) 使用synchronized关键字
(2) 使用java.util.concurrent.locks包中的ReentrantLock
(3) 使用java.util.concurrent.aotmic包中的AtomicInteger
问:ThreadLocal
ThreadLocal类用于创建一个线程本地变量,让每一个线程都维护自己独有的变量拷贝。
在Thread中有一个成员变量ThreadLocals,该变量的类型是ThreadLocalMap,也就是一个Map,它的键是threadLocal,值为就是变量的副本。ThreadLocal为每一个使用该变量的线程都提供了一个变量值的副本,每一个线程都可以独立地改变自己的副本,是线程隔离的。通过ThreadLocal的get()方法可以获取该线程变量的本地副本,在get方法之前要先set,否则就要重写initialValue()方法。
ThreadLocal不是用来在多线程之间共享数据,而是让变量在每个线程中都有独立拷贝,使得不会出现一个线程读取变量时,该变量而被另一个线程修改的现象。
由于每个线程在访问该变量时,读取和修改的,都是自己独有的那一份变量拷贝,不会被其他线程访问,变量被彻底封闭在每个访问的线程中
问:什么是死锁(deadlock)?
两个线程或两个以上线程因争夺资源而都在等待对方执行完毕才能继续往下执行的时候就发生了死锁。结果就是若无外力作用,这些线程都陷入了无限的等待中。
问:如何确保N个线程可以访问N个资源同时又不导致死锁?
使用多线程的时候,一种非常简单的避免死锁的方式就是:指定访问资源的顺序,并强制线程按照指定的顺序获取锁。因此,如果所有的线程都是以同样的顺序加锁和释放锁,就不会出现死锁了。
问:生产者和消费者模型
模型描述:有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者可以从仓库中取走产品。解决生产者和消费者问题的核心在于要保证同一资源被多个线程并发访问时的完整性。一般采用信号量或加锁机制解决。下面介绍Java中解决生产者和消费者问题的三种主要方式:
public class ProducerConsumer {
public static void main(String[] args) {
Storage storage = new Storage();
ExecutorService taskSubmit = Executors.newFixedThreadPool(10);
// 给定2个消费者
taskSubmit.submit(new Consumer("consumer1", storage, 50));
taskSubmit.submit(new Consumer("consumer2", storage, 10));
// 给定3个生产者
taskSubmit.submit(new Producer("producer1", storage, 60));
taskSubmit.submit(new Producer("producer2", storage, 10));
taskSubmit.submit(new Producer("producer3", storage, 20));
}
}
生产者线程:
class Producer implements Runnable {
private String producerName;
private Storage storage;
private int num;// 每次生产数量
public Producer(String name, Storage storage, int num) {
this.producerName = name;
this.storage = storage;
this.num = num;
}
@Override
public void run() {
while(true){
storage.produce(this.producerName, num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者线程:
class Consumer implements Runnable {
private String consumerName;
private Storage storage;
private int num;// 每次消费数量
public Consumer(String name, Storage storage, int num) {
this.consumerName = name;
this.storage = storage;
this.num = num;
}
@Override
public void run() {
while(true){
storage.consume(this.consumerName, num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
(1) wait()、notify()/notifyAll()
class Storage {
private static final int MAX_SIZE = 100;// 仓库的最大容量
private List<Object> data = new ArrayList<Object>();// 存储载体
/**
* 生产操作
*/
public synchronized void produce(String producer, int num) {
while (data.size() + num > MAX_SIZE) {// 如果生产这些产品将超出仓库的最大容量,则生产操作阻塞
System.out.println(producer+"生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size());
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到这里,表示可以正常生产产品
for (int i = 0; i < num; i++) {// 生产num个产品
data.add(new Object());
}
System.out.println(producer+"生产操作-->数量:" + num + ",成功入库~------库存:" + data.size());
// 生产完产品后,唤醒其他等待消费的线程
notify();
}
/**
* 消费操作
*/
public synchronized void consume(String consumer, int num) {
while (data.size() - num < 0) {// 如果产品数量不足
System.out.println(consumer+"消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size());
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到这里,表示可以正常消费
for (int i = 0; i < num; i++) {// 消费num个产品
data.remove(0);
}
System.out.println(consumer+"消费操作-->数量:" + num + ",消费成功~------库存:" + data.size());
// 消费完产品后,唤醒其他等待生产的线程
notify();
}
}
(2) await()、signal()/signalAll()
JDK 1.5之后引入了concurrent包,可以使用await()和signal()方法来做同步,功能和wait()和notify()方法相同,可以完全取代它们,但await()和signal()需要和Lock机制结合使用,更加灵活。和第一种方法类似,可以通过调用Lock的newCondition()方法依次获取两个条件变量,一个针对仓库空的,一个针对仓库满,通过添加变量进行同步控制。
class Storage {
private static final int MAX_SIZE = 100;// 仓库的最大容量
private List<Object> data = new ArrayList<Object>();// 存储载体
private Lock lock = new ReentrantLock();// 可重入锁
private Condition full = lock.newCondition();// 仓库满的条件变量
private Condition empty = lock.newCondition();// 仓库空时的条件变量
/**
* 生产操作
*/
public void produce(String producer, int num) {
lock.lock(); // 加锁
while (data.size() + num > MAX_SIZE) {// 如果生产这些产品将超出仓库的最大容量,则生产操作阻塞
System.out.println(producer + "生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size());
try {
full.await(); // 阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到这里,表示可以正常生产产品
for (int i = 0; i < num; i++) {// 生产num个产品
data.add(new Object());
}
System.out.println(producer + "生产操作-->数量:" + num + ",成功入库~------库存:" + data.size());
// 生产完产品后,唤醒其他等待消费的线程
empty.signalAll();
lock.unlock(); // 释放锁
}
/**
* 消费操作
*/
public void consume(String consumer, int num) {
lock.lock(); // 加锁
while (data.size() - num < 0) {// 如果产品数量不足
System.out.println(consumer + "消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size());
try {
empty.await(); // 阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到这里,表示可以正常消费
for (int i = 0; i < num; i++) {// 消费num个产品
data.remove(0);
}
System.out.println(consumer + "消费操作-->数量:" + num + ",消费成功~------库存:" + data.size());
// 消费完产品后,唤醒其他等待生产的线程
full.signalAll();
lock.unlock(); // 释放锁
}
}
使用await和signal后,加锁解锁操作就交给了Lock,不用再使用synchronized同步(具体可看前面总结的同步的实现方法),在produce中满仓后阻塞,生产完后唤醒等待的消费线程,consume中库存不足后阻塞,消费完后唤醒等待的生产者线程,表示可以消费了。
(3) BlockingQueue阻塞队列方式
在创建一个线程池如ThreadPoolExecutor时,需要传入一个阻塞任务队列即BlockingQueue用于保存等待执行的任务。可以选择以下几个阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
BlockingQueue的所有实现类都已经实现了同步的队列,实现的方式采用的是上面介绍的第二种await()/signal() + Lock的同步机制。在生成阻塞队列时,可以指定队列大小。用于阻塞操作的方法主要为:
- put()方法:插入一个元素,如果超过容量则自我阻塞,等待被唤醒;
- take()方法:取走一个元素,如果容量不足了,自我阻塞,等待被唤醒;
put和take内部自己实现了await和signal、lock的机制处理,不再需要我们做相应操作。
class Storage {
private static final int MAX_SIZE = 100;// 仓库的最大容量
private BlockingQueue<Object> data = new LinkedBlockingQueue<Object>(MAX_SIZE); // 使用阻塞队列作为存储载体
/**
* 生产操作
*/
public void produce(String producer, int num) {
if (data.size() == MAX_SIZE) {// 如果仓库已达最大容量
System.out.println("生产操作-->仓库已达最大容量!");
}
// 到这里,表示可以正常生产产品
for (int i = 0; i < num; i++) {// 生产num个产品
try {
data.put(new Object()); // put内部自动实现了判断,超过最大容量自动阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size());
}
/**
* 消费操作
*/
public void consume(String consumer, int num) {
if (data.size() == 0) {// 如果产品数量不足
System.out.println("消费操作--库存不足!");
}
// 到这里,表示可以正常消费
for (int i = 0; i < num; i++) {// 消费num个产品
try {
data.take(); // take内部自动判断,消耗后库存是否充足,不足自我阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size());
}
}
可以看到,Storage中produce和consume方法中我们直接通过put和take方法往容器中添加或移除产品即可,没有进行逻辑控制(其实上面两个方法中if都可以去掉,只是为了打印效果才加上的),这是因为BlockingQueue内部已经实现了,不需要我们再次控制。
同时,我们看到打印的库存信息出现了不匹配,这个主要是因为我们的打印语句Systm.out.println()没有被同步导致的,因为同步语句只是在put和take方法内部,而我们打印语句中使用了data这个共享变量。这里因为我们需要看效果,所以才加的打印语句,并不影响我们对BlockingQueue的使用。
因此,在Java中,使用BlockingQueue阻塞队列的方式可以很方便的为我们处理生产者消费则问题,推荐使用。
在我们的编程生涯中,我们自己要去写生产者和消费者问题,多是前面第一种介绍的“类似消费者生产者问题”上。
解决生产者和消费者问题还有管道的方式,即在生产者和消费者之间建立一个管道缓冲区,Java中用PipedInputStream / PipedOutputStream实现,由于这种方式对于传输对象不易封装,因此实用性不高,就不具体介绍了。