storage_service: check destination host ID in raft verbs

In unlucky but possible circumstances where a node is being replaced
very quickly, RPC requests using raft-related verbs from storage_service
might be sent to it - even before the node starts its group 0 server.
In the latter case, this triggers on_internal_error.

This commit adds protection to the existing verbs in storage_service:
they check whether the group 0 is running and whether the received
host_id matches the actual recipient's host_id.

None of the verbs that are modified are in any existing release, so the
added parameter does not have to be wrapped in rpc::optional.
This commit is contained in:
Piotr Dulikowski
2023-09-08 11:16:23 +02:00
parent 0317705f5a
commit 51b0e4d44f
6 changed files with 43 additions and 30 deletions

View File

@@ -49,8 +49,8 @@ struct raft_topology_snapshot {
struct raft_topology_pull_params {};
verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
verb [[cancellable]] tablet_stream_data (locator::global_tablet_id);
verb [[cancellable]] tablet_cleanup (locator::global_tablet_id);
verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb [[cancellable]] raft_pull_topology_snapshot (raft::server_id dst_id, service::raft_topology_pull_params) -> service::raft_topology_snapshot;
verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id);
}

View File

@@ -34,6 +34,7 @@
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_address_map.hh"
#include "partition_slice_builder.hh"
#include "timestamp.hh"
#include "utils/overloaded_functor.hh"
@@ -199,7 +200,16 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
_ss._topology_state_machine.event.broadcast();
}
future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) {
// FIXME: The translation will ultimately be done by messaging_service
auto from_ip = _address_map.find(from_id);
if (!from_ip.has_value()) {
// This is virtually impossible. We've just received the
// snapshot from the sender and must have updated our
// address map with its IP address.
const auto msg = format("Failed to apply snapshot from {}: ip address of the sender is not found", from_ip);
co_await coroutine::return_exception(raft::transport_error(msg));
}
try {
// Note that this may bring newer state than the group0 state machine raft's
// log, so some raft entries may be double applied, but since the state
@@ -207,8 +217,8 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s
auto holder = _gate.hold();
slogger.trace("transfer snapshot from {} index {} snp id {}", from, snp.idx, snp.id);
netw::messaging_service::msg_addr addr{from, 0};
slogger.trace("transfer snapshot from {} index {} snp id {}", from_ip, snp.idx, snp.id);
netw::messaging_service::msg_addr addr{*from_ip, 0};
auto& as = _abort_source;
// (Ab)use MIGRATION_REQUEST to also transfer group0 history table mutation besides schema tables mutations.
@@ -219,7 +229,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s
on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations");
}
auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, service::raft_topology_pull_params{});
auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, from_id, service::raft_topology_pull_params{});
auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());

View File

@@ -99,7 +99,7 @@ public:
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;
future<> load_snapshot(raft::snapshot_id id) override;
future<> transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) override;
future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override;
future<> abort() override;
};

View File

@@ -175,15 +175,7 @@ future<raft::read_barrier_reply> raft_rpc::execute_read_barrier(raft::server_id
}
future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) {
auto ip_addr = _address_map.find(from);
if (!ip_addr.has_value()) {
// This is virtually impossible. We've just received the
// snapshot from the sender and must have updated our
// address map with its IP address.
const auto msg = format("Failed to apply snapshot from {}: ip address of the sender is not found", from);
co_return coroutine::exception(std::make_exception_ptr(raft::transport_error(msg)));
}
co_await _sm.transfer_snapshot(*ip_addr, snp.snp);
co_await _sm.transfer_snapshot(from, snp.snp);
co_return co_await raft_with_gate(_shutdown_gate, [&] {
return _client->apply_snapshot(from, std::move(snp));
});

View File

@@ -16,7 +16,7 @@ namespace service {
// Snapshot transfer is delegated to a state machine implementation
class raft_state_machine : public raft::state_machine {
public:
virtual future<> transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) = 0;
virtual future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) = 0;
};
} // end of namespace service

View File

@@ -1036,7 +1036,7 @@ class topology_coordinator {
auto result = utils::fb_utilities::is_me(*ip) ?
co_await _raft_topology_cmd_handler(_sys_dist_ks, _term, cmd_index, cmd) :
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, netw::msg_addr{*ip}, _term, cmd_index, cmd);
&_messaging, netw::msg_addr{*ip}, id, _term, cmd_index, cmd);
if (result.status == raft_topology_cmd_result::command_status::fail) {
co_await coroutine::exception(std::make_exception_ptr(
std::runtime_error(::format("failed status returned from {}/{}", id, *ip))));
@@ -1612,7 +1612,7 @@ class topology_coordinator {
slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica);
auto dst = trinfo.pending_replica.host;
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, gid);
netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid);
})) {
transition_to(locator::tablet_transition_stage::write_both_read_new);
}
@@ -1628,7 +1628,7 @@ class topology_coordinator {
locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst);
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
netw::msg_addr(id2ip(dst.host)), _as, gid);
netw::msg_addr(id2ip(dst.host)), _as, raft::server_id(dst.host.uuid()), gid);
})) {
transition_to(locator::tablet_transition_stage::end_migration);
}
@@ -6194,13 +6194,24 @@ void storage_service::init_messaging_service(sharded<db::system_distributed_keys
return ss.node_ops_cmd_handler(coordinator, std::move(req));
});
});
ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [this, &sys_dist_ks] (raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
return container().invoke_on(0, [&sys_dist_ks, cmd = std::move(cmd), term, cmd_index] (auto& ss) {
auto handle_raft_rpc = [&cont = container()] (raft::server_id dst_id, auto handler) {
return cont.invoke_on(0, [dst_id, handler = std::move(handler)] (auto& ss) mutable {
if (!ss._group0 || !ss._group0->joined_group0()) {
throw std::runtime_error("The node did not join group 0 yet");
}
if (ss._group0->load_my_id() != dst_id) {
throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
}
return handler(ss);
});
};
ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [&sys_dist_ks, handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
return handle_raft_rpc(dst_id, [&sys_dist_ks, cmd = std::move(cmd), term, cmd_index] (auto& ss) {
return ss.raft_topology_cmd_handler(sys_dist_ks, term, cmd_index, cmd);
});
});
ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [this] (raft_topology_pull_params params) {
return container().invoke_on(0, [] (auto& ss) -> future<raft_topology_snapshot> {
ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_topology_pull_params params) {
return handle_raft_rpc(dst_id, [] (storage_service& ss) -> future<raft_topology_snapshot> {
if (!ss._raft_topology_change_enabled) {
co_return raft_topology_snapshot{};
}
@@ -6248,13 +6259,13 @@ void storage_service::init_messaging_service(sharded<db::system_distributed_keys
};
});
});
ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [this] (locator::global_tablet_id tablet) {
return container().invoke_on(0, [tablet] (auto& ss) -> future<> {
ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
return ss.stream_tablet(tablet);
});
});
ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) {
return container().invoke_on(0, [tablet] (auto& ss) -> future<> {
ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
return ss.cleanup_tablet(tablet);
});
});