信号量 Semaphore Redisson提供分布式信号量,接口和用法与juc.Semaphore相似,使用方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 RSemaphore semaphore = redisson.getSemaphore("semaphore" ); semaphore.addPermits(10 ); semaphore.acquire(); semaphore.acquireAsync(); semaphore.acquire(23 ); semaphore.tryAcquire(); semaphore.tryAcquireAsync(); semaphore.tryAcquire(23 , TimeUnit.SECONDS); semaphore.tryAcquireAsync(23 , TimeUnit.SECONDS); semaphore.release(10 ); semaphore.release(); semaphore.releaseAsync();
新增凭证 添加凭证实际上就是给redis中的key设置一个值。核心方法如下:
1 2 3 4 5 6 7 8 9 10 public RFuture<Void> addPermitsAsync (int permits) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "value = 0;" + "end;" + "redis.call('set', KEYS[1], value + ARGV[1]); " + "redis.call('publish', KEYS[2], value + ARGV[1]); " , Arrays.asList(getRawName(), getChannelName()), permits); }
获取凭证 获取凭证时,只需判断剩余凭证大于要申请的凭证数量即可申请成功,如果获取失败就订阅凭证释放消息,收到释放凭证消息后循环获取。获取凭证核心方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public RFuture<Boolean> tryAcquireAsync (int permits) { if (permits < 0 ) { throw new IllegalArgumentException("Permits amount can't be negative" ); } if (permits == 0 ) { return RedissonPromise.newSucceededFuture(true ); } return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.<Object>singletonList(getRawName()), permits); }
释放凭证 释放凭证时直接给信号量增加相应数量的凭证,然后发布一条释放凭证的消息通知订阅的等待线程。释放凭证核心方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public RFuture<Void> releaseAsync (int permits) { if (permits < 0 ) { throw new IllegalArgumentException("Permits amount can't be negative" ); } if (permits == 0 ) { return RedissonPromise.newSucceededFuture(null ); } RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], value); " , Arrays.asList(getRawName(), getChannelName()), permits); if (log.isDebugEnabled()) { future.onComplete((o, e) -> { if (e == null ) { log.debug("released, permits: {}, name: {}" , permits, getName()); } }); } return future; }
闭锁 CountDownLatch Redission提供闭锁功能,接口和用法与juc.CountDownLatch相似,使用方式如下:
1 2 3 4 5 6 7 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch" ); latch.trySetCount(1 ); latch.await(); RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch" ); latch.countDown();
设置计数器值 CountDownLatch计数器值只有在未设置过才能设置成功,Redission提供的CountDownLatch设置计数器值核心方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public RFuture<Boolean> trySetCountAsync (long count) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end" , Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count); }
减少计数器值 1 2 3 4 5 6 7 8 9 10 public RFuture<Void> countDownAsync () { return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;" , Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE); }
等待计数器值达到0 当某一个线程调用awite方法时,线程会阻塞直到CountDownLatch计数器被扣减为1,实际上就是不断循环查询计数器值,直到为0时返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void await () throws InterruptedException { if (getCount() == 0 ) { return ; } RFuture<RedissonCountDownLatchEntry> future = subscribe(); try { commandExecutor.syncSubscriptionInterrupted(future); while (getCount() > 0 ) { future.getNow().getLatch().await(); } } finally { unsubscribe(future); } }