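| public synchronized void start() {
    loadDataBase();         // restore the in-memory database from disk
    cnxnFactory.start();    // start the client connection factory
    startLeaderElection();  // set the initial vote and create the election algorithm
    super.start();          // start this peer's own thread
}

private void loadDataBase() {
    zkDb.loadDataBase();
}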

public synchronized void startLeaderElection() {
    // The initial vote names this server itself (myid).
    currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    qcm = new QuorumCnxManager(this);
    QuorumCnxManager.Listener listener = qcm.listener;
    if (listener != null) {
        listener.start();
        le = new FastLeaderElection(this, qcm);
    } else {
        LOG.error("Null listener when initializing cnx manager");
    }
    return le;
}

public long loadDataBase() throws IOException {
    // Each transaction replayed from the log is also recorded as a committed proposal.
    PlayBackListener listener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn) {
            Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null);
            r.txn = txn;
            r.hdr = hdr;
            r.zxid = hdr.getZxid();
            addCommittedProposal(r);
        }
    };
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, listener);
    initialized = true;
    return zxid;
}

public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Step 1: load the snapshot into the data tree and session map.
    snapLog.deserialize(dt, sessions);
    // Step 2: replay the transaction log, starting just past the snapshot's last zxid.
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        hdr = itr.getHeader();
        if (hdr == null) {
            // Nothing to replay: the snapshot is already up to date.
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            // An out-of-order zxid is logged but the transaction is still applied.
            LOG.error(highestZxid + "(higestZxid) > " + hdr.getZxid()
                    + "(next log) for type " + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr, dt, sessions, itr.getTxn());
        } catch (KeeperException.NoNodeException e) {
            throw new IOException("Failed to process transaction type: "
                    + hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next())
            break;
    }
    return highestZxid;
}
|
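
The recovery pattern in `restore` above (load a snapshot, then replay only the newer log entries) can be tried in isolation with the minimal sketch below. It is not ZooKeeper code: `ReplaySketch`, `Txn`, and `Tree` are hypothetical stand-ins for the transaction log and `DataTree`, and it filters the log in memory, whereas the listing above asks the log for an iterator that starts at `lastProcessedZxid + 1`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReplaySketch {
    /** Hypothetical stand-in for one logged transaction: "set key to value". */
    static final class Txn {
        final long zxid;
        final String key;
        final String value;

        Txn(long zxid, String key, String value) {
            this.zxid = zxid;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for DataTree: the state plus the last applied zxid. */
    static final class Tree {
        final Map<String, String> data = new TreeMap<>();
        long lastProcessedZxid;
    }

    /**
     * Applies every logged transaction newer than the snapshot held in
     * {@code tree} and returns the highest zxid seen.
     */
    static long restore(Tree tree, List<Txn> log) {
        long snapshotZxid = tree.lastProcessedZxid;
        long highest = snapshotZxid;
        for (Txn t : log) {
            if (t.zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            tree.data.put(t.key, t.value);
            tree.lastProcessedZxid = t.zxid;
            highest = Math.max(highest, t.zxid);
        }
        return highest;
    }

    public static void main(String[] args) {
        // The snapshot was taken after zxid 2.
        Tree tree = new Tree();
        tree.data.put("/a", "1");
        tree.lastProcessedZxid = 2;

        List<Txn> log = List.of(
                new Txn(1, "/a", "0"),
                new Txn(2, "/a", "1"),
                new Txn(3, "/b", "2"),
                new Txn(4, "/a", "3"));

        long zxid = restore(tree, log);
        System.out.println(zxid + " " + tree.data); // prints: 4 {/a=3, /b=2}
    }
}
```

Transactions 1 and 2 are skipped because the snapshot already contains them; only 3 and 4 are applied, so the returned value is 4.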