diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 121aa6282c..1bb926de72 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -49,8 +49,8 @@ struct raft_topology_snapshot { struct raft_topology_pull_params {}; -verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result; -verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot; -verb [[cancellable]] tablet_stream_data (locator::global_tablet_id); -verb [[cancellable]] tablet_cleanup (locator::global_tablet_id); +verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result; +verb [[cancellable]] raft_pull_topology_snapshot (raft::server_id dst_id, service::raft_topology_pull_params) -> service::raft_topology_snapshot; +verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id); +verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id); } diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index d17396be0f..17b50aa406 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -34,6 +34,7 @@ #include "db/system_keyspace.hh" #include "service/storage_proxy.hh" #include "service/raft/raft_group0_client.hh" +#include "service/raft/raft_address_map.hh" #include "partition_slice_builder.hh" #include "timestamp.hh" #include "utils/overloaded_functor.hh" @@ -199,7 +200,16 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) { _ss._topology_state_machine.event.broadcast(); } -future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) { +future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) { + // FIXME: The 
translation will ultimately be done by messaging_service + auto from_ip = _address_map.find(from_id); + if (!from_ip.has_value()) { + // This is virtually impossible. We've just received the + // snapshot from the sender and must have updated our + // address map with its IP address. Report the sender by its server id: from_ip is empty on this path. + const auto msg = format("Failed to apply snapshot from {}: ip address of the sender is not found", from_id); + co_await coroutine::return_exception(raft::transport_error(msg)); + } try { // Note that this may bring newer state than the group0 state machine raft's // log, so some raft entries may be double applied, but since the state @@ -207,8 +217,8 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s auto holder = _gate.hold(); - slogger.trace("transfer snapshot from {} index {} snp id {}", from, snp.idx, snp.id); - netw::messaging_service::msg_addr addr{from, 0}; + slogger.trace("transfer snapshot from {} index {} snp id {}", *from_ip, snp.idx, snp.id); + netw::messaging_service::msg_addr addr{*from_ip, 0}; auto& as = _abort_source; // (Ab)use MIGRATION_REQUEST to also transfer group0 history table mutation besides schema tables mutations. 
@@ -219,7 +229,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations"); } - auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, service::raft_topology_pull_params{}); + auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, from_id, service::raft_topology_pull_params{}); auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary()); diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh index 3a45e3b2c9..f6bbd58d88 100644 --- a/service/raft/group0_state_machine.hh +++ b/service/raft/group0_state_machine.hh @@ -99,7 +99,7 @@ public: future take_snapshot() override; void drop_snapshot(raft::snapshot_id id) override; future<> load_snapshot(raft::snapshot_id id) override; - future<> transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) override; + future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override; future<> abort() override; }; diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 52af090694..1c1da9d7de 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -175,15 +175,7 @@ future raft_rpc::execute_read_barrier(raft::server_id } future raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) { - auto ip_addr = _address_map.find(from); - if (!ip_addr.has_value()) { - // This is virtually impossible. We've just received the - // snapshot from the sender and must have updated our - // address map with its IP address. 
- const auto msg = format("Failed to apply snapshot from {}: ip address of the sender is not found", from); - co_return coroutine::exception(std::make_exception_ptr(raft::transport_error(msg))); - } - co_await _sm.transfer_snapshot(*ip_addr, snp.snp); + co_await _sm.transfer_snapshot(from, snp.snp); co_return co_await raft_with_gate(_shutdown_gate, [&] { return _client->apply_snapshot(from, std::move(snp)); }); diff --git a/service/raft/raft_state_machine.hh b/service/raft/raft_state_machine.hh index c543c3fb38..49602571a0 100644 --- a/service/raft/raft_state_machine.hh +++ b/service/raft/raft_state_machine.hh @@ -16,7 +16,7 @@ namespace service { // Snapshot transfer is delegated to a state machine implementation class raft_state_machine : public raft::state_machine { public: - virtual future<> transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) = 0; + virtual future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) = 0; }; } // end of namespace service diff --git a/service/storage_service.cc b/service/storage_service.cc index 284fb52527..dc13ba657c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1036,7 +1036,7 @@ class topology_coordinator { auto result = utils::fb_utilities::is_me(*ip) ? 
co_await _raft_topology_cmd_handler(_sys_dist_ks, _term, cmd_index, cmd) : co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd( - &_messaging, netw::msg_addr{*ip}, _term, cmd_index, cmd); + &_messaging, netw::msg_addr{*ip}, id, _term, cmd_index, cmd); if (result.status == raft_topology_cmd_result::command_status::fail) { co_await coroutine::exception(std::make_exception_ptr( std::runtime_error(::format("failed status returned from {}/{}", id, *ip)))); @@ -1612,7 +1612,7 @@ class topology_coordinator { slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); auto dst = trinfo.pending_replica.host; return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, - netw::msg_addr(id2ip(dst)), _as, gid); + netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid); })) { transition_to(locator::tablet_transition_stage::write_both_read_new); } @@ -1628,7 +1628,7 @@ class topology_coordinator { locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo); slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst); return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging, - netw::msg_addr(id2ip(dst.host)), _as, gid); + netw::msg_addr(id2ip(dst.host)), _as, raft::server_id(dst.host.uuid()), gid); })) { transition_to(locator::tablet_transition_stage::end_migration); } @@ -6194,13 +6194,24 @@ void storage_service::init_messaging_service(shardedjoined_group0()) { + throw std::runtime_error("The node did not join group 0 yet"); + } + if (ss._group0->load_my_id() != dst_id) { + throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id); + } + return handler(ss); + }); + }; + ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [&sys_dist_ks, handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) { + return handle_raft_rpc(dst_id, [&sys_dist_ks, 
cmd = std::move(cmd), term, cmd_index] (auto& ss) { return ss.raft_topology_cmd_handler(sys_dist_ks, term, cmd_index, cmd); }); }); - ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [this] (raft_topology_pull_params params) { - return container().invoke_on(0, [] (auto& ss) -> future { + ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_topology_pull_params params) { + return handle_raft_rpc(dst_id, [] (storage_service& ss) -> future { if (!ss._raft_topology_change_enabled) { co_return raft_topology_snapshot{}; } @@ -6248,13 +6259,13 @@ void storage_service::init_messaging_service(sharded future<> { + ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) { + return handle_raft_rpc(dst_id, [tablet] (auto& ss) { return ss.stream_tablet(tablet); }); }); - ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) { - return container().invoke_on(0, [tablet] (auto& ss) -> future<> { + ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) { + return handle_raft_rpc(dst_id, [tablet] (auto& ss) { return ss.cleanup_tablet(tablet); }); });