node_ops: service: create streaming tasks

Create tasks which cover streaming part of topology changes. These
tasks are children of respective node_ops_virtual_task.
This commit is contained in:
Aleksandra Martyniuk
2024-07-12 13:25:15 +02:00
parent 63e82764e1
commit a903971a74
3 changed files with 78 additions and 26 deletions

View File

@@ -9,6 +9,7 @@
#include "db/system_keyspace.hh"
#include "node_ops/task_manager_module.hh"
#include "service/storage_service.hh"
#include "service/topology_coordinator.hh"
#include "service/topology_state_machine.hh"
#include "tasks/task_handler.hh"
@@ -132,6 +133,44 @@ future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
}));
}
streaming_task_impl::streaming_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id parent_id,
streaming::stream_reason reason,
std::optional<shared_future<>>& result,
std::function<future<>()> action) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", "", "", "", parent_id)
, _reason(reason)
, _result(result)
, _action(std::move(action))
{}
std::string streaming_task_impl::type() const {
return fmt::format("{}: streaming", _reason);
}
tasks::is_internal streaming_task_impl::is_internal() const noexcept {
return tasks::is_internal::no;
}
future<> streaming_task_impl::run() {
// If no operation was previously started - start it now
// If previous operation still running - wait for it an return its result
// If previous operation completed successfully - return immediately
// If previous operation failed - restart it
if (!_result || _result->failed()) {
if (_result) {
service::rtlogger.info("retry streaming after previous attempt failed with {}", _result->get_future().get_exception());
} else {
service::rtlogger.info("start streaming");
}
_result = _action();
} else {
service::rtlogger.debug("already streaming");
}
co_await _result.value().get_future();
service::rtlogger.info("streaming completed");
}
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
: tasks::task_manager::module(tm, "node_ops")
, _ss(ss)

View File

@@ -39,6 +39,24 @@ private:
future<std::optional<tasks::task_status>> get_status_helper(tasks::task_id id) const;
};
class streaming_task_impl : public tasks::task_manager::task::impl {
private:
streaming::stream_reason _reason;
std::optional<shared_future<>>& _result;
std::function<future<>()> _action;
public:
streaming_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id parent_id,
streaming::stream_reason reason,
std::optional<shared_future<>>& result,
std::function<future<>()> action) noexcept;
virtual std::string type() const override;
virtual tasks::is_internal is_internal() const noexcept override;
protected:
virtual future<> run() override;
};
class task_manager_module : public tasks::task_manager::module {
private:
service::storage_service& _ss;