From f70c4127c6c8aa8b0c4fcf4bdfe290638036677d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 25 Oct 2023 18:34:07 +0300 Subject: [PATCH] storage_service: topology coordinator: introduce sstable cleanup fiber Introduce a fiber that waits on a topology event and when it sees that the node it runs on needs to perform sstable cleanup it initiates one for each non tablet, non local table and resets "cleanup" flag back to "clean" in the topology. --- service/storage_service.cc | 103 ++++++++++++++++++++++++++++++++++++- service/storage_service.hh | 3 ++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 18078465c6..6433ebe640 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -22,6 +22,7 @@ #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" #include "db/consistency_level.hh" +#include "seastar/core/when_all.hh" #include "service/tablet_allocator.hh" #include "locator/tablets.hh" #include "locator/tablet_metadata_guard.hh" @@ -1042,6 +1043,104 @@ topology_node_mutation_builder& topology_mutation_builder::with_node(raft::serve return *_node_builder; } +future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded& proxy) noexcept { + while (!_group0_as.abort_requested()) { + bool err = false; + try { + co_await _topology_state_machine.event.when([&] { + auto me = _topology_state_machine._topology.find(server.id()); + return me && me->second.cleanup == cleanup_status::running; + }); + + std::vector> tasks; + + auto do_cleanup_ks = [this, &proxy] (sstring ks_name, std::vector table_infos) -> future<> { + // Wait for all local writes to complete before cleanup + co_await proxy.invoke_on_all([] (storage_proxy& sp) -> future<> { + co_return co_await sp.await_pending_writes(); + }); + auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module(); + auto task = co_await compaction_module.make_and_start_task({}, ks_name, _db, table_infos); + try { + co_return co_await task->done(); + } catch (...) { + slogger.error("raft topology: cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception()); + throw; + } + }; + + { + // The scope for the guard + auto guard = co_await _group0->client().start_operation(&_group0_as); + auto me = _topology_state_machine._topology.find(server.id()); + // Recheck that cleanup is needed after the barrier + if (!me || me->second.cleanup != cleanup_status::running) { + slogger.trace("raft topology: cleanup triggered, but not needed"); + continue; + } + + slogger.info("raft topology: start cleanup"); + + auto keyspaces = _db.local().get_all_keyspaces(); + + tasks.reserve(keyspaces.size()); + + co_await coroutine::parallel_for_each(keyspaces.begin(), keyspaces.end(), [this, &tasks, &do_cleanup_ks] (const sstring& ks_name) -> future<> { + auto ks = _db.local().find_keyspace(ks_name); + if (ks.get_replication_strategy().is_per_table() || is_system_keyspace(ks_name)) { + // Skip tablets tables since they do their own cleanup and system tables + // since they are local and not affected by range movements. + co_return; + } + const auto& cf_meta_data = ks.metadata().get()->cf_meta_data(); + std::vector table_infos; + table_infos.reserve(cf_meta_data.size()); + for (const auto& [name, schema] : cf_meta_data) { + table_infos.emplace_back(table_info{name, schema->id()}); + } + + tasks.push_back(do_cleanup_ks(std::move(ks_name), std::move(table_infos))); + }); + } + + // Note that the guard is released while we are waiting for cleanup tasks to complete + co_await when_all_succeed(tasks.begin(), tasks.end()).discard_result(); + + slogger.info("raft topology: cleanup ended"); + + while (true) { + auto guard = co_await _group0->client().start_operation(&_group0_as); + topology_mutation_builder builder(guard.write_timestamp()); + builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean); + + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id())); + + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: cleanup flag clearing: concurrent operation is detected, retrying."); + continue; + } + break; + } + slogger.debug("raft topology: cleanup flag cleared"); + } catch (const seastar::abort_requested_exception &) { + slogger.info("raft topology: cleanup fiber aborted"); + break; + } catch (raft::request_aborted&) { + slogger.info("raft topology: cleanup fiber aborted"); + break; + } catch (...) { + slogger.error("raft topology: cleanup fiber got an error: {}", std::current_exception()); + err = true; + } + if (err) { + co_await sleep_abortable(std::chrono::seconds(1), _group0_as); + } + } +} + using raft_topology_cmd_handler_type = noncopyable_function( raft::term_t, uint64_t, const raft_topology_cmd&)>; @@ -3519,6 +3618,8 @@ future<> storage_service::join_token_ring(sharded storage_service::stop() { future<> storage_service::wait_for_group0_stop() { _group0_as.request_abort(); _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); - co_await std::move(_raft_state_monitor); + co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber)); } future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 1597830535..380db87978 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -805,6 +805,9 @@ private: shared_promise<> _join_node_response_done; semaphore _join_node_response_handler_mutex{1}; + future<> _sstable_cleanup_fiber = make_ready_future<>(); + future<> sstable_cleanup_fiber(raft::server& raft, sharded& proxy) noexcept; + // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that abort_source _group0_as;