Merge "raft: second series of preparatory patches for group 0 discovery" from Kostja

Miscellaneous preparatory patches for group 0 discovery.

* scylla-dev/raft-group-0-part-2-v4:
  raft: (service) servers map is gid -> server, not sid -> server
  system_keyspace: raft.group_id and raft_snapshots.group_id are TIMEUUID
  raft: (server) wait for configuration transition to complete
  raft: (server) implement raft::server::get_configuration()
  raft: (service) don't throw from schema state machine
  raft: (service) permit some scylla.raft cells to be empty
  raft: (service) properly handle failure to add a server
  raft: implement is_transient_error()
This commit is contained in:
Tomasz Grabiec
2021-06-17 00:12:23 +02:00
10 changed files with 118 additions and 74 deletions

View File

@@ -220,7 +220,7 @@ schema_ptr raft() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT);
return schema_builder(NAME, RAFT, std::optional(id))
.with_column("group_id", long_type, column_kind::partition_key)
.with_column("group_id", timeuuid_type, column_kind::partition_key)
// raft log part
.with_column("index", long_type, column_kind::clustering_key)
.with_column("term", long_type)
@@ -246,7 +246,7 @@ schema_ptr raft_snapshots() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT_SNAPSHOTS);
return schema_builder(NAME, RAFT_SNAPSHOTS, std::optional(id))
.with_column("group_id", long_type, column_kind::partition_key)
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("id", uuid_type, column_kind::clustering_key)
.with_column("idx", long_type)
.with_column("term", long_type)

View File

