1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| public Lease acquire() throws Exception { Collection<Lease> leases = acquire(1, 0, null); return leases.iterator().next(); }
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasWait = (unit != null); long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
ImmutableList.Builder<Lease> builder = ImmutableList.builder(); boolean success = false; try { while ( qty-- > 0 ) { int retryCount = 0; long startMillis = System.currentTimeMillis(); boolean isDone = false; while ( !isDone ) { switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) ) { case CONTINUE: { isDone = true; break; }
} } } success = true; } finally { if ( !success ) { returnAll(builder.build()); } }
return builder.build(); }
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception { if ( client.getState() != CuratorFrameworkState.STARTED ) { return InternalAcquireResult.RETURN_NULL; }
if ( hasWait ) { long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) ) { return InternalAcquireResult.RETURN_NULL; } } else { lock.acquire(); }
Lease lease = null;
try { PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL); String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME)); String nodeName = ZKPaths.getNodeFromPath(path); lease = makeLease(path);
if ( debugAcquireLatch != null ) { debugAcquireLatch.await(); }
synchronized (this) { for (;;) { List<String> children; try { children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); } catch ( Exception e ) { if ( debugFailedGetChildrenLatch != null ) { debugFailedGetChildrenLatch.countDown(); } returnLease(lease); throw e; } if ( !children.contains(nodeName) ) { log.error("Sequential path not found: " + path); returnLease(lease); return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; }
if ( children.size() <= maxLeases ) { break; } if ( hasWait ) { long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( thisWaitMs <= 0 ) { returnLease(lease); return InternalAcquireResult.RETURN_NULL; } wait(thisWaitMs); } else { wait(); } } } } finally { lock.release(); } builder.add(Preconditions.checkNotNull(lease)); return InternalAcquireResult.CONTINUE; }
|