1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
| public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { this(connectString, sessionTimeout, watcher, false); }
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
public void start() { sendThread.start(); eventThread.start(); }
class SendThread extends Thread { private final ClientCnxnSocket clientCnxnSocket;
public void run() { clientCnxnSocket.introduce(this, sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = System.currentTimeMillis(); while (state.isAlive()) { try { if (!clientCnxnSocket.isConnected()) { if (!isFirstConnect) { try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } if (closing || !state.isAlive()) { break; } startConnect(); clientCnxnSocket.updateLastSendAndHeard(); }
} catch (Throwable e) { } } }
private void startConnect() throws IOException { state = States.CONNECTING;
InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); }
setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); try { zooKeeperSaslClient = new ZooKeeperSaslClient("zookeeper/" + addr.getHostName()); } catch (LoginException e) { LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } logStartConnect(addr);
clientCnxnSocket.connect(addr); }
void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); synchronized (outgoingQueue) { if (!disableAutoWatchReset) { List<String> dataWatches = zooKeeper.getDataWatches(); List<String> existWatches = zooKeeper.getExistWatches(); List<String> childWatches = zooKeeper.getChildWatches(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) { SetWatches sw = new SetWatches(lastZxid, prependChroot(dataWatches), prependChroot(existWatches), prependChroot(childWatches)); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setWatches); h.setXid(-8); Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); outgoingQueue.addFirst(packet); } }
for (AuthData id : authInfo) { outgoingQueue.addFirst(new Packet(new RequestHeader(-4, OpCode.auth), null, new AuthPacket(0, id.scheme, id.data), null, null)); }
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); }
clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } } }
void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } initialized = false;
lenBuffer.clear(); incomingBuffer = lenBuffer; }
SocketChannel createSock() throws IOException { SocketChannel sock; sock = SocketChannel.open(); sock.configureBlocking(false); sock.socket().setSoLinger(false, -1); sock.socket().setTcpNoDelay(true); return sock; }
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } }
|