@@ -223,6 +223,16 @@ struct config_error : public error {
using error::error;
};
// True if a failure to execute a Raft operation can be re-tried,
// perhaps with a different server.
inline bool is_transient_error(const std::exception& e) {
return dynamic_cast<const not_a_leader*>(&e) ||
dynamic_cast<const commit_status_unknown*>(&e) ||
dynamic_cast<const dropped_entry*>(&e) ||
dynamic_cast<const conf_change_in_progress*>(&e) ||
dynamic_cast<const stopped_error*>(&e);
}
struct snapshot {
// Index and term of last entry in the snapshot
index_t idx = index_t(0);

View File

@@ -65,6 +65,7 @@ public:
future<> add_entry(command command, wait_type type);
future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) override;
future<> set_configuration(server_address_set c_new) override;
raft::configuration get_configuration() const override;
future<> start() override;
future<> abort() override;
term_t get_current_term() const override;
@@ -716,7 +717,20 @@ future<> server_impl::set_configuration(server_address_set c_new) {
co_return;
}
_stats.add_config++;
co_return co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed);
co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed);
// Above we co_wait that the joint configuration is committed.
// Immediately, without yield, once the FSM discovers
// this, it appends non-joint entry. Hence,
// at this point in execution, non-joint entry is in the log.
// By waiting for a follow up dummy to get committed, we
// automatically wait for the non-joint entry to get
// committed.
co_await add_entry_internal(log_entry::dummy(), wait_type::committed);
}
raft::configuration
server_impl::get_configuration() const {
return _fsm->get_configuration();
}
void server_impl::register_metrics() {

View File

@@ -71,8 +71,19 @@ public:
// Can be called on a leader only, otherwise throws not_a_leader.
// Cannot be called until previous set_configuration() completes
// otherwise throws conf_change_in_progress exception.
//
// Waits until configuration completes, i.e. the server left the joint
// configuration. The server will apply a dummy entry to
// make sure this happens.
//
// Note: committing a dummy entry extends the opportunity for
// uncertainty, thus commit_status_unknown exception may be
// returned even in case of a successful config change.
virtual future<> set_configuration(server_address_set c_new) = 0;
// Return the currently known configuration
virtual raft::configuration get_configuration() const = 0;
// Load persisted state and start background work that needs
// to run for this Raft server to function; The object cannot
// be used until the returned future is resolved.

View File

@@ -24,7 +24,6 @@
#include "service/raft/schema_raft_state_machine.hh"
#include "service/raft/raft_gossip_failure_detector.hh"
#include "raft/raft.hh"
#include "message/messaging_service.hh"
#include "cql3/query_processor.hh"
#include "gms/gossiper.hh"
@@ -48,10 +47,10 @@ void raft_services::init_rpc_verbs() {
const rpc::client_info& cinfo,
const raft::group_id& gid, raft::server_id from, raft::server_id dst, auto handler) {
return container().invoke_on(shard_for_group(gid),
[addr = netw::messaging_service::get_source(cinfo).addr, from, dst, handler] (raft_services& self) mutable {
[addr = netw::messaging_service::get_source(cinfo).addr, from, dst, gid, handler] (raft_services& self) mutable {
// Update the address mappings for the rpc module
// in case the sender is encountered for the first time
auto& rpc = self.get_rpc(dst);
auto& rpc = self.get_rpc(gid);
// The address learnt from a probably unknown server should
// eventually expire
self.update_address_mapping(from, std::move(addr), true);
@@ -132,16 +131,7 @@ seastar::future<> raft_services::init() {
// Once a Raft server starts, it soon times out
// and starts an election, so RPC must be ready by
// then to send VoteRequest messages.
init_rpc_verbs();
auto uninit_rpc_verbs = defer([this] { this->uninit_rpc_verbs().get(); });
// schema raft server instance always resides on shard 0
if (this_shard_id() == 0) {
// FIXME: Server id will change each time scylla server restarts,
// need to persist it or find a deterministic way to compute!
raft::server_id id = raft::server_id::create_random_id();
co_await add_server(id, create_schema_server(id));
}
uninit_rpc_verbs.cancel();
co_return init_rpc_verbs();
}
seastar::future<> raft_services::uninit() {
@@ -150,48 +140,56 @@ seastar::future<> raft_services::uninit() {
});
}
raft_rpc& raft_services::get_rpc(raft::server_id id) {
auto it = _servers.find(id);
raft_services::server_for_group& raft_services::get_server_for_group(raft::group_id gid) {
auto it = _servers.find(gid);
if (it == _servers.end()) {
on_internal_error(rslog, format("No raft server found with id = {}", id));
throw raft_group_not_found(gid);
}
return *it->second.rpc;
return it->second;
}
raft_services::create_server_result raft_services::create_schema_server(raft::server_id id) {
auto rpc = std::make_unique<raft_rpc>(_ms, *this, schema_raft_state_machine::gid, id);
raft_rpc& raft_services::get_rpc(raft::group_id gid) {
return get_server_for_group(gid).rpc;
}
raft::server& raft_services::get_server(raft::group_id gid) {
return *(get_server_for_group(gid).server);
}
raft_services::server_for_group raft_services::create_server_for_group(raft::group_id gid) {
raft::server_id my_id = raft::server_id::create_random_id();
auto rpc = std::make_unique<raft_rpc>(_ms, *this, gid, my_id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
auto storage = std::make_unique<raft_sys_table_storage>(_qp.local(), schema_raft_state_machine::gid);
auto storage = std::make_unique<raft_sys_table_storage>(_qp.local(), gid);
auto state_machine = std::make_unique<schema_raft_state_machine>();
auto server = raft::create_server(my_id, std::move(rpc), std::move(state_machine),
std::move(storage), _fd, raft::server::configuration());
return std::pair(raft::create_server(id,
std::move(rpc),
std::move(state_machine),
std::move(storage),
_fd,
raft::server::configuration()), // use default raft server configuration
&rpc_ref);
// initialize the corresponding timer to tick the raft server instance
auto ticker = std::make_unique<ticker_type>([srv = server.get()] { srv->tick(); });
return server_for_group{
.gid = std::move(gid),
.server = std::move(server),
.ticker = std::move(ticker),
.rpc = rpc_ref,
};
}
future<> raft_services::add_server(raft::server_id id, create_server_result srv) {
auto srv_ref = std::ref(*srv.first);
// initialize the corresponding timer to tick the raft server instance
ticker_type t([srv_ref] {
srv_ref.get().tick();
});
auto [it, inserted] = _servers.emplace(std::pair(id, servers_value_type{
.server = std::move(srv.first),
.rpc = srv.second,
.ticker = std::move(t)
}));
future<> raft_services::start_server_for_group(server_for_group new_grp) {
auto gid = new_grp.gid;
auto [it, inserted] = _servers.emplace(std::move(gid), std::move(new_grp));
if (!inserted) {
on_internal_error(rslog, format("Attempt to add the second instance of raft server with the same id={}", id));
on_internal_error(rslog, format("Attempt to add the second instance of raft server with the same gid={}", gid));
}
ticker_type& ticker_from_map = it->second.ticker;
auto& grp = it->second;
try {
// start the server instance prior to arming the ticker timer.
// By the time the tick() is executed the server should already be initialized.
co_await srv_ref.get().start();
co_await grp.server->start();
} catch (...) {
// remove server from the map to prevent calling `abort()` on a
// non-started instance when `raft_services::uninit` is called.
@@ -199,15 +197,11 @@ future<> raft_services::add_server(raft::server_id id, create_server_result srv)
on_internal_error(rslog, std::current_exception());
}
ticker_from_map.arm_periodic(tick_interval);
grp.ticker->arm_periodic(tick_interval);
}
unsigned raft_services::shard_for_group(const raft::group_id& gid) const {
if (gid == schema_raft_state_machine::gid) {
return 0; // schema raft server is always owned by shard 0
}
// We haven't settled yet on how to organize and manage (group_id -> shard_id) mapping
on_internal_error(rslog, format("Could not map raft group id {} to a corresponding shard id", gid));
return 0; // schema raft server is always owned by shard 0
}
gms::inet_address raft_services::get_inet_address(raft::server_id id) const {
@@ -225,3 +219,4 @@ void raft_services::update_address_mapping(raft::server_id id, gms::inet_address
void raft_services::remove_address_mapping(raft::server_id id) {
_srv_address_mappings.erase(id);
}

View File

@@ -44,6 +44,13 @@ class gossiper;
class raft_rpc;
class raft_gossip_failure_detector;
struct raft_group_not_found: public raft::error {
raft::group_id gid;
raft_group_not_found(raft::group_id gid_arg)
: raft::error(format("Raft group {} not found", gid_arg)), gid(gid_arg)
{}
};
// This class is responsible for creating, storing and accessing raft servers.
// It also manages the raft rpc verbs initialization.
//
@@ -55,22 +62,21 @@ public:
// TODO: should be configurable.
static constexpr ticker_type::duration tick_interval = std::chrono::milliseconds(100);
private:
using create_server_result = std::pair<std::unique_ptr<raft::server>, raft_rpc*>;
netw::messaging_service& _ms;
gms::gossiper& _gossiper;
sharded<cql3::query_processor>& _qp;
// Shard-local failure detector instance shared among all raft groups
shared_ptr<raft_gossip_failure_detector> _fd;
struct servers_value_type {
struct server_for_group {
raft::group_id gid;
std::unique_ptr<raft::server> server;
raft_rpc* rpc;
ticker_type ticker;
std::unique_ptr<ticker_type> ticker;
raft_rpc& rpc;
};
// Raft servers along with the corresponding timers to tick each instance.
// Currently ticking every 100ms.
std::unordered_map<raft::server_id, servers_value_type> _servers;
std::unordered_map<raft::group_id, server_for_group> _servers;
// inet_address:es for remote raft servers known to us
raft_address_map<> _srv_address_mappings;
@@ -78,8 +84,9 @@ private:
seastar::future<> uninit_rpc_verbs();
seastar::future<> stop_servers();
create_server_result create_schema_server(raft::server_id id);
server_for_group create_server_for_group(raft::group_id id);
server_for_group& get_server_for_group(raft::group_id id);
public:
raft_services(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp);
@@ -97,10 +104,15 @@ public:
// Must be invoked explicitly on each shard to stop this service.
seastar::future<> uninit();
raft_rpc& get_rpc(raft::server_id id);
raft_rpc& get_rpc(raft::group_id gid);
// Find server for group by group id. Throws exception if
// there is no such group.
raft::server& get_server(raft::group_id gid);
// Start raft server instance, store in the map of raft servers and
// initialize the associated timer to tick the server.
future<> add_server(raft::server_id id, create_server_result srv);
// arm the associated timer to tick the server.
future<> start_server_for_group(server_for_group grp);
unsigned shard_for_group(const raft::group_id& gid) const;
// Map raft server_id to inet_address to be consumed by `messaging_service`

View File

@@ -66,8 +66,8 @@ future<std::pair<raft::term_t, raft::server_id>> raft_sys_table_storage::load_te
co_return std::pair(raft::term_t(), raft::server_id());
}
const auto& static_row = rs->one();
raft::term_t vote_term = raft::term_t(static_row.get_as<int64_t>("vote_term"));
raft::server_id vote{static_row.get_as<utils::UUID>("vote")};
raft::term_t vote_term = raft::term_t(static_row.get_or<int64_t>("vote_term", raft::term_t{}));
raft::server_id vote{static_row.get_or<utils::UUID>("vote", raft::server_id{}.id)};
co_return std::pair(vote_term, vote);
}
@@ -77,6 +77,11 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
raft::log_entries log;
for (const cql3::untyped_result_set_row& row : *rs) {
if (!row.has("data")) {
// The partition only contains static cells, the log
// is empty.
break;
}
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
auto raw_data = row.get_blob("data");
@@ -95,7 +100,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
future<raft::snapshot> raft_sys_table_storage::load_snapshot() {
static const auto load_id_cql = format("SELECT snapshot_id FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> id_rs = co_await _qp.execute_internal(load_id_cql, {_group_id.id});
if (id_rs->empty()) {
if (id_rs->empty() || !id_rs->one().has("snapshot_id")) {
co_return raft::snapshot();
}
const auto& id_row = id_rs->one(); // should be only one row since snapshot_id column is static
@@ -166,7 +171,7 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::lo
// don't include serialized "data" here since it will require to linearize the stream
std::vector<cql3::raw_value> single_stmt_values = {
cql3::raw_value::make_value(long_type->decompose(_group_id.id)),
cql3::raw_value::make_value(timeuuid_type->decompose(_group_id.id)),
cql3::raw_value::make_value(long_type->decompose(int64_t(eptr->term))),
cql3::raw_value::make_value(long_type->decompose(int64_t(eptr->idx)))
};

View File

@@ -21,19 +21,19 @@
#include "service/raft/schema_raft_state_machine.hh"
future<> schema_raft_state_machine::apply(std::vector<raft::command_cref> command) {
throw std::runtime_error("Not implemented");
return make_ready_future<>();
}
future<raft::snapshot_id> schema_raft_state_machine::take_snapshot() {
throw std::runtime_error("Not implemented");
return make_ready_future<raft::snapshot_id>(raft::snapshot_id::create_random_id());
}
void schema_raft_state_machine::drop_snapshot(raft::snapshot_id id) {
throw std::runtime_error("Not implemented");
(void) id;
}
future<> schema_raft_state_machine::load_snapshot(raft::snapshot_id id) {
throw std::runtime_error("Not implemented");
return make_ready_future<>();
}
future<> schema_raft_state_machine::abort() {

View File

@@ -21,15 +21,12 @@
#pragma once
#include "raft/raft.hh"
#include "utils/UUID_gen.hh"
// Raft state machine implementation for managing schema changes.
// NOTE: schema raft server is always instantiated on shard 0.
class schema_raft_state_machine : public raft::state_machine {
public:
// schema raft server always belongs to a special pre-defined raft group with id = UUID(0, 0)
// TODO: 0-raft group id should be unique for each scylla cluster
static constexpr raft::group_id gid;
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;

View File

@@ -22,7 +22,7 @@
#include <seastar/testing/test_case.hh>
#include <seastar/core/coroutine.hh>
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "service/raft/raft_sys_table_storage.hh"
@@ -56,7 +56,7 @@ static bool operator==(const log_entry& lhs, const log_entry& rhs) {
} // namespace raft
static constexpr raft::group_id gid;
static raft::group_id gid{utils::UUID_gen::min_time_UUID()};
// Create a test log with entries of each kind to test that these get
// serialized/deserialized properly