From d5539e080da8af104a0d898c05c21d5320edb4d1 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 28 Aug 2023 01:30:08 +0200 Subject: [PATCH] tablets: Implement cleanup step This change adds a stub for tablet cleanup on the replica side and wires it into the tablet migration process. The handling on the replica side is incomplete because it doesn't remove the actual data yet. It only flushes the memtables, so that all data is in sstables and none requires a memtable flush. This patch is necessary to make decommission work. Otherwise, a memtable flush would happen when the decommissioned node is put in the drained state (as in nodetool drain) and it would fail on missing host id mapping (node is no longer in topology), which is examined by the tablet sharder when producing sstable sharding metadata, leading to an abort due to the failed memtable flush. --- idl/storage_service.idl.hh | 1 + locator/tablets.cc | 5 +++ locator/tablets.hh | 1 + message/messaging_service.cc | 1 + message/messaging_service.hh | 3 +- replica/database.hh | 2 + replica/table.cc | 5 +++ service/storage_service.cc | 77 +++++++++++++++++++++++++++++++----- service/storage_service.hh | 1 + service/tablet_allocator.cc | 2 + 10 files changed, 87 insertions(+), 11 deletions(-) diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 99e3a326d3..121aa6282c 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -52,4 +52,5 @@ struct raft_topology_pull_params {}; verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result; verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot; verb [[cancellable]] tablet_stream_data (locator::global_tablet_id); +verb [[cancellable]] tablet_cleanup (locator::global_tablet_id); } diff --git a/locator/tablets.cc b/locator/tablets.cc index 5103a214d7..d016639570 100644 --- a/locator/tablets.cc +++
b/locator/tablets.cc @@ -45,6 +45,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage return write_replica_set_selector::next; case tablet_transition_stage::cleanup: return write_replica_set_selector::next; + case tablet_transition_stage::end_migration: + return write_replica_set_selector::next; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -64,6 +66,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) return read_replica_set_selector::next; case tablet_transition_stage::cleanup: return read_replica_set_selector::next; + case tablet_transition_stage::end_migration: + return read_replica_set_selector::next; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -232,6 +236,7 @@ static const std::unordered_map tablet_transit {tablet_transition_stage::streaming, "streaming"}, {tablet_transition_stage::use_new, "use_new"}, {tablet_transition_stage::cleanup, "cleanup"}, + {tablet_transition_stage::end_migration, "end_migration"}, }; static const std::unordered_map tablet_transition_stage_from_name = std::invoke([] { diff --git a/locator/tablets.hh b/locator/tablets.hh index cea0aa71c1..29abb4edb5 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -141,6 +141,7 @@ enum class tablet_transition_stage { write_both_read_new, use_new, cleanup, + end_migration, }; sstring tablet_transition_stage_to_string(tablet_transition_stage); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 12d01843e9..919b0657ff 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -597,6 +597,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::NODE_OPS_CMD: case messaging_verb::HINT_MUTATION: case messaging_verb::TABLET_STREAM_DATA: + case messaging_verb::TABLET_CLEANUP: return 1; case messaging_verb::CLIENT_ID: case 
messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 75a13b0e5d..a5b88f13a6 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -183,7 +183,8 @@ enum class messaging_verb : int32_t { RAFT_TOPOLOGY_CMD = 64, RAFT_PULL_TOPOLOGY_SNAPSHOT = 65, TABLET_STREAM_DATA = 66, - LAST = 67, + TABLET_CLEANUP = 67, + LAST = 68, }; } // namespace netw diff --git a/replica/database.hh b/replica/database.hh index 54e10cbd3f..e54e76f588 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -62,6 +62,7 @@ #include "sstables/generation_type.hh" #include "db/rate_limiter.hh" #include "db/operation_type.hh" +#include "locator/tablets.hh" #include "utils/serialized_action.hh" #include "compaction/compaction_fwd.hh" @@ -792,6 +793,7 @@ public: const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } void update_effective_replication_map(locator::effective_replication_map_ptr); [[gnu::always_inline]] bool uses_tablets() const; + future<> cleanup_tablet(locator::tablet_id); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; shard_id shard_of(const mutation& m) const { diff --git a/replica/table.cc b/replica/table.cc index b9c0e01011..685599d3cf 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2964,4 +2964,9 @@ bool table::requires_cleanup(const sstables::sstable_set& set) const { })); } +future<> table::cleanup_tablet(locator::tablet_id) { + co_await flush(); + // FIXME: Remove sstables +} + } // namespace replica diff --git a/service/storage_service.cc b/service/storage_service.cc index db0e932c1d..281a9eda0f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1399,6 +1399,7 @@ class topology_coordinator { // Next migration of the same tablet is guaranteed to 
use a different instance. struct tablet_migration_state { background_action_holder streaming; + background_action_holder cleanup; std::unordered_map barriers; }; @@ -1508,11 +1509,15 @@ class topology_coordinator { .build()); }; - auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) { - if (advance_in_background(gid, tablet_state.barriers[stage], "barrier", [&] { + auto do_barrier = [&] { + return advance_in_background(gid, tablet_state.barriers[trinfo.stage], "barrier", [&] { needs_barrier = true; return barrier.get_shared_future(); - })) { + }); + }; + + auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) { + if (do_barrier()) { transition_to(stage); } }; @@ -1544,13 +1549,26 @@ class topology_coordinator { transition_to_with_barrier(locator::tablet_transition_stage::cleanup); break; case locator::tablet_transition_stage::cleanup: - // FIXME: Actually perform the cleanup. Block on integration with compaction groups. - _tablets.erase(gid); - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) - .del_transition(last_token) - .set_replicas(last_token, trinfo.next) - .build()); + if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] { + locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo); + slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst); + return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging, + netw::msg_addr(id2ip(dst.host)), _as, gid); + })) { + transition_to(locator::tablet_transition_stage::end_migration); + } + break; + case locator::tablet_transition_stage::end_migration: + // Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs. + // See do_tablet_operation() doc. 
+ if (do_barrier()) { + _tablets.erase(gid); + updates.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) + .del_transition(last_token) + .set_replicas(last_token, trinfo.next) + .build()); + } break; } }); @@ -6208,6 +6226,42 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { }); } +future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { + return do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) { + shard_id shard; + + { + auto tm = guard.get_token_metadata(); + auto& tmap = guard.get_tablet_map(); + auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet); + + // Check if the request is still valid. + // If there is mismatch, it means this cleanup was canceled and the coordinator moved on. + if (!trinfo) { + throw std::runtime_error(format("No transition info for tablet {}", tablet)); + } + if (trinfo->stage != locator::tablet_transition_stage::cleanup) { + throw std::runtime_error(format("Tablet {} stage is not at cleanup", tablet)); + } + + auto& tinfo = tmap.get_tablet_info(tablet.tablet); + locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo); + if (leaving_replica.host != tm->get_my_id()) { + throw std::runtime_error(format("Tablet {} has leaving replica different than this one", tablet)); + } + auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id()); + if (!shard_opt) { + on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet)); + } + shard = *shard_opt; + } + return _db.invoke_on(shard, [tablet] (replica::database& db) { + auto& table = db.find_column_family(tablet.table); + return table.cleanup_tablet(tablet.tablet); + }); + }); +} + void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = 
cinfo.retrieve_auxiliary("baddr"); @@ -6276,6 +6330,9 @@ void storage_service::init_messaging_service(sharded& pr return ss.stream_tablet(tablet); }); }); + ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) { + return cleanup_tablet(tablet); + }); } future<> storage_service::uninit_messaging_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index 4df3e909d2..087a84a2e7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -159,6 +159,7 @@ private: sstring op_name, std::function(locator::tablet_metadata_guard&)> op); future<> stream_tablet(locator::global_tablet_id); + future<> cleanup_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); public: storage_service(abort_source& as, distributed& db, diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index d12f86e5de..c6822e8df6 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -265,6 +265,8 @@ private: return false; case tablet_transition_stage::cleanup: return false; + case tablet_transition_stage::end_migration: + return false; } on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); }