简述
Semaphore(信号量)用来控制同时访问特定资源的线程数,它通过协调各个线程,以保证合理地使用公共资源
实现原理
Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时需要首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire方法将阻塞直到有许可(或者直到被中断或者操作超时),当执行完业务通过release方法将许可归还,以便其他线程能够获得许可继续执行
构造方法
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }复制代码
Semaphore内部基于sync实现,参数permits表示许可证数量,参数fair表示是否采取公平策略
公平策略
sync继承AQS,只需要重写其模板中的方法即可,acquire方法如下:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * AQS共享式获取同步状态模板方法(能响应中断) */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { for (;;) { // //判断该线程是否在队列的头部 if (hasQueuedPredecessors()) return -1; // 获取当前剩余信号量许可 int available = getState(); // 计算acquire()之后,剩余的信号量许可数 int remaining = available - acquires; // CAS设置信号量许可 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }复制代码
非公平策略
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }复制代码
公平与非公平区别在于公平策略会调用hasQueuedPredecessors()方法判断线程所关联的节点是否在队列头位置
释放
public void release() { sync.releaseShared(1); } /** * AQS共享式释放同步状态模板方法 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取当前剩余许可数 int current = getState(); // 加上待释放许可数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS设置新许可数 if (compareAndSetState(current, next)) return true; } }复制代码
示例
public class SemaphoreTest { public static void main(String[] args) { final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "进入车库"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println(Thread.currentThread().getName() + "出车库"); } } }).start(); } } }复制代码
感谢
《java并发编程的艺术》