-
Notifications
You must be signed in to change notification settings - Fork 1
/
proposer.hpp
102 lines (90 loc) · 3.48 KB
/
proposer.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//
// Created by David Chu on 10/4/20.
//
#ifndef C__PAXOS_PROPOSER_HPP
#define C__PAXOS_PROPOSER_HPP
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <queue>
#include <algorithm>
#include <functional>
#include "message.pb.h"
#include "utils/network.hpp"
#include "utils/config.hpp"
#include "utils/metrics.hpp"
#include "models/message.hpp"
#include "models/log.hpp"
#include "models/client_component.hpp"
#include "models/server_component.hpp"
#include "models/heartbeat_component.hpp"
#include "lib/storage/anna.hpp"
class proposer {
public:
explicit proposer(int id, int numAcceptorGroups); //TODO once matchmakers are integrated, numAcceptorGroups is dynamic
private:
const int id; // 0 indexed, no gaps
const int numAcceptorGroups;
std::shared_ptr<metrics::variables> metricsVars;
anna* annaClient;
network* zmqNetwork;
client_component* proposers;
heartbeat_component* proxyLeaderHeartbeat;
server_component* proxyLeaders;
server_component* batchers;
int ballotNum = 0; // must be at least 1 the first time it is sent
Ballot ballot;
bool isLeader = false;
time_t lastLeaderHeartbeat = 0;
std::unordered_set<std::string> remainingAcceptorGroupsForScouts = {};
std::queue<int> logHoles = {};
int nextSlot = 0;
std::vector<Log::stringLog> acceptorGroupCommittedLogs = {};
std::unordered_map<std::string, Log::pValueLog> acceptorGroupUncommittedLogs = {}; //key = acceptor group ID
two_p_set acceptorGroupIdSet;
std::vector<std::string> acceptorGroupIds = {};
int nextAcceptorGroup = 0;
void listenToAnna(const std::string& key, const two_p_set& twoPSet, time_t now);
void listenToBatcher(const network::addressPayloadsMap& addressToPayloads);
void listenToProxyLeader(const network::addressPayloadsMap& addressToPayloads, time_t now);
void listenToProposer(const network::addressPayloadsMap& addressToPayloads, time_t now);
/**
* Broadcast p1a to acceptors to become the leader.
* @invariant isLeader = false
*/
void sendScouts();
/**
* Check if proxy leaders have replied with a win in phase 1.
* If every acceptor group has replied, then we are the new leader, and we should merge the committed/uncommitted logs
* of each acceptor group.
* Otherwise, reset values.
*
* @invariant isLeader = false, shouldSendScouts = false, uncommittedProposals.empty()
*/
void handleP1B(const ProxyLeaderToProposer& message);
/**
* Update log with newly committed slots from acceptors. Remove committed proposals from unproposedPayloads.
* Propose uncommitted slots, add to uncommittedProposals
* @invariant uncommittedProposals.empty()
*/
void mergeLogs();
/**
* Check if a proxy leader has committed values for a slot. If yes, then confirm that slot as committed.
* If we've been preempted, then that means another has become the leader. Reset values.
* @invariant isLeader = true
*/
void handleP2B(const ProxyLeaderToProposer& message);
/**
* Reset all values when this proposer learns that it is no longer the leader.
*/
void noLongerLeader();
void broadcastIamLeader();
/**
* Increments (round robin) the next acceptor group a payload will be proposed to.
* @warning Does NOT lock acceptorMutex. The caller MUST lock it.
* @return The ID of the acceptor group to propose to.
*/
const std::string& fetchNextAcceptorGroupId();
};
#endif //C__PAXOS_PROPOSER_HPP