// NIOServerCnxn.java voiddoIO(SelectionKey k)throws InterruptedException { try { if (sock == null) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId));
return; } if (k.isReadable()) { // ... } if (k.isWritable()) { // ... } } catch (CancelledKeyException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + " due to " + e); if (LOG.isDebugEnabled()) { LOG.debug("CancelledKeyException stack trace", e); } close(); } catch (CloseRequestException e) { // expecting close to log session closure close(); } catch (EndOfStreamException e) { LOG.warn("caught end of stream exception", e); // tell user why
// expecting close to log session closure close(); } catch (IOException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + " due to " + e); if (LOG.isDebugEnabled()) { LOG.debug("IOException stack trace", e); } close(); } }
publicvoidclose(){ synchronized (factory.cnxns) { // if this is not in cnxns then it's already closed if (!factory.cnxns.remove(this)) { return; }
synchronized (factory.ipMap) { Set<NIOServerCnxn> s = factory.ipMap.get(sock.socket().getInetAddress()); s.remove(this); }
factory.unregisterConnection(this);
if (zkServer != null) { // 移除watcher zkServer.removeCnxn(this); }
closeSock();
if (sk != null) { try { // need to cancel this selection key from the selector sk.cancel(); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("ignoring exception during selectionkey cancel", e); } } } } }
//check to see if the leader zxid is lower than ours //this should never happen but is just a safety check long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); thrownew IOException("Error: Epoch of leader is lower"); } // 从leader同步数据 syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (self.isRunning()) { // 读取从leader同步过来的数据 readPacket(qp); // 数据处理 processPacket(qp); } } catch (IOException e) { LOG.warn("Exception when following the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); }