storage_service: raft topology: introduce snapshot transfer code for the topology table

This commit is contained in:
Gleb Natapov
2022-12-27 12:32:31 +02:00
parent 6a4d773b7e
commit d69a887366
6 changed files with 49 additions and 1 deletions

View File

@@ -24,5 +24,12 @@ namespace service {
service::raft_topology_cmd_result::command_status status;
};
struct raft_topology_snapshot {
std::vector<canonical_mutation> mutations;
};
struct raft_topology_pull_params {};
verb raft_topology_cmd (raft::term_t term, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
}

View File

@@ -572,6 +572,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::DIRECT_FD_PING:
case messaging_verb::RAFT_PULL_TOPOLOGY_SNAPSHOT:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -181,7 +181,8 @@ enum class messaging_verb : int32_t {
GET_GROUP0_UPGRADE_STATE = 62,
DIRECT_FD_PING = 63,
RAFT_TOPOLOGY_CMD = 64,
LAST = 65,
RAFT_PULL_TOPOLOGY_SNAPSHOT = 65,
LAST = 66,
};
} // namespace netw

View File

@@ -279,6 +279,16 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
slogger.info("Checking bootstrapping/leaving nodes: ok");
}
future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
std::vector<mutation> muts;
muts.reserve(snp.mutations.size());
boost::transform(snp.mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
return m.to_mutation(s);
});
co_await _db.local().apply(freeze(muts), db::no_timeout);
}
future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<service::storage_proxy>& proxy,
@@ -3393,6 +3403,21 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
return ss.raft_topology_cmd_handler(sys_dist_ks, term, cmd);
});
});
ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [this, &proxy] (raft_topology_pull_params params) {
return container().invoke_on(0, [&proxy] (auto& ss) -> future<raft_topology_snapshot> {
if (!ss._raft_topology_change_enabled) {
co_return raft_topology_snapshot{};
}
auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
std::vector<canonical_mutation> results;
results.reserve(rs->partitions().size());
boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) {
return canonical_mutation{p.mut().unfreeze(s)};
});
co_return raft_topology_snapshot{std::move(results)};
});
});
}
future<> storage_service::uninit_messaging_service() {

View File

@@ -782,7 +782,13 @@ private:
bool is_normal_state_handled_on_boot(gms::inet_address);
future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes, sstring ops, node_ops_id uuid);
bool _raft_topology_change_enabled = false;
future<raft_topology_cmd_result> raft_topology_cmd_handler(sharded<db::system_distributed_keyspace>& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd);
public:
// Applies received raft snapshot to local state machine persistent storage
future<> merge_topology_snapshot(raft_topology_snapshot snp);
};
}

View File

@@ -21,6 +21,7 @@
#include "raft/raft.hh"
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "mutation/canonical_mutation.hh"
namespace service {
@@ -88,6 +89,13 @@ struct topology {
bool contains(raft::server_id id);
};
struct raft_topology_snapshot {
std::vector<canonical_mutation> mutations;
};
struct raft_topology_pull_params {
};
// State machine that is responsible for topology change
struct topology_state_machine {
using topology_type = topology;