diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 99e3a326d3..121aa6282c 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -52,4 +52,5 @@ struct raft_topology_pull_params {}; verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result; verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot; verb [[cancellable]] tablet_stream_data (locator::global_tablet_id); +verb [[cancellable]] tablet_cleanup (locator::global_tablet_id); } diff --git a/locator/tablets.cc b/locator/tablets.cc index 5103a214d7..d016639570 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -45,6 +45,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage return write_replica_set_selector::next; case tablet_transition_stage::cleanup: return write_replica_set_selector::next; + case tablet_transition_stage::end_migration: + return write_replica_set_selector::next; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -64,6 +66,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) return read_replica_set_selector::next; case tablet_transition_stage::cleanup: return read_replica_set_selector::next; + case tablet_transition_stage::end_migration: + return read_replica_set_selector::next; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -232,6 +236,7 @@ static const std::unordered_map tablet_transit {tablet_transition_stage::streaming, "streaming"}, {tablet_transition_stage::use_new, "use_new"}, {tablet_transition_stage::cleanup, "cleanup"}, + {tablet_transition_stage::end_migration, "end_migration"}, }; static const std::unordered_map tablet_transition_stage_from_name = std::invoke([] { diff --git a/locator/tablets.hh b/locator/tablets.hh index cea0aa71c1..29abb4edb5 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -141,6 +141,7 @@ enum class tablet_transition_stage { write_both_read_new, use_new, cleanup, + end_migration, }; sstring tablet_transition_stage_to_string(tablet_transition_stage); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 12d01843e9..919b0657ff 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -597,6 +597,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::NODE_OPS_CMD: case messaging_verb::HINT_MUTATION: case messaging_verb::TABLET_STREAM_DATA: + case messaging_verb::TABLET_CLEANUP: return 1; case messaging_verb::CLIENT_ID: case messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 75a13b0e5d..a5b88f13a6 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -183,7 +183,8 @@ enum class messaging_verb : int32_t { RAFT_TOPOLOGY_CMD = 64, RAFT_PULL_TOPOLOGY_SNAPSHOT = 65, TABLET_STREAM_DATA = 66, - LAST = 67, + TABLET_CLEANUP = 67, + LAST = 68, }; } // namespace netw diff --git a/replica/database.hh b/replica/database.hh index 54e10cbd3f..e54e76f588 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -62,6 +62,7 @@ #include "sstables/generation_type.hh" #include "db/rate_limiter.hh" #include "db/operation_type.hh" +#include "locator/tablets.hh" #include "utils/serialized_action.hh" #include "compaction/compaction_fwd.hh" @@ -792,6 +793,7 @@ public: const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } void update_effective_replication_map(locator::effective_replication_map_ptr); [[gnu::always_inline]] bool uses_tablets() const; + future<> cleanup_tablet(locator::tablet_id); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; shard_id shard_of(const mutation& m) const { diff --git a/replica/table.cc b/replica/table.cc index b9c0e01011..685599d3cf 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2964,4 +2964,9 @@ bool table::requires_cleanup(const sstables::sstable_set& set) const { })); } +future<> table::cleanup_tablet(locator::tablet_id) { + co_await flush(); + // FIXME: Remove sstables +} + } // namespace replica diff --git a/service/storage_service.cc b/service/storage_service.cc index db0e932c1d..281a9eda0f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1399,6 +1399,7 @@ class topology_coordinator { // Next migration of the same tablet is guaranteed to use a different instance. struct tablet_migration_state { background_action_holder streaming; + background_action_holder cleanup; std::unordered_map barriers; }; @@ -1508,11 +1509,15 @@ class topology_coordinator { .build()); }; - auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) { - if (advance_in_background(gid, tablet_state.barriers[stage], "barrier", [&] { + auto do_barrier = [&] { + return advance_in_background(gid, tablet_state.barriers[trinfo.stage], "barrier", [&] { needs_barrier = true; return barrier.get_shared_future(); - })) { + }); + }; + + auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) { + if (do_barrier()) { transition_to(stage); } }; @@ -1544,13 +1549,26 @@ class topology_coordinator { transition_to_with_barrier(locator::tablet_transition_stage::cleanup); break; case locator::tablet_transition_stage::cleanup: - // FIXME: Actually perform the cleanup. Block on integration with compaction groups. - _tablets.erase(gid); - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) - .del_transition(last_token) - .set_replicas(last_token, trinfo.next) - .build()); + if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] { + locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo); + slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst); + return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging, + netw::msg_addr(id2ip(dst.host)), _as, gid); + })) { + transition_to(locator::tablet_transition_stage::end_migration); + } + break; + case locator::tablet_transition_stage::end_migration: + // Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs. + // See do_tablet_operation() doc. + if (do_barrier()) { + _tablets.erase(gid); + updates.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) + .del_transition(last_token) + .set_replicas(last_token, trinfo.next) + .build()); + } break; } }); @@ -6208,6 +6226,42 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { }); } +future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { + return do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) { + shard_id shard; + + { + auto tm = guard.get_token_metadata(); + auto& tmap = guard.get_tablet_map(); + auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet); + + // Check if the request is still valid. + // If there is mismatch, it means this cleanup was canceled and the coordinator moved on. + if (!trinfo) { + throw std::runtime_error(format("No transition info for tablet {}", tablet)); + } + if (trinfo->stage != locator::tablet_transition_stage::cleanup) { + throw std::runtime_error(format("Tablet {} stage is not at cleanup", tablet)); + } + + auto& tinfo = tmap.get_tablet_info(tablet.tablet); + locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo); + if (leaving_replica.host != tm->get_my_id()) { + throw std::runtime_error(format("Tablet {} has leaving replica different than this one", tablet)); + } + auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id()); + if (!shard_opt) { + on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet)); + } + shard = *shard_opt; + } + return _db.invoke_on(shard, [tablet] (replica::database& db) { + auto& table = db.find_column_family(tablet.table); + return table.cleanup_tablet(tablet.tablet); + }); + }); +} + void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); @@ -6276,6 +6330,9 @@ void storage_service::init_messaging_service(sharded& pr return ss.stream_tablet(tablet); }); }); + ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) { + return cleanup_tablet(tablet); + }); } future<> storage_service::uninit_messaging_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index 4df3e909d2..087a84a2e7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -159,6 +159,7 @@ private: sstring op_name, std::function(locator::tablet_metadata_guard&)> op); future<> stream_tablet(locator::global_tablet_id); + future<> cleanup_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); public: storage_service(abort_source& as, distributed& db, diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index d12f86e5de..c6822e8df6 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -265,6 +265,8 @@ private: return false; case tablet_transition_stage::cleanup: return false; + case tablet_transition_stage::end_migration: + return false; } on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); }