From a903971a741b9e95c384eab580b2d720619952ee Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 12 Jul 2024 13:25:15 +0200 Subject: [PATCH] node_ops: service: create streaming tasks Create tasks which cover streaming part of topology changes. These tasks are children of respective node_ops_virtual_task. --- node_ops/task_manager_module.cc | 39 +++++++++++++++++++++++++++ node_ops/task_manager_module.hh | 18 +++++++++++++ service/storage_service.cc | 47 +++++++++++++++------------------ 3 files changed, 78 insertions(+), 26 deletions(-) diff --git a/node_ops/task_manager_module.cc b/node_ops/task_manager_module.cc index f216c8fcd2..22ee4efc5f 100644 --- a/node_ops/task_manager_module.cc +++ b/node_ops/task_manager_module.cc @@ -9,6 +9,7 @@ #include "db/system_keyspace.hh" #include "node_ops/task_manager_module.hh" #include "service/storage_service.hh" +#include "service/topology_coordinator.hh" #include "service/topology_state_machine.hh" #include "tasks/task_handler.hh" @@ -132,6 +133,44 @@ future> node_ops_virtual_task::get_stats() { })); } +streaming_task_impl::streaming_task_impl(tasks::task_manager::module_ptr module, + tasks::task_id parent_id, + streaming::stream_reason reason, + std::optional>& result, + std::function()> action) noexcept + : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", "", "", "", parent_id) + , _reason(reason) + , _result(result) + , _action(std::move(action)) +{} + +std::string streaming_task_impl::type() const { + return fmt::format("{}: streaming", _reason); +} + +tasks::is_internal streaming_task_impl::is_internal() const noexcept { + return tasks::is_internal::no; +} + +future<> streaming_task_impl::run() { + // If no operation was previously started - start it now + // If previous operation still running - wait for it an return its result + // If previous operation completed successfully - return immediately + // If previous operation failed - restart it + if (!_result || _result->failed()) { + if (_result) { + service::rtlogger.info("retry streaming after previous attempt failed with {}", _result->get_future().get_exception()); + } else { + service::rtlogger.info("start streaming"); + } + _result = _action(); + } else { + service::rtlogger.debug("already streaming"); + } + co_await _result.value().get_future(); + service::rtlogger.info("streaming completed"); +} + task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept : tasks::task_manager::module(tm, "node_ops") , _ss(ss) diff --git a/node_ops/task_manager_module.hh b/node_ops/task_manager_module.hh index a64b4ccb32..f994e22aeb 100644 --- a/node_ops/task_manager_module.hh +++ b/node_ops/task_manager_module.hh @@ -39,6 +39,24 @@ private: future> get_status_helper(tasks::task_id id) const; }; +class streaming_task_impl : public tasks::task_manager::task::impl { +private: + streaming::stream_reason _reason; + std::optional>& _result; + std::function()> _action; +public: + streaming_task_impl(tasks::task_manager::module_ptr module, + tasks::task_id parent_id, + streaming::stream_reason reason, + std::optional>& result, + std::function()> action) noexcept; + + virtual std::string type() const override; + virtual tasks::is_internal is_internal() const noexcept override; +protected: + virtual future<> run() override; +}; + class task_manager_module : public tasks::task_manager::module { private: service::storage_service& _ss; diff --git a/service/storage_service.cc b/service/storage_service.cc index 4baaf74e68..bd8bdc504a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5348,26 +5348,6 @@ future storage_service::raft_topology_cmd_handler(raft raft_topology_cmd_result result; rtlogger.debug("topology cmd rpc {} is called", cmd.cmd); - // The retrier does: - // If no operation was previously started - start it now - // If previous operation still running - wait for it an return its result - // If previous operation completed successfully - return immediately - // If previous operation failed - restart it - auto retrier = [] (std::optional>& f, auto&& func) -> future<> { - if (!f || f->failed()) { - if (f) { - rtlogger.info("retry streaming after previous attempt failed with {}", f->get_future().get_exception()); - } else { - rtlogger.info("start streaming"); - } - f = func(); - } else { - rtlogger.debug("already streaming"); - } - co_await f.value().get_future(); - rtlogger.info("streaming completed"); - }; - try { auto& raft_server = _group0->group0_server(); // do barrier to make sure we always see the latest topology @@ -5509,9 +5489,11 @@ future storage_service::raft_topology_cmd_handler(raft cf->notify_bootstrap_or_replace_start(); } }); + tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; if (rs.state == node_state::bootstrapping) { if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state - co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> { + auto task = co_await get_task_manager_module().make_and_start_task(parent_info, + parent_info.id, streaming::stream_reason::bootstrap, _bootstrap_result, coroutine::lambda([this, &rs] () -> future<> { if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) { co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens); } else { @@ -5520,10 +5502,12 @@ future storage_service::raft_topology_cmd_handler(raft co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session); } })); + co_await task->done(); } // Bootstrap did not complete yet, but streaming did } else { - co_await retrier(_bootstrap_result, coroutine::lambda([&] () ->future<> { + auto task = co_await get_task_manager_module().make_and_start_task(parent_info, + parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server] () -> future<> { if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) { on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id())); } @@ -5547,6 +5531,7 @@ future storage_service::raft_topology_cmd_handler(raft co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, _topology_state_machine._topology.session, *existing_ip); } })); + co_await task->done(); } co_await _db.invoke_on_all([] (replica::database& db) { for (auto& cf : db.get_non_system_column_families()) { @@ -5556,9 +5541,13 @@ future storage_service::raft_topology_cmd_handler(raft result.status = raft_topology_cmd_result::command_status::success; } break; - case node_state::decommissioning: - co_await retrier(_decommission_result, coroutine::lambda([&] () { return unbootstrap(); })); + case node_state::decommissioning: { + tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; + auto task = co_await get_task_manager_module().make_and_start_task(parent_info, + parent_info.id, streaming::stream_reason::decommission, _decommission_result, coroutine::lambda([this] () { return unbootstrap(); })); + co_await task->done(); result.status = raft_topology_cmd_result::command_status::success; + } break; case node_state::normal: { // If asked to stream a node in normal state it means that remove operation is running @@ -5573,7 +5562,9 @@ future storage_service::raft_topology_cmd_handler(raft const auto& am = _group0->address_map(); auto ip = am.find(id); // map node id to ip assert (ip); // what to do if address is unknown? - co_await retrier(_remove_result[id], coroutine::lambda([&] () { + tasks::task_info parent_info{tasks::task_id{it->second.request_id}, 0}; + auto task = co_await get_task_manager_module().make_and_start_task(parent_info, + parent_info.id, streaming::stream_reason::removenode, _remove_result[id], coroutine::lambda([this, ip] () { auto as = make_shared(); auto sub = _abort_source.subscribe([as] () noexcept { if (!as->abort_requested()) { @@ -5596,13 +5587,16 @@ future storage_service::raft_topology_cmd_handler(raft return removenode_with_stream(*ip, _topology_state_machine._topology.session, as); } })); + co_await task->done(); result.status = raft_topology_cmd_result::command_status::success; } break; case node_state::rebuilding: { auto source_dc = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc; rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); - co_await retrier(_rebuild_result, [&] () -> future<> { + tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; + auto task = co_await get_task_manager_module().make_and_start_task(parent_info, + parent_info.id, streaming::stream_reason::rebuild, _rebuild_result, [this, &source_dc] () -> future<> { auto tmptr = get_token_metadata_ptr(); if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); @@ -5628,6 +5622,7 @@ future storage_service::raft_topology_cmd_handler(raft } } }); + co_await task->done(); _rebuild_result.reset(); result.status = raft_topology_cmd_result::command_status::success; }