首页 > 备战大半年秋招资料分享-Java并发篇
头像
肥猫学长
发布于 2021-09-24 09:52
+ 关注

备战大半年秋招资料分享-Java并发篇

备战秋招大半年,目前已经拿到offer上岸,将半年来的笔记分享给大家。更多 笔记涵盖  MYSQL、Elasticsearch、Kafka、设计模式、JVM、Java语言基础、集合原理、并发技术 。
需要的同学可以加我vx:uukiinternet 私发给你们哦。


Java 并发
AQS
8.AQS (AbstractQueuedSynchronizer)
AQS的作用:AQS 是一个用于构建锁、同步器等线程协作工具类的模板框架
8.1 AQS的实现有哪些?
Semaphore
CountDownLatch
ReentrantLock
CyclicBarrier
ReentrantReadWriteLock
ThreadPoolExcutor

线程创建方式:继承Threads实现run方法;实现Runnable接口实现run方法;Callable接口;由于Java不支持多重继承,因此建议使用实现Runnable接口的方式。
// Callabable方式,通过futureTask get(Timeout)获取线程执行结束返回值;支持捕获线程抛出异常,
public class MyFuture implements Callable<Integer> {


    @Override
    public Integer call() throws Exception {
        int i = 0;
        for (; i < 100; i++) {
            System.out.println(String.format("I am %s, count is %d", Thread.currentThread().getName(), i));
            Thread.sleep(10);
        }
        return i;
    }
}
MyFuture myf = new MyFuture();
FutureTask<Integer> futureTask = new FutureTask<Integer>(myf);
Thread futureThr = new Thread(futureTask);
futureThr.start();
futureTask.get() // 阻塞等待 可以通过isDone、isCancelled判断进度

使用ExecutorService管理线程:shutdown、shutdownNow都不会阻塞主线程执行;shutdown方法只是向线程池发起一个终止信号,线程池完成任务后退出。shutdownNow是向线程池调用Interrupt(),如果线程在阻塞、等待的话会抛出InterruptException,从而结束该线程,但是记住不能终止I/O阻塞和synchronized阻塞的线程https://blog.csdn.net/qq_26012495/article/details/84325445
几种常用的线程池:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();  SynchronousQueue 队列实现 (使用该队列,由于容量为0,其不会真实的保存任务,而是每次都将新任务提交给线程池,如果线程池满了,则直接执行拒绝策略,简单粗暴)
工作线程创建无限制,可以根据任务灵活的往线程池里创建线程,60s没有工作则自动销毁,没有任务的时候线程数可以为0,不占用系统资源,该线程池需要频繁的创建销毁线程,占用资源

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);   LinkedBlockingQueue  无限长度
它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();  LinkedBlockingQueue  无限长度
即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);   DelayedWorkQueue
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。

fixedThreadPool.execute(new Runnable())
execute()是Executor接口的方法,没有返回值

fixedThreadPool.submit(Runnable Task)
submit是executorService方法,通过Future获取任务结果

execute和submit方法的不同点:
(1)execute没有返回值;而submit有返回值,方便返回执行结果。
(2)submit方便进行Exception处理,由于返回参数是Future,如果执行期间抛出了异常,可以使用Future.get()进行捕获。

说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

* 对于需要保证所有提交的任务都要被执行的情况,使用FixedThreadPool
* 如果限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
* 如果希望提交的任务尽快分配线程执行,使用CachedThreadPool (任务处理速度快于提交速度)
* 如果业务上允许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其他业务的应用场景,可以通过使用限定线程数量的线程池以及限定长度的队列进行容错处理。
workQueue:超过coreThr个数之后放到队列中存储,超过队列长度后继续创建线程,达到maxThr后执行拒绝策略。
自定义线程池可以根据应用场景自行配置workQueue和拒绝策略(1.  AbortPolicy 丢弃并抛出异常 2. DiscardPolicy 丢弃不抛出异常 3. DiscardOldestPolicy 丢掉最老的一个任务,然后加入队列  4. CallerRunsPolicy 调用者线程自己运行)。

