storage_service: introduce join_node_query verb

When a node joins an existing cluster, it will ask a node that already
belongs to the cluster about which topology operations to use when
joining.
This commit is contained in:
Piotr Dulikowski
2024-01-25 10:25:23 +01:00
parent bab5d3bbe5
commit 7601f40bf8
5 changed files with 42 additions and 1 deletions

View File

@@ -8,6 +8,17 @@
namespace service {
struct join_node_query_params {};
struct join_node_query_result {
enum class topology_mode : uint8_t {
legacy = 0,
raft = 1,
};
service::join_node_query_result::topology_mode topo_mode;
};
struct join_node_request_params {
raft::server_id host_id;
std::optional<raft::server_id> replaced_id;
@@ -51,6 +62,7 @@ struct join_node_response_params {
struct join_node_response_result {};
verb join_node_query (raft::server_id dst_id, service::join_node_query_params) -> service::join_node_query_result;
verb join_node_request (raft::server_id dst_id, service::join_node_request_params) -> service::join_node_request_result;
verb join_node_response (raft::server_id dst_id, service::join_node_response_params) -> service::join_node_response_result;

View File

@@ -578,6 +578,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_TOPOLOGY_CMD:
case messaging_verb::JOIN_NODE_REQUEST:
case messaging_verb::JOIN_NODE_RESPONSE:
case messaging_verb::JOIN_NODE_QUERY:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.

View File

@@ -193,7 +193,8 @@ enum class messaging_verb : int32_t {
TABLET_STREAM_FILES = 70,
STREAM_BLOB = 71,
TABLE_LOAD_STATS = 72,
LAST = 73,
JOIN_NODE_QUERY = 73,
LAST = 74,
};
} // namespace netw

View File

@@ -14,6 +14,20 @@
namespace service {
struct join_node_query_params {};
struct join_node_query_result {
enum class topology_mode : uint8_t {
// The cluster uses legacy, gossiper-based topology operations
legacy = 0,
// The cluster uses raft-based topology operations
raft = 1,
};
topology_mode topo_mode;
};
struct join_node_request_params {
raft::server_id host_id;
std::optional<raft::server_id> replaced_id;

View File

@@ -5778,6 +5778,19 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
co_return co_await ss.join_node_response_handler(std::move(params));
});
});
ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) {
throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time.");
}
auto result = join_node_query_result{
.topo_mode = ss.raft_topology_change_enabled()
? join_node_query_result::topology_mode::raft
: join_node_query_result::topology_mode::legacy,
};
return make_ready_future<join_node_query_result>(std::move(result));
});
});
}
}