// ProposalRequestProcessor.java
public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        // nextProcessor is the commitProcessor
        nextProcessor.processRequest(request);
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // build a PROPOSAL packet and broadcast it to the followers
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // append the transaction to the local transaction log
            syncProcessor.processRequest(request);
        }
    }
}
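So ProposalRequestProcessor forks every transactional request three ways: to the CommitProcessor, to the quorum as a PROPOSAL, and to the local SyncRequestProcessor for logging. For orientation, here is a condensed sketch of how the leader wires this pipeline together, paraphrased from LeaderZooKeeperServer.setupRequestProcessors in the 3.4-era source these excerpts come from; treat it as a sketch rather than an authoritative listing:

// LeaderZooKeeperServer.java (condensed sketch): the leader-side pipeline is
// PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor
//   -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor,
// with ProposalRequestProcessor also feeding SyncRequestProcessor -> AckRequestProcessor.
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied);
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false); // matchSyncs = false on the leader
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize(); // wires up SyncRequestProcessor + AckRequestProcessor
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}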
// Leader.java
public Proposal propose(Request request) throws XidRolloverException {
    /**
     * Address the rollover issue. All lower 32bits set indicate a new leader
     * election. Force a re-election instead. See ZOOKEEPER-1277
     */
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
            "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }
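    // Why this check works: a zxid packs the leader epoch into its high 32
    // bits and a per-epoch counter into its low 32 bits, i.e.
    // zxid = (epoch << 32) | counter. Once the counter saturates (all low
    // 32 bits set), the only way to get fresh zxids is a new epoch, hence
    // the forced re-election above.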
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        // track the proposal until a quorum acks it, then send it to the quorum
        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
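Once the PROPOSAL is out, the leader counts the ACKs coming back; when a quorum has acked, the proposal is committed locally, COMMIT packets go to the followers, and INFORM packets go to the observers. The following is a condensed sketch of Leader.processAck from the same 3.4-era source, with logging and edge-case handling (stale zxids, empty outstandingProposals) elided:

// Leader.java (condensed sketch): invoked for every ACK a learner sends back.
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return; // already committed, or an ACK from a past epoch
    }
    p.ackSet.add(sid); // record this server's ACK
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        commit(zxid);                         // send COMMIT to the followers
        inform(p);                            // send INFORM to the observers
        zk.commitProcessor.commit(p.request); // unblock the local CommitProcessor
        // pending sync requests waiting on this zxid are answered here as well
    }
}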
// SyncRequestProcessor.java
@Override
public void run() {
    try {
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount / 2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    // every queued proposal has been appended to the txn log;
                    // flush them to disk now
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount / 2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            // snapshot once every snapCount/2 + randRoll appends
                            snapInProcess = new Thread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    nextProcessor.processRequest(si);
                    if (nextProcessor instanceof Flushable) {
                        ((Flushable) nextProcessor).flush();
                    }
                    continue;
                }
                toFlush.add(si);
                // flush to disk once more than 1000 txns have been batched
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        LOG.error("Severe unrecoverable error, exiting", t);
        running = false;
        System.exit(11);
    }
    LOG.info("SyncRequestProcessor exited!");
}
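With the default snapCount of 100000, randRoll is drawn from [0, 50000), so each server rolls its log and snapshots after somewhere between 50001 and 100000 appends; the jitter keeps the ensemble from snapshotting in lockstep. A quick illustration (class and variable names are ours, only the arithmetic is ZooKeeper's):

import java.util.Random;

// Illustration only: the snapshot trigger above, with the default snapCount.
class SnapThresholdDemo {
    public static void main(String[] args) {
        int snapCount = 100000;                   // zookeeper.snapCount default
        Random r = new Random();
        int randRoll = r.nextInt(snapCount / 2);  // in [0, 50000)
        int threshold = snapCount / 2 + randRoll; // in [50000, 99999]
        // a snapshot is triggered once logCount exceeds threshold, i.e. after
        // 50001..100000 transaction-log appends on this particular server
        System.out.println("snapshot after " + (threshold + 1) + " appends");
    }
}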
// CommitProcessor.java
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                // on the leader the next processor is Leader.ToBeAppliedRequestProcessor;
                // on a follower it is FinalRequestProcessor, which finishes the request
                nextProcessor.processRequest(toProcess.get(i));
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) {
                    // block until a quorum of servers has acked and the
                    // request has been committed
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    // only once the pending request has been committed is it
                    // moved to toProcess
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);
                        nextPending = null;
                    } else {
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }
            // We haven't matched the pending requests, so go back to
            // waiting: the pending request must be committed first, which
            // preserves the ordering of writes
            if (nextPending != null) {
                continue;
            }
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
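The pairing between a locally queued write and an incoming commit hinges on the (sessionId, cxid) identity. Restated as a hypothetical helper (this method does not exist in the ZooKeeper code; the field names do):

// Illustration: the identity that pairs a locally-submitted write with its
// incoming commit. nextPending carries the client connection (cnxn), so on a
// match the local Request object is forwarded and the client gets its reply;
// a non-matching commit originated at another server and is applied silently.
static boolean isCommitForPending(Request nextPending, Request committed) {
    return nextPending != null
            && nextPending.sessionId == committed.sessionId
            && nextPending.cxid == committed.cxid;
}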
synchronized public void commit(Request request) {
    if (!finished) {
        if (request == null) {
            LOG.warn("Committed a null!", new Exception("committing a null! "));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing request:: " + request);
        }
        // enqueue the committed request and wake up the run() thread
        committedRequests.add(request);
        notifyAll();
    }
}
// FinalRequestProcessor.java
public void processRequest(Request request) {
    // ...
    // request.addRQRec(">final");
    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    if (request.type == OpCode.ping) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
    }
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        while (!zks.outstandingChanges.isEmpty()
                && zks.outstandingChanges.get(0).zxid <= request.zxid) {
            ChangeRecord cr = zks.outstandingChanges.remove(0);
            if (cr.zxid < request.zxid) {
                LOG.warn("Zxid outstanding " + cr.zxid
                        + " is less than current " + request.zxid);
            }
            if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                zks.outstandingChangesForPath.remove(cr.path);
            }
        }
        if (request.hdr != null) {
            TxnHeader hdr = request.hdr;
            Record txn = request.txn;
            // apply the transaction to the in-memory database
            rc = zks.processTxn(hdr, txn);
        }
        // do not add non quorum packets to the queue.
        if (Request.isQuorum(request.type)) {
            zks.getZKDatabase().addCommittedProposal(request);
        }
    }
    // handle session close
    if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
        ServerCnxnFactory scxn = zks.getServerCnxnFactory();
        // this might be possible since
        // we might just be playing diffs from the leader
        if (scxn != null && request.cnxn == null) {
            // calling this if we have the cnxn results in the client's
            // close session response being lost - we've already closed
            // the session/socket here before we can send the closeSession
            // in the switch block below
            // close the connection and remove its watchers
            scxn.closeSession(request.sessionId);
            return;
        }
    }
    // ... (the rest of the method builds and sends the client response)
}
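The addCommittedProposal call above also maintains an in-memory cache of recent committed proposals, used to catch up a learner that is only slightly behind with a DIFF instead of a full snapshot. A condensed sketch from ZKDatabase.java in the same source tree (commitLogCount is 500 there; serialization details elided):

// ZKDatabase.java (condensed sketch): keep the last commitLogCount committed
// proposals in memory, bounded by [minCommittedLog, maxCommittedLog].
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.removeFirst();
            minCommittedLog = committedLog.getFirst().packet.getZxid();
        }
        if (committedLog.size() == 0) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }
        // ... wrap request.hdr/request.txn in a PROPOSAL QuorumPacket,
        // append it to committedLog, and set maxCommittedLog = request.zxid
    } finally {
        wl.unlock();
    }
}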
// FollowerRequestProcessor.java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);
            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
            case OpCode.multi:
                // forward the write request to the leader
                zks.getFollower().request(request);
                break;
            }
        }
    } catch (Exception e) {
        LOG.error("Unexpected exception causing exit", e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}
public void processRequest(Request request) {
    if (!finished) {
        queuedRequests.add(request);
    }
}
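The zks.getFollower().request(...) call above serializes the client request into a REQUEST quorum packet and writes it to the leader's socket; the follower then hears about the outcome via the normal PROPOSAL/COMMIT path. Condensed from Learner.request in the same source tree:

// Learner.java (condensed sketch): forward a client request to the leader.
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        // copy the request payload without consuming the original buffer
        request.request.rewind();
        int len = request.request.remaining();
        byte[] b = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), null);
    writePacket(qp, true);
}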
// ObserverZooKeeperServer.java
@Override
protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true);
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor(getObserver()));
    syncProcessor.start();
}
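Observers never vote on proposals: the leader sends them INFORM packets carrying already-committed transactions, and those feed the CommitProcessor directly. A condensed sketch of the observer-side handoff from ObserverZooKeeperServer in the same source tree (the syncProcessor write is governed by the zookeeper.observer.syncEnabled flag):

// ObserverZooKeeperServer.java (condensed sketch): invoked when an INFORM
// packet arrives; observers skip the PROPOSAL/ACK round entirely.
public void commitRequest(Request request) {
    if (syncRequestProcessorEnabled) {
        // write to the txn log and take periodic snapshots, like a follower
        syncProcessor.processRequest(request);
    }
    // hand the already-committed request straight to the CommitProcessor
    commitProcessor.commit(request);
}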
// ObserverRequestProcessor.java
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);
            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this Observer has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request);
                break;
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
            case OpCode.multi:
                zks.getObserver().request(request);
                break;
            }
        }
    } catch (Exception e) {
        LOG.error("Unexpected exception causing exit", e);
    }
    LOG.info("ObserverRequestProcessor exited loop!");
}