LinkedBlockingQueue https://blog.csdn.net/tonywu1992/article/details/83419448 put/take 阻塞线程  offer/poll非阻塞直接返回
队列已满,阻塞等待。
队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁中断
    putLock.lockInterruptibly();
    try {
        //判断队列是否已满,如果已满阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入队列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一条数据(证明之前消费者都是阻塞的await),唤醒消费线程进行消费
    if (c == 0)
        signalNotEmpty();
}

**可以看到offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列为空,阻塞等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        // 队列中还有元素,唤醒下一个消费线程进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 移除元素之前队列是满的(证明所有生产者都在await等待唤醒),唤醒生产线程进行添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}
** poll方法去除了take方法中元素为空后阻塞等待这一步骤,这里也就不详细说了。

ArrayBlockingQueue
生产者、消费者共享一把锁,两个Condition;
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

CountDownLatch(类似操作系统共享变量):CountDownLatch维护一个计数器,可以实现一个或者多个线程等待多个线程完成任务后执行。等待线程调用countDownLatch.await方法进行阻塞等待,执行线程每完成任务就调用countDownLatch.countdown(),对计数器减一,减至0则唤醒等待线程。
countDownLatch底层是通过Sync(AQS)实现,通过CAS对Sync计数器减一,而await线程则死循环不断尝试判断计数器是否等于0;若是则返回,否则则死循环

通过内部类Sync的源码可以分析出,CountDownLatch的实现完整逻辑如下:
1、初始化CountDownLatch实际就是设置了AQS的state为计数的值 state:volatile变量
2、调用CountDownLatch的countDown方法时实际就是调用AQS的释放同步状态的方法,每调用一次就自减一次state值   不断自旋,通过CAS来更新state值
3、调用await方法实际就调用AQS的共享式获取同步状态的方法acquireSharedInterruptibly(1),这个方法的实现逻辑就调用子类Sync的tryAcquireShared方法,只有当子类Sync的tryAcquireShared方法返回大于0的值时才算获取同步状态成功,
否则就会一直在死循环中不断重试,直到tryAcquireShared方法返回大于等于0的值,而Sync的tryAcquireShared方法只有当AQS中的state值为0时才会返回1,否则都返回-1,也就相当于只有当AQS的state值为0时,await方法才会执行成功,否则
就会一直处于死循环中不断重试。   不断自旋 判断state == 0
总结:
CountDownLatch实际完全依靠AQS的共享式获取和释放同步状态来实现,初始化时定义AQS的state值,每调用countDown实际就是释放一次AQS的共享式同步状态,await方法实际就是尝试获取AQS的同步状态,只有当同步状态值为0时才能获取成功


CyclicBarrier:多个线程之间互相等待,等待所有线程都到达Barrier的时候(每调用await计数器-1,并进行等待),即计数器为0时候,所有await线程被唤醒继续执行。

CyclicBarrier实现原理:
主要是给两个属性parties(总线程数)、count(当前剩余线程数)进行赋值;从源码可以看出CyclicBarrier的实现原理主要是通过ReentrantLock和Condition来实现的,主要实现流程如下:
1、创建CyclicBarrier时定义了CyclicBarrier对象需要达到的线程数count
2、每当一个线程执行了await方法时,需要先通过ReentrantLock进行加锁操作,然后对count进行自减操作,操作成功则判断当前count是否为0;
3、如果当前count不为0则调用Condition的await方法使当前线程进入等待状态;
4、如果当前count为0则表示同步屏障已经完全,调用NextGernation,此时调用Condition的signalAll方法唤醒之前所有等待的线程,并开启循环的下一次同步屏障功能;
5、唤醒其他线程之后,其他线程继续执行剩余的逻辑。

lock.lock();
20         try {
21             final Generation g = generation;
22
23             if (g.broken)
24                 throw new BrokenBarrierException();
25             //响应线程中断
26             if (Thread.interrupted()) {
27                 breakBarrier();
28                 throw new InterruptedException();
29             }
30             //count自减操作
31             int index = --count;
32             //判断当前还需达到同步屏障的线程数是否为0
33             if (index == 0) {  // tripped
34                 boolean ranAction = false;
35                 try {
36                     //barrierCommand是同步屏障打开之后需要执行的Runnable对象
37                     final Runnable command = barrierCommand;
38                     if (command != null)
39                         //如果Runnable对象不为空直接执行Runnable线程任务
40                         command.run();
41                     ranAction = true;
42                     //本次同步屏障全部达成,唤醒所有线程并开始下一次同步屏障
43                     nextGeneration();
44                     return 0;
45                 } finally {
46                     if (!ranAction)
47                         breakBarrier();
48                 }
49             }
50
51             // loop until tripped, broken, interrupted, or timed out
52             for (;;) {
53                 try {
54                     if (!timed)
55                         //调用Condition对象的await方法使当前线程进入等待状态
56                         trip.await();
57                     else if (nanos > 0L)
58                         nanos = trip.awaitNanos(nanos);
59                 } catch (InterruptedException ie) {
60                     if (g == generation && ! g.broken) {
61                         breakBarrier();
62                         throw ie;
63                     } else {
64                         // We're about to finish waiting even if we had not
65                         // been interrupted, so this interrupt is deemed to
66                         // "belong" to subsequent execution.
67                         Thread.currentThread().interrupt();
68                     }
69                 }
70
71                 if (g.broken)
72                     throw new BrokenBarrierException();
73
74                 if (g != generation)
75                     return index;
76
77                 if (timed && nanos <= 0L) {
78                     breakBarrier();
79                     throw new TimeoutException();
80                 }
81             }
82         } finally {
83             lock.unlock();
84         }
85     }
86
87     private void nextGeneration() {
88         // signal completion of last generation
89         //唤醒所有线程
90         trip.signalAll();
91         // set up next generation
92         count = parties;
93         generation = new Generation();
94     }

CountDownLatch与CyclicBarrier区别 CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的;CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。nexGeneration()重置,也可reset()重置

线程Join:A线程Join B线程,表示线程A将自身线程挂起,等待B完成才继续执行,实现线程之间协作。

线程wait/notify:命令必须在同步代码块/方法运行(synchronized),否则抛出异常。线程使用wait命令挂起期间,线程会释放锁,使得其他线程可以进入对象的同步方法/代码块中,等待满足一定条件时,其他线程调用notifyAlll/notify方法进行唤醒(注意,唤醒过程也需要等待唤醒线程完成操作释放锁后,再由wait线程竞争锁获得运行权利)。其中notifyall表示唤醒所有等待线程,notify唤醒其中一个。

wait() 和 sleep() 的区别
  • wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
  • wait() 会释放锁,sleep() 不会。
await、signal:类似wait/notify,java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。
public class AwaitSignalExample {


    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();


    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }


    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

Java锁类型:
// 非公平锁 竞争锁获得
ReentrantLock rtk = new ReentrantLock();
// 公平锁 按照申请顺序
ReentrantLock rtk2 = new ReentrantLock(true);
// 可重入锁;线程在外层获取锁的时候,在内层也拥有锁,可以一定程度上避免死锁,如ReentrantLock、Synchronized
synchronized void setA() throws Exception{
    Thread.sleep(1000);
    setB();
}

synchronized void setB() throws Exception{
    Thread.sleep(1000);
}
上面的代码就是一个可重入锁的一个特点,如果不是可重入锁的话,setB可能不会被当前线程执行,可能造成死锁。
// 独占锁: 比如ReetrantLock 只允许一个线程进入    共享锁:CountDownLatch 底层使用Sync实现 每次进入一个线程就-1,减到0才唤醒await
CountDownLatch cdl = new CountDownLatch(1);
// 乐观锁:无锁编程 如CAS,通过CAS自旋完成更新   悲观锁:大多数锁都是悲观锁
// 自旋锁 如CAS


偏向所锁,轻量级锁及重量级锁 (为了提高性能,消除无竞争情况下的同步原语(加锁释放锁操作底层使用操作系统Mutex Lock),提高运行性能)

偏向所锁,轻量级锁都是乐观锁,重量级锁是悲观锁。
一个对象刚开始实例化的时候,没有任何线程来访问它的时候。它是可偏向的,意味着,它现在认为只可能有一个线程来访问它,所以当第一个
线程来访问它的时候,它会偏向这个线程,此时,对象持有偏向锁。偏向第一个线程,这个线程在修改对象头成为偏向锁的时候使用CAS操作,并将
对象头中的ThreadID改成自己的ID,之后再次访问这个对象时,只需要对比ID,不需要再使用CAS在进行操作。
一旦有第二个线程访问这个对象,因为偏向锁不会主动释放,所以第二个线程可以看到对象时偏向状态,这时表明在这个对象上已经存在竞争了,检查原来持有该对象锁的线程是否依然存活,如果挂了,则可以将对象变为无锁状态,然后重新偏向新的线程,如果原来的线程依然存活,则马上执行那个线程的操作栈,检查该对象的使用情况,如果仍然需要持有偏向锁,则偏向锁升级为轻量级锁,(偏向锁就是这个时候升级为轻量级锁的。如果不存在使用了,则可以将对象回复成无锁状态,然后重新偏向。
轻量级锁认为竞争存在,但是竞争的程度很轻,一般两个线程对于同一个锁的操作都会错开,或者说稍微等待一下(自旋),另一个线程就会释放锁。 但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁膨胀为重量级锁,重量级锁使除了拥有锁的线程以外的线程都阻塞,防止CPU空转

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。
上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。
public synchronized StringBuffer append(String str) {
    toStringCache = null;
    super.append(str);
    return this;
}



Java内存模型
1、 根据虚拟机规范,对于一个实例对象中的成员方法而言,如果方法中包含本地变量是基本数据类型(boolean,byte,short,char,int,long,float,double),将直接存储在工作内存的帧栈结构中,但倘若本地变量是引用类型,那么该变量的引用会存储在功能内存的帧栈中,而对象实例将存储在主内存(共享数据区域,堆)中。 但对于实例对象的成员变量,不管它是基本数据类型或者包装类型(Integer、Double等)还是引用类型,都会被存储到堆区

Happens-before原则 JVM 还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。1) 对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。 2) 一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。

ThreadLocal:使用线程本地存储可以解决线程安全的问题。一个公共(static)的ThreadLocal对象,不同线程调用其set()、get()方法进行数据的读写而不会产生线程安全问题。因为本质上每个Thread线程对象,都有一个成员对象ThreadLocalMap(数组实现),ThreadLocalMap数组上每一个元素都是由Key=ThreadLocal、Value组成的Entry对象,set()、get()方法就是对该数组根据ThreadLocal对象hash后放入ThreadLocalMap中。注意,set、get过程都会对key=null的元素进行清除,原因是key=null,意味着之前的ThreadLocal对象已经被回收释放,因此为了避免内存泄露(ThreadLocal对象被释放却仍然被Map对象持有),线程使用完ThreadLocal后应该显示调用remove方法

10. 为什么我们调用 start() 方法时会执行 run() 方法,为什么我们不能直接调用 run() 方法?

这是另一个非常经典的 java 多线程面试问题,而且在面试中会经常被问到。很简单,但是很多人都会答不上来!
new 一个 Thread,线程进入了新建状态;调用 start() 方***启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。 start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,这是真正的多线程工作。 而直接执行 run() 方***把 run 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。
总结: 调用 start 方法方可启动线程并使线程进入就绪状态,而 run 方法只是 thread 的一个普通方法调用,还是在主线程里执行。

ReentrantReadWriteLock读写锁
分为公平锁、非公平锁两种模式,读锁共享、写锁独享。
当对象读锁被线程持有后,新线程无法获得对象写锁,并加入队列等待。
写锁线程释放后,唤醒队列多个线程去竞争读锁(非公平)或依次唤醒等待线程(公平锁)




全部评论

(2) 回帖
加载中...
话题 回帖