raft: joint consensus, use unordered_set for server_address list

This commit is contained in:
Konstantin Osipov
2020-10-14 20:40:59 +03:00
parent df944f953c
commit 1ca738d9a2
6 changed files with 37 additions and 20 deletions

View File

@@ -46,7 +46,8 @@ struct server_address {
};
struct configuration {
std::vector<raft::server_address> servers;
std::unordered_set<raft::server_address> current;
std::unordered_set<raft::server_address> previous;
};
struct snapshot {

View File

@@ -83,7 +83,7 @@ bool follower_progress::can_send_to() {
return false;
}
void tracker::set_configuration(const std::vector<server_address>& servers, index_t next_idx) {
void tracker::set_configuration(const server_address_set& servers, index_t next_idx) {
for (auto& s : servers) {
if (this->progress::find(s.id) != this->progress::end()) {
continue;

View File

@@ -86,7 +86,7 @@ public:
follower_progress& find(server_id dst) {
return this->progress::find(dst)->second;
}
void set_configuration(const std::vector<server_address>& servers, index_t next_idx);
void set_configuration(const server_address_set& servers, index_t next_idx);
// Calculate the current commit index based on the current
// simple or joint quorum.
@@ -114,7 +114,7 @@ class votes {
// The candidate always votes for self.
size_t _granted = 1;
public:
void set_configuration(const std::vector<server_address>& servers) {
void set_configuration(const server_address_set& servers) {
_cluster_size = servers.size();
}

View File

@@ -21,6 +21,7 @@
#pragma once
#include <vector>
#include <unordered_set>
#include <functional>
#include <boost/container/deque.hpp>
#include <seastar/core/lowres_clock.hh>
@@ -64,27 +65,43 @@ using server_info = bytes;
struct server_address {
server_id id;
server_info info;
bool operator==(const server_address& rhs) const {
return id == rhs.id;
}
};
} // end of namespace raft
namespace std {
template <> struct hash<raft::server_address> {
size_t operator()(const raft::server_address& address) const {
return std::hash<raft::server_id>{}(address.id);
}
};
} // end of namespace std
namespace raft {
using server_address_set = std::unordered_set<server_address>;
struct configuration {
// Used during the transitioning period of configuration
// changes.
std::vector<server_address> previous;
// Contains the current configuration. When configuration
// change is in progress, contains the new configuration.
std::vector<server_address> current;
server_address_set current;
// Used during the transitioning period of configuration
// changes.
server_address_set previous;
configuration(std::initializer_list<server_id> ids) {
current.reserve(ids.size());
for (auto&& id : ids) {
current.emplace_back(server_address{std::move(id)});
current.emplace(server_address{std::move(id)});
}
}
configuration() = default;
configuration(std::vector<server_address> servers)
: servers(std::move(servers))
{}
configuration(server_address_set current_arg = {}, server_address_set previous_arg = {})
: current(std::move(current_arg)), previous(std::move(previous_arg)) {}
};
struct log_entry {

View File

@@ -32,12 +32,9 @@
namespace raft{
// these operators provided exclusively for testing purposes
static bool operator==(const server_address& lhs, const server_address& rhs) {
return lhs.id == rhs.id;
}
static bool operator==(const configuration& lhs, const configuration& rhs) {
return lhs.servers == rhs.servers;
return lhs.current == rhs.current && lhs.previous == rhs.previous;
}
static bool operator==(const snapshot& lhs, const snapshot& rhs) {

View File

@@ -193,6 +193,7 @@ public:
};
struct initial_state {
raft::server_address address;
raft::term_t term = raft::term_t(1);
raft::server_id vote;
std::vector<raft::log_entry> log;
@@ -324,11 +325,12 @@ future<std::vector<std::pair<std::unique_ptr<raft::server>, state_machine*>>> cr
for (size_t i = 0; i < states.size(); i++) {
auto uuid = utils::UUID(0, i + 1); // Custom sequential debug id; 0 is invalid
config.current.push_back(raft::server_address{uuid});
states[i].address = raft::server_address{uuid};
config.current.emplace(states[i].address);
}
for (size_t i = 0; i < states.size(); i++) {
auto& s = config.current[i];
auto& s = states[i].address;
states[i].snapshot.config = config;
snapshots[s.id] = states[i].snp_value;
auto& raft = *rafts.emplace_back(create_raft_server(s.id, apply, states[i], apply_entries, type)).first;