Skip to content

多线程基础2


尚硅谷

虚假唤醒

if条件进行等待改为while循环等待,避免虚假唤醒,因为一旦唤醒,if条件就不再判断是否满足唤醒条件而直接执行下面语句了,而while语句会继续判断是否满足唤醒条件

线程通信

创建三个线程AA、BB、CC分别交替打印2、3、4次,一共打印3轮

java
// ThreadDemo.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareResource {
    // 定义标志位
    private int flag = 1;

    // 创建Lock锁
    private Lock lock = new ReentrantLock();

    // 创建三个condition
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();

    public void print2(int loop) throws InterruptedException {
        // 上锁
        lock.lock();
        try {
            // 判断flag
            while (flag != 1) {
                // 等待
                c1.await();
            }
            // 执行
            for (int i = 1; i <= 2; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");
            }
            System.out.println("--------------");

            flag = 2;      // 修改标志位
            c2.signal();   // 通知B线程
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    // 打印10次,参数是第几轮
    public void print3(int loop) throws InterruptedException {
        // 上锁
        lock.lock();
        try {
            // 判断flag
            while (flag != 2) {
                // 等待
                c2.await();
            }
            // 执行
            for (int i = 1; i <= 3; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");
            }
            System.out.println("--------------");
            flag = 3;      // 修改标志位
            c3.signal();   // 通知C线程
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    public void print4(int loop) throws InterruptedException {
        // 上锁
        lock.lock();
        try {
            // 判断flag
            while (flag != 3) {
                // 等待
                c3.await();
            }
            // 执行
            for (int i = 1; i <= 4; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + loop + "轮");

            }
            System.out.println("--------------");

            flag = 1;      // 修改标志位
            c1.signal();   // 通知A线程
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
}

public class ThreadDemo {

    public static void main(String[] args) {
        ShareResource resource = new ShareResource();
        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                try {
                    resource.print2(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                try {
                    resource.print3(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                try {
                    resource.print4(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();
    }
}
sh
AA	1	1轮
AA	2	1轮
--------------
BB	1	1轮
BB	2	1轮
BB	3	1轮
--------------
CC	1	1轮
CC	2	1轮
CC	3	1轮
CC	4	1轮
--------------
AA	1	2轮
AA	2	2轮
--------------
BB	1	2轮
BB	2	2轮
BB	3	2轮
--------------
CC	1	2轮
CC	2	2轮
CC	3	2轮
CC	4	2轮
--------------
AA	1	3轮
AA	2	3轮
--------------
BB	1	3轮
BB	2	3轮
BB	3	3轮
--------------
CC	1	3轮
CC	2	3轮
CC	3	3轮
CC	4	3轮
--------------

ArrayList线程不安全,解决方式:CopyOnWriteArrayList

java
// ArrayList线程不安全,多次执行可以看到
// 解决方法:Vector \ Collections.synchronizedList() \ CopyOnWriteArrayList
// 常用CopyOnWriteArrayList

public class ThreadDemo2 {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();

//        List<String> list = new Vector<>();
//        List<String> list = Collections.synchronizedList(new ArrayList<>());
//        List<String> list = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 80; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}

HashSet线程不安全,解决方式:CopyOnWriteArraySet

HashMap线程不安全,解决方式:ConcurrentHashMap

公平锁和非公平锁

ReentrantLock的无参构造和有参构造,参数boolean fair

  • 默认非公平锁:充分利用CPU的时间片,避免CPU的空闲状态,效率高,但线程可能饿死
  • 公平锁:线程之间切换开销较大,效率相对较低,线程队列进行排序

可重入锁(递归锁)

synchronized(隐式)和Lock(显示)都是可重入锁

死锁

java
public class DeadlockExample {
    // 两个共享资源
    private static final Object resource1 = new Object();
    private static final Object resource2 = new Object();

    public static void main(String[] args) {
        // 线程1
        Thread thread1 = new Thread(() -> {
            synchronized (resource1) {
                System.out.println("Thread 1: locked resource 1");

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (resource2) {
                    System.out.println("Thread 1: locked resource 2");
                }
            }
        });

        // 线程2
        Thread thread2 = new Thread(() -> {
            synchronized (resource2) {
                System.out.println("Thread 2: locked resource 2");

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (resource1) {
                    System.out.println("Thread 2: locked resource 1");
                }
            }
        });

        // 启动两个线程
        thread1.start();
        thread2.start();
    }
}

死锁的检测方式,一:纯命令,jps-l,jstack进程编号;二:图形化,jconsle,P39讲解死锁排查命令

JUC辅助类

CountDownLatch 减少计数
java
// CountDownLatchDemo.java
// 创建实例,指定线程数量
// 计数 -1
// await()
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 创建实例,指定线程数量
        CountDownLatch latch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "号同学离开");
                // 计数 -1
                latch.countDown();
            }, String.valueOf(i)).start();
        }

        // await()
        latch.await();
        System.out.println(Thread.currentThread().getName() + "班长锁门");
    }
}

输出:

sh
3号同学离开
6号同学离开
1号同学离开
2号同学离开
4号同学离开
5号同学离开
main班长锁门
CyclicBarrier 循环栅栏
java
public class CyclicBarrierDemo {
    // 创建固定值
    private static final int NUMBER = 7;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
            System.out.println("*******集齐7颗龙珠");
        });

        for (int i = 1; i <= NUMBER; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "星龙珠");
                try {
                    // 等待
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

输出:

sh
7星龙珠
4星龙珠
3星龙珠
1星龙珠
2星龙珠
5星龙珠
6星龙珠
*******集齐7颗龙珠
Semaphore 信号灯
java
// SemaphoreDemo.java
// 6辆车,3个停车位
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 创建Semaphore设置许可数量
        Semaphore semaphore = new Semaphore(3);

        // 模拟6辆车
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    // 抢占
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "-抢到了车位");
                    // 设置停车时间
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println("-------离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

输出:

sh
1-抢到了车位
2-抢到了车位
3-抢到了车位
-------离开了车位
4-抢到了车位
-------离开了车位
-------离开了车位
6-抢到了车位
5-抢到了车位
-------离开了车位
-------离开了车位
-------离开了车位

ReentrantReadWriteLock读写锁

  • 悲观锁 效率低,上锁后其他线程无法操作,不支持并发
  • 乐观锁 支持并发,上锁后台线程也能操作,有版本号
  • 表锁 锁住整张表
  • 行锁 锁住一行,其他行正常被访问
  • 读锁,也叫共享锁,会发生死锁
  • 写锁,也叫独占锁,会发生死锁
无锁多线程抢占资源
synchronized 和 ReentrantLock都是独占,每次只能进来一个操作,不能共享
ReentrantReadWriteLock读写锁读读可以共享,提升性能,同时多人进行读操作;缺点:容易造成锁饥饿,一致读不能写;读的时候不能写,只有读完之后才能写

锁降级:写可以降级为读,然后释放读锁,再释放写锁

BlockingQueue阻塞队列

  • ArrayBlockingQueue 由数组组成的有界阻塞队列

  • LinkedBlockingQueue 由链表组成的有界(若无参构造默认容量Integer.MAX_VALUE)阻塞队列

  • DelayQueue 使用优先级队列实现的延迟无界阻塞队列

  • PriorityBlockingQueue 支持优先级排序的无界阻塞队列

  • SynchronousQueue 不存储元素的阻塞队列,也即单个元素的队列

  • LinkedTransFerQueue 由链表组成的无界阻塞队列

  • LinkedBlockingDeque 由链表组成的双向阻塞队列

多线程中:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被唤醒

线程池

提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。

  • Executor接口

    最基础的接口,定义了一个方法execute(Runnalbe command),用于提交任务。

    主要实现类是ThreadPoolExecutor

  • ExecutorService接口

    继承了 Executor,增加了一些方法用于管理任务执行和生命周期,例如 submit(), invokeAll(), shutdown(), awaitTermination() 等。

    主要实现类包括 ThreadPoolExecutorScheduledThreadPoolExecutor

  • ThreadPoolExecutor

    线程池的核心实现类,提供了灵活的线程池配置选项,可以通过参数来控制核心线程数、最大线程数、空闲线程存活时间、任务队列等。

    可以创建不同类型的线程池,例如固定大小线程池、缓存线程池、单线程池等。

  • ScheduledExecutorService接口

    继承了 ExecutorService,提供了调度功能,允许任务在指定的延迟后执行或定期执行。

    主要实现类是 ScheduledThreadPoolExecutor

  • Executors工具类

    Executors 提供了一些静态工厂方法,用于创建常见的线程池实例,例如固定大小线程池、缓存线程池、单线程池、定时任务线程池等。

java
// ThreadPoolDemo1.java
public class ThreadPoolDemo1 {
    public static void main(String[] args) {
        // ExecutorService接口继承Executor接口,里面抽象方法execute的参数是接口Runnable
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 固定大小线程池
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();     // 单线程池
//        ExecutorService threadPool = Executors.newCachedThreadPool();         // 缓存线程池

        try {
            for (int i = 1; i < 20; i++) {
                // lambda表达式,这里接口Execute的抽象方法execute参数为Runnable
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

实际开发不要使用Executors工具类来创建线程池,用ThreadPoolExecutor类创建线程池的实例,参数配置

java
// ThreadPoolDemo2.java
public class ThreadPoolDemo2 {
    public static void main(String[] args) {
        // 自定义线程池
        int corePoolSize = 2;
        int maximumPoolSize = 5;
        long keepAliveTime = 2L;
        int queueCapacity = 3;

        ExecutorService threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        // 20个顾客请求
        try {
            for (int i = 1; i <= maximumPoolSize + queueCapacity; i++) {
                // lambda表达式,这里接口Execute的抽象方法execute参数为Runnable
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

输出:

sh
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-2办理业务
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-5办理业务

Fork/Join简介

Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。

Fork:把一个复杂任务进行拆分,大事化小

Join:把拆分的任务结果进行合并

java
// ForkJoinDemo.java

// public abstract class RecursiveTask<V> extends ForkJoinTask<V>

// 计算斐波那契数列第n项,模仿这个例子写的MyTask
class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}

class MyTask extends RecursiveTask<Integer> {
    private static final Integer VAULUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - begin <= VAULUE) {
            for (int i = begin; i <= end; i++) {
                result = result + i;
            }
        } else {
            int middle = (begin + end) / 2;
            MyTask left = new MyTask(begin, middle);
            left.fork();
            MyTask right = new MyTask(middle + 1, end);
            right.fork();
            result = left.join() + right.join();
        }
        return result;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建MyTask对象
        MyTask myTask = new MyTask(1, 100);
        // 创建分支合并池对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
        // 获取合并之后结果,5050
        Integer result = forkJoinTask.get();
        System.out.println(result);
        // 关闭池对象
        forkJoinPool.shutdown();
    }
}