多线程基础2
虚假唤醒
if条件进行等待改为while循环等待,避免虚假唤醒,因为一旦唤醒,if条件就不再判断是否满足唤醒条件而直接执行下面语句了,而while语句会继续判断是否满足唤醒条件
线程通信
创建三个线程AA、BB、CC分别交替打印2、3、4次,一共打印3轮
// ThreadDemo.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource {
// 定义标志位
private int flag = 1;
// 创建Lock锁
private Lock lock = new ReentrantLock();
// 创建三个condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print2(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断flag
while (flag != 1) {
// 等待
c1.await();
}
// 执行
for (int i = 1; i <= 2; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");
}
System.out.println("--------------");
flag = 2; // 修改标志位
c2.signal(); // 通知B线程
} finally {
// 释放锁
lock.unlock();
}
}
// 打印10次,参数是第几轮
public void print3(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断flag
while (flag != 2) {
// 等待
c2.await();
}
// 执行
for (int i = 1; i <= 3; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");
}
System.out.println("--------------");
flag = 3; // 修改标志位
c3.signal(); // 通知C线程
} finally {
// 释放锁
lock.unlock();
}
}
public void print4(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断flag
while (flag != 3) {
// 等待
c3.await();
}
// 执行
for (int i = 1; i <= 4; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");
}
System.out.println("--------------");
flag = 1; // 修改标志位
c1.signal(); // 通知A线程
} finally {
// 释放锁
lock.unlock();
}
}
}
public class ThreadDemo {
public static void main(String[] args) {
ShareResource resource = new ShareResource();
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
resource.print2(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
resource.print3(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
resource.print4(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
}
}AA 1 1轮
AA 2 1轮
--------------
BB 1 1轮
BB 2 1轮
BB 3 1轮
--------------
CC 1 1轮
CC 2 1轮
CC 3 1轮
CC 4 1轮
--------------
AA 1 2轮
AA 2 2轮
--------------
BB 1 2轮
BB 2 2轮
BB 3 2轮
--------------
CC 1 2轮
CC 2 2轮
CC 3 2轮
CC 4 2轮
--------------
AA 1 3轮
AA 2 3轮
--------------
BB 1 3轮
BB 2 3轮
BB 3 3轮
--------------
CC 1 3轮
CC 2 3轮
CC 3 3轮
CC 4 3轮
--------------ArrayList线程不安全,解决方式:CopyOnWriteArrayList
// ArrayList线程不安全,多次执行可以看到
// 解决方法:Vector \ Collections.synchronizedList() \ CopyOnWriteArrayList
// 常用CopyOnWriteArrayList
public class ThreadDemo2 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// List<String> list = new Vector<>();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
// List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 80; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}HashSet线程不安全,解决方式:CopyOnWriteArraySet
HashMap线程不安全,解决方式:ConcurrentHashMap
公平锁和非公平锁
ReentrantLock的无参构造和有参构造,参数boolean fair
- 默认非公平锁:充分利用CPU的时间片,避免CPU的空闲状态,效率高,但线程可能饿死
- 公平锁:线程之间切换开销较大,效率相对较低,线程队列进行排序
可重入锁(递归锁)
synchronized(隐式)和Lock(显示)都是可重入锁
死锁
public class DeadlockExample {
// 两个共享资源
private static final Object resource1 = new Object();
private static final Object resource2 = new Object();
public static void main(String[] args) {
// 线程1
Thread thread1 = new Thread(() -> {
synchronized (resource1) {
System.out.println("Thread 1: locked resource 1");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (resource2) {
System.out.println("Thread 1: locked resource 2");
}
}
});
// 线程2
Thread thread2 = new Thread(() -> {
synchronized (resource2) {
System.out.println("Thread 2: locked resource 2");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (resource1) {
System.out.println("Thread 2: locked resource 1");
}
}
});
// 启动两个线程
thread1.start();
thread2.start();
}
}死锁的检测方式,一:纯命令,jps-l,jstack进程编号;二:图形化,jconsle,P39讲解死锁排查命令
JUC辅助类
CountDownLatch 减少计数
// CountDownLatchDemo.java
// 创建实例,指定线程数量
// 计数 -1
// await()
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 创建实例,指定线程数量
CountDownLatch latch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "号同学离开");
// 计数 -1
latch.countDown();
}, String.valueOf(i)).start();
}
// await()
latch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门");
}
}输出:
3号同学离开
6号同学离开
1号同学离开
2号同学离开
4号同学离开
5号同学离开
main班长锁门CyclicBarrier 循环栅栏
public class CyclicBarrierDemo {
// 创建固定值
private static final int NUMBER = 7;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("*******集齐7颗龙珠");
});
for (int i = 1; i <= NUMBER; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "星龙珠");
try {
// 等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}输出:
7星龙珠
4星龙珠
3星龙珠
1星龙珠
2星龙珠
5星龙珠
6星龙珠
*******集齐7颗龙珠Semaphore 信号灯
// SemaphoreDemo.java
// 6辆车,3个停车位
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建Semaphore设置许可数量
Semaphore semaphore = new Semaphore(3);
// 模拟6辆车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "-抢到了车位");
// 设置停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println("-------离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}输出:
1-抢到了车位
2-抢到了车位
3-抢到了车位
-------离开了车位
4-抢到了车位
-------离开了车位
-------离开了车位
6-抢到了车位
5-抢到了车位
-------离开了车位
-------离开了车位
-------离开了车位ReentrantReadWriteLock读写锁
- 悲观锁 效率低,上锁后其他线程无法操作,不支持并发
- 乐观锁 支持并发,上锁后台线程也能操作,有版本号
- 表锁 锁住整张表
- 行锁 锁住一行,其他行正常被访问
- 读锁,也叫共享锁,会发生死锁
- 写锁,也叫独占锁,会发生死锁
| 无锁 | 多线程抢占资源 |
| synchronized 和 ReentrantLock | 都是独占,每次只能进来一个操作,不能共享 |
| ReentrantReadWriteLock读写锁 | 读读可以共享,提升性能,同时多人进行读操作;缺点:容易造成锁饥饿,一致读不能写;读的时候不能写,只有读完之后才能写 |
锁降级:写可以降级为读,然后释放读锁,再释放写锁
BlockingQueue阻塞队列
ArrayBlockingQueue 由数组组成的有界阻塞队列
LinkedBlockingQueue 由链表组成的有界(若无参构造默认容量
Integer.MAX_VALUE)阻塞队列DelayQueue 使用优先级队列实现的延迟无界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列,也即单个元素的队列
LinkedTransFerQueue 由链表组成的无界阻塞队列
LinkedBlockingDeque 由链表组成的双向阻塞队列
多线程中:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被唤醒
线程池
提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。
Executor接口最基础的接口,定义了一个方法
execute(Runnalbe command),用于提交任务。主要实现类是
ThreadPoolExecutor。ExecutorService接口继承了
Executor,增加了一些方法用于管理任务执行和生命周期,例如submit(),invokeAll(),shutdown(),awaitTermination()等。主要实现类包括
ThreadPoolExecutor和ScheduledThreadPoolExecutor。ThreadPoolExecutor类线程池的核心实现类,提供了灵活的线程池配置选项,可以通过参数来控制核心线程数、最大线程数、空闲线程存活时间、任务队列等。
可以创建不同类型的线程池,例如固定大小线程池、缓存线程池、单线程池等。
ScheduledExecutorService接口继承了
ExecutorService,提供了调度功能,允许任务在指定的延迟后执行或定期执行。主要实现类是
ScheduledThreadPoolExecutor。Executors工具类Executors提供了一些静态工厂方法,用于创建常见的线程池实例,例如固定大小线程池、缓存线程池、单线程池、定时任务线程池等。
// ThreadPoolDemo1.java
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// ExecutorService接口继承Executor接口,里面抽象方法execute的参数是接口Runnable
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 固定大小线程池
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单线程池
// ExecutorService threadPool = Executors.newCachedThreadPool(); // 缓存线程池
try {
for (int i = 1; i < 20; i++) {
// lambda表达式,这里接口Execute的抽象方法execute参数为Runnable
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}实际开发不要使用Executors工具类来创建线程池,用ThreadPoolExecutor类创建线程池的实例,参数配置
// ThreadPoolDemo2.java
public class ThreadPoolDemo2 {
public static void main(String[] args) {
// 自定义线程池
int corePoolSize = 2;
int maximumPoolSize = 5;
long keepAliveTime = 2L;
int queueCapacity = 3;
ExecutorService threadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueCapacity),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 20个顾客请求
try {
for (int i = 1; i <= maximumPoolSize + queueCapacity; i++) {
// lambda表达式,这里接口Execute的抽象方法execute参数为Runnable
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}输出:
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-2办理业务
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-5办理业务Fork/Join简介
Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
Fork:把一个复杂任务进行拆分,大事化小
Join:把拆分的任务结果进行合并
// ForkJoinDemo.java
// public abstract class RecursiveTask<V> extends ForkJoinTask<V>
// 计算斐波那契数列第n项,模仿这个例子写的MyTask
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
class MyTask extends RecursiveTask<Integer> {
private static final Integer VAULUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (end - begin <= VAULUE) {
for (int i = begin; i <= end; i++) {
result = result + i;
}
} else {
int middle = (begin + end) / 2;
MyTask left = new MyTask(begin, middle);
left.fork();
MyTask right = new MyTask(middle + 1, end);
right.fork();
result = left.join() + right.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建MyTask对象
MyTask myTask = new MyTask(1, 100);
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
// 获取合并之后结果,5050
Integer result = forkJoinTask.get();
System.out.println(result);
// 关闭池对象
forkJoinPool.shutdown();
}
}