关注我,可了解更多有趣的面试相关问题。
本篇收录于《计算机核心知识串讲》,属于该系列第二篇,后面持续更新中…………
写在之前
关于Simaphore
第一次了解这个名词,很多书直接翻译为信号灯?信号量?,总是不太清楚表示何种意思,就比如Socket
被翻译为套接字,百分之99的人第一眼不知道是个啥,如果直接翻译为"插座",大家第一眼就知道是什么了。直到后面看到《并发编程艺术》对Simaphore
讲解,颇为贴切,引用放在文首。
信号量,将其看做马路上控制流量的信号灯 比如”后厂村路“想要控制人流量,每次只允许200辆车通过,其余车辆必须在路口等待,前200辆车拿到通过的许可证,可以开进马路,后面的车因为没有许可证不允许驶入“后厂村路”。如果其中有10辆车已经完全通过马路,哨兵重新颁发10张许可证给等待车辆,这10辆车允许通过。
What
什么是Simaphore
?
上面的例子中,车辆就是工作线程、许可证可以当做是令牌,驶入马路表示线程在执行,完全通过马路表示线程执行完毕。
通过上面的例子可以总结出Simaphore
概念的含义:用于控制同时访问特定资源的线程数,并且每个线程在获取锁之前必须从semaphore
获取许可(可以理解为令牌),当该线程获取锁之后执行完业务,将令牌释放回资源池中,其他线程可以重复利用。
信号量默认大小是1,表示只允许一个线程访问资源(也就是源码中用的permits,我理解翻译成资源比较合适),可以被用作互斥锁。这通常被称为二进制信号量,因为它只有两种状态:
还有一个资源可以被访问、零个资源可以被访问,
类结构
Semaphore
类本身只实现了一个Serializable
接口,其内部有一个抽象内部类的Sync
同步组件,该类本身继承了AbstractQueuedSynchronizer
(以下都简称为AQS),以AQS
为基础实现了自己的信号量同步器,之前提到过AQS
中定义了通用的state
,在Semaphore
中表示可用资源数,Sync
类本身提供了一些方法供其两个实现类NonfairSync
和FairSync
调用,保证Semaphore
同样提供公平的获取资源及非公平的获取资源等两种模式,非公平模式下,资源利用的效率高于公平模式。
关于AQS
,后续会推一篇文章专门介绍一下。【挖坑+1】
如何创建Semaphore
1、初始化可用资源数。
默认采用非公平模式
public Semaphore(int permits) { sync = new NonfairSync(permits); }
- 初始化可用资源数。可以自定义是否是公平模式
/** * 初始化信号量对象 * @param permits 允许访问资源数 * @param fair 是否公平 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
3. 基本方法
1. acquire()
获取可用资源。
该方法直接调用AQS的共享的、可中断的获取同步状态的方法。
如果获取不到资源则阻塞线程,直到有可用资源或者被其他线程中断,获取到资源之后,将资源池的可用资源数减1。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
2. acquireUninterruptibly()
非中断的获取资源。该方法直接调用AQS中的acquireShared()方法
如果没有资源可访问,则当前线程会一直等待,直到其他线程释放资源,资源池中有资源可访问。
public void acquireUninterruptibly() { sync.acquireShared(1); }
3. tryAcquire()
尝试获取资源,
非公平的抢占,不在乎是否有线程在等待。如果资产池有中资源可用则返回true,否则返回false
直接调用抽象内部类的非公平抢占式方法
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
4. tryAcquire(long timeout, TimeUnit unit)
如果在给定时间内有可用获取资源或者线程没有被中断,则线程尝试获取资源。
公平机制本质是调用AQS
中的tryAcquireSharedNanos(int arg, long nanosTimeout)
如果获取资源成功,则返回true
,并且将资源池可用资源减1。
如果没有可用资源线程会一直等待,直到出现以下三种情况:
- 其他线程释放资源,并且当前线程是下一个等着获取资源的线程
- 其他线程中断,当前线程可用。
- 超时
/** * @param timeout 超时时间 * @param unit 超时时间单位 * @return * @throws InterruptedException */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
5. release()
释放资源,资源池可用资源+1
方法实现是直接调用AQS中的releaseShared(int arg)
方法
public void release() { sync.releaseShared(1); }
6. acquire(int permits)
获取特定数量资源 调用AQS中的acquireSharedInterruptibly(int arg)方法
- 获取给定数量资源,资源不够则阻塞等待,如果线程被中断则放弃等待
其中permits表示可获取的资源public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
7. acquireUninterruptibly(int permits)
获取特定数量资源(不可中断)
在可用资源够用之前,需要一直等待获取,即使当前线程被其他线程中断,也会一直阻塞等待
public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
8. tryAcquire(int permits)
获取特定数量的资源数 非公平的获取
如果资源数不够,则直接返回false,放弃等待
public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; }
9. tryAcquire(int permits, long timeout, TimeUnit unit)
带有超时的尝试获取特定资源数
本质是调用AQS中的tryAcquireSharedNanos(int arg, long nanosTimeout)方法
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
10. release(int permits)
释放特定资源数
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
4. 实例
设置30个正在执行的线程,但是每次只允许10个并发线程执行,5s之后执行问,其他线程可以获取资源。
/** * @time 2019/12/14 14:54 * @Description 信号量的例子 * * 设置30个正在执行的线程,但是只允许10个并发线程执行 */ public class SemaphoreTest { private static final int THREAD_TOTAL = 30; /** * 初始化一个固定的线程池 */ private static ExecutorService poolExecutor = new ThreadPoolExecutor(THREAD_TOTAL, THREAD_TOTAL, 60, TimeUnit.SECONDS, new ArrayBlockingQueue <>(1024), new ThreadPoolExecutor.AbortPolicy()); /** * 初始化信号量 */ static Semaphore semaphore = new Semaphore(10,false); public static void main(String[] args) { for(int i = 0; i < THREAD_TOTAL; i++){ poolExecutor.execute(()-> { try { semaphore.acquire(); System.out.println("允许获取资源,序号"); // 模拟工作1S Thread.sleep(5000); // 释放资源,然后 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } poolExecutor.shutdown(); } }
使用场景
Semaphore
作用类似于开头所说,用于控制线程访问特定的资源,通常用于限流框架中,用于控制流量的进入、比如hystrix
框架。
全部评论
(0) 回帖