From 68ecec0197d91980ed879d31ef592a636c3f1a60 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 23 Sep 2021 11:54:00 +0300 Subject: [PATCH] sstables_loader: Accept the sstables loading code The code was moved in the relevant .cc file by previous patch, now make it sit in the relevant class. One "significant" change is that the messaging service is available by local reference already, not by the sharded one. Other dependencies are already satisfied by the patch that introduced the sstables_loader class. Signed-off-by: Pavel Emelyanov --- api/storage_service.cc | 9 +++++---- service/storage_service.cc | 1 + service/storage_service.hh | 25 ------------------------- sstables_loader.cc | 18 ++++++++---------- sstables_loader.hh | 33 +++++++++++++++++++++++++++++---- 5 files changed, 43 insertions(+), 43 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 75e7d7b723..9edfe45e37 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -55,6 +55,7 @@ #include "cdc/generation_service.hh" #include "service/storage_proxy.hh" #include "locator/abstract_replication_strategy.hh" +#include "sstables_loader.hh" extern logging::logger apilog; @@ -297,7 +298,7 @@ void unset_repair(http_context& ctx, routes& r) { } void set_sstables_loader(http_context& ctx, routes& r, sharded& sst_loader, sharded& ss) { - ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr req) { + ss::load_new_ss_tables.set(r, [&ctx, &sst_loader](std::unique_ptr req) { auto ks = validate_keyspace(ctx, req->param); auto cf = req->get_query_param("cf"); auto stream = req->get_query_param("load_and_stream"); @@ -309,10 +310,10 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded& // No need to add the keyspace, since all we want is to avoid always sending this to the same // CPU. Even then I am being overzealous here. This is not something that happens all the time. auto coordinator = std::hash()(cf) % smp::count; - return ss.invoke_on(coordinator, + return sst_loader.invoke_on(coordinator, [ks = std::move(ks), cf = std::move(cf), - load_and_stream, primary_replica_only] (service::storage_service& s) { - return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only); + load_and_stream, primary_replica_only] (sstables_loader& loader) { + return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only); }).then_wrapped([] (auto&& f) { if (f.failed()) { auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception()); diff --git a/service/storage_service.cc b/service/storage_service.cc index 8076f96050..9824c6ebdc 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -143,6 +143,7 @@ storage_service::storage_service(abort_source& abort_source, _listeners.emplace_back(make_lw_shared(snitch.local()->when_reconfigured(_snitch_reconfigure))); } (void) _raft_gr; + (void) _view_update_generator; // temporary } enum class node_external_status { diff --git a/service/storage_service.hh b/service/storage_service.hh index f172ab1376..1d43ba957a 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -172,13 +172,6 @@ private: sharded& _messaging; sharded& _migration_manager; sharded& _repair; - // Note that this is obviously only valid for the current shard. Users of - // this facility should elect a shard to be the coordinator based on any - // given objective criteria - // - // It shouldn't be impossible to actively serialize two callers if the need - // ever arise. - bool _loading_new_sstables = false; sstring _operation_in_progress; bool _ms_stopped = false; bool _stream_manager_stopped = false; @@ -819,24 +812,6 @@ private: public: int32_t get_exception_count(); - /** - * Load new SSTables not currently tracked by the system - * - * This can be called, for instance, after copying a batch of SSTables to a CF directory. - * - * This should not be called in parallel for the same keyspace / column family, and doing - * so will throw an std::runtime_exception. - * - * @param ks_name the keyspace in which to search for new SSTables. - * @param cf_name the column family in which to search for new SSTables. - * @return a future<> when the operation finishes. - */ - future<> load_new_sstables(sstring ks_name, sstring cf_name, - bool load_and_stream, bool primary_replica_only); - future<> load_and_stream(sstring ks_name, sstring cf_name, - utils::UUID table_id, std::vector sstables, - bool primary_replica_only); - future<> set_tables_autocompaction(const sstring &keyspace, std::vector tables, bool enabled); template diff --git a/sstables_loader.cc b/sstables_loader.cc index cdc3ab21be..7558153509 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -30,11 +30,9 @@ #include "locator/abstract_replication_strategy.hh" #include "message/messaging_service.hh" -#include "service/storage_service.hh" // temporary - static logging::logger llog("sstables_loader"); -namespace service { +namespace { class send_meta_data { gms::inet_address _node; @@ -122,7 +120,9 @@ public: } }; -future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, +} // anonymous namespace + +future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, utils::UUID table_id, std::vector sstables, bool primary_replica_only) { const auto full_partition_range = dht::partition_range::make_open_ended_both_sides(); const auto full_token_range = dht::token_range::make_open_ended_both_sides(); @@ -165,7 +165,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, std::exception_ptr eptr; bool failed = false; try { - netw::messaging_service& ms = _messaging.local(); + netw::messaging_service& ms = _messaging; while (auto mf = co_await reader()) { bool is_partition_start = mf->is_partition_start(); if (is_partition_start) { @@ -247,7 +247,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, // For more details, see the commends on column_family::load_new_sstables // All the global operations are going to happen here, and just the reloading happens // in there. -future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name, +future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, bool load_and_stream, bool primary_replica_only) { if (_loading_new_sstables) { throw std::runtime_error("Already loading SSTables. Try again later"); @@ -261,8 +261,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name, utils::UUID table_id; std::vector> sstables_on_shards; std::tie(table_id, sstables_on_shards) = co_await distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name); - co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (storage_service& ss) mutable -> future<> { - co_await ss.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); + co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (sstables_loader& loader) mutable -> future<> { + co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only); }); } else { co_await distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name); @@ -278,5 +278,3 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name, _loading_new_sstables = false; co_return; } - -} // namespace service diff --git a/sstables_loader.hh b/sstables_loader.hh index 6bbc3492ad..c6a4901ec2 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -22,6 +22,8 @@ #pragma once #include +#include "utils/UUID.hh" +#include "sstables/shared_sstable.hh" using namespace seastar; @@ -44,6 +46,18 @@ class sstables_loader : public seastar::peering_sharded_service sharded& _view_update_generator; netw::messaging_service& _messaging; + // Note that this is obviously only valid for the current shard. Users of + // this facility should elect a shard to be the coordinator based on any + // given objective criteria + // + // It shouldn't be impossible to actively serialize two callers if the need + // ever arise. + bool _loading_new_sstables = false; + + future<> load_and_stream(sstring ks_name, sstring cf_name, + utils::UUID table_id, std::vector sstables, + bool primary_replica_only); + public: sstables_loader(sharded& db, sharded& sys_dist_ks, @@ -54,9 +68,20 @@ public: , _view_update_generator(view_update_generator) , _messaging(messaging) { - (void)_db; - (void)_sys_dist_ks; - (void)_view_update_generator; - (void)_messaging; } + + /** + * Load new SSTables not currently tracked by the system + * + * This can be called, for instance, after copying a batch of SSTables to a CF directory. + * + * This should not be called in parallel for the same keyspace / column family, and doing + * so will throw an std::runtime_exception. + * + * @param ks_name the keyspace in which to search for new SSTables. + * @param cf_name the column family in which to search for new SSTables. + * @return a future<> when the operation finishes. + */ + future<> load_new_sstables(sstring ks_name, sstring cf_name, + bool load_and_stream, bool primary_replica_only); };