sstables_loader: Accept the sstables loading code

The code was moved in the relevant .cc file by previous patch, now
make it sit in the relevant class. One "significant" change is that
the messaging service is available by local reference already, not
by the sharded one. Other dependencies are already satisfied by the
patch that introduced the sstables_loader class.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-09-23 11:54:00 +03:00
parent 42f83f6669
commit 68ecec0197
5 changed files with 43 additions and 43 deletions

View File

@@ -55,6 +55,7 @@
#include "cdc/generation_service.hh"
#include "service/storage_proxy.hh"
#include "locator/abstract_replication_strategy.hh"
#include "sstables_loader.hh"
extern logging::logger apilog;
@@ -297,7 +298,7 @@ void unset_repair(http_context& ctx, routes& r) {
}
void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>& sst_loader, sharded<service::storage_service>& ss) {
ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
ss::load_new_ss_tables.set(r, [&ctx, &sst_loader](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
@@ -309,10 +310,10 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
return ss.invoke_on(coordinator,
return sst_loader.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
load_and_stream, primary_replica_only] (sstables_loader& loader) {
return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
}).then_wrapped([] (auto&& f) {
if (f.failed()) {
auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception());

View File

@@ -143,6 +143,7 @@ storage_service::storage_service(abort_source& abort_source,
_listeners.emplace_back(make_lw_shared(snitch.local()->when_reconfigured(_snitch_reconfigure)));
}
(void) _raft_gr;
(void) _view_update_generator; // temporary
}
enum class node_external_status {

View File

@@ -172,13 +172,6 @@ private:
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
sharded<repair_service>& _repair;
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
// given objective criteria
//
// It shouldn't be impossible to actively serialize two callers if the need
// ever arise.
bool _loading_new_sstables = false;
sstring _operation_in_progress;
bool _ms_stopped = false;
bool _stream_manager_stopped = false;
@@ -819,24 +812,6 @@ private:
public:
int32_t get_exception_count();
/**
* Load new SSTables not currently tracked by the system
*
* This can be called, for instance, after copying a batch of SSTables to a CF directory.
*
* This should not be called in parallel for the same keyspace / column family, and doing
* so will throw an std::runtime_exception.
*
* @param ks_name the keyspace in which to search for new SSTables.
* @param cf_name the column family in which to search for new SSTables.
* @return a future<> when the operation finishes.
*/
future<> load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary_replica_only);
future<> load_and_stream(sstring ks_name, sstring cf_name,
utils::UUID table_id, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only);
future<> set_tables_autocompaction(const sstring &keyspace, std::vector<sstring> tables, bool enabled);
template <typename Func>

View File

@@ -30,11 +30,9 @@
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh" // temporary
static logging::logger llog("sstables_loader");
namespace service {
namespace {
class send_meta_data {
gms::inet_address _node;
@@ -122,7 +120,9 @@ public:
}
};
future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
} // anonymous namespace
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
utils::UUID table_id, std::vector<sstables::shared_sstable> sstables, bool primary_replica_only) {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
const auto full_token_range = dht::token_range::make_open_ended_both_sides();
@@ -165,7 +165,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
std::exception_ptr eptr;
bool failed = false;
try {
netw::messaging_service& ms = _messaging.local();
netw::messaging_service& ms = _messaging;
while (auto mf = co_await reader()) {
bool is_partition_start = mf->is_partition_start();
if (is_partition_start) {
@@ -247,7 +247,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
// For more details, see the commends on column_family::load_new_sstables
// All the global operations are going to happen here, and just the reloading happens
// in there.
future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name,
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary_replica_only) {
if (_loading_new_sstables) {
throw std::runtime_error("Already loading SSTables. Try again later");
@@ -261,8 +261,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name,
utils::UUID table_id;
std::vector<std::vector<sstables::shared_sstable>> sstables_on_shards;
std::tie(table_id, sstables_on_shards) = co_await distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name);
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (storage_service& ss) mutable -> future<> {
co_await ss.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
});
} else {
co_await distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name);
@@ -278,5 +278,3 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name,
_loading_new_sstables = false;
co_return;
}
} // namespace service

View File

@@ -22,6 +22,8 @@
#pragma once
#include <seastar/core/sharded.hh>
#include "utils/UUID.hh"
#include "sstables/shared_sstable.hh"
using namespace seastar;
@@ -44,6 +46,18 @@ class sstables_loader : public seastar::peering_sharded_service<sstables_loader>
sharded<db::view::view_update_generator>& _view_update_generator;
netw::messaging_service& _messaging;
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
// given objective criteria
//
// It shouldn't be impossible to actively serialize two callers if the need
// ever arise.
bool _loading_new_sstables = false;
future<> load_and_stream(sstring ks_name, sstring cf_name,
utils::UUID table_id, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only);
public:
sstables_loader(sharded<database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
@@ -54,9 +68,20 @@ public:
, _view_update_generator(view_update_generator)
, _messaging(messaging)
{
(void)_db;
(void)_sys_dist_ks;
(void)_view_update_generator;
(void)_messaging;
}
/**
* Load new SSTables not currently tracked by the system
*
* This can be called, for instance, after copying a batch of SSTables to a CF directory.
*
* This should not be called in parallel for the same keyspace / column family, and doing
* so will throw an std::runtime_exception.
*
* @param ks_name the keyspace in which to search for new SSTables.
* @param cf_name the column family in which to search for new SSTables.
* @return a future<> when the operation finishes.
*/
future<> load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary_replica_only);
};