replica: Use global_table_ptr in distributed loader

The loader has very similar global_column_family_ptr class for its
distributed loadings. Now it can use the "standard" one.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-05-16 19:57:10 +03:00
parent d7f99d031d
commit c3fca9481c
3 changed files with 14 additions and 26 deletions

View File

@@ -1056,6 +1056,11 @@ void global_table_ptr::assign(table& t) {
table* global_table_ptr::operator->() const noexcept { return &*_p[this_shard_id()]; }
table& global_table_ptr::operator*() const noexcept { return *_p[this_shard_id()]; }
future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
return get_table_on_all_shards(sharded_db, std::move(uuid));
}
future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db, table_id uuid) {
global_table_ptr table_shards;
co_await sharded_db.invoke_on_all([&] (auto& db) {

View File

@@ -14,6 +14,7 @@
#include <seastar/util/closeable.hh>
#include "distributed_loader.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/system_keyspace.hh"
@@ -83,26 +84,6 @@ io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy)
return error_handler_for_upload_dir();
}
// global_column_family_ptr provides a way to easily retrieve local instance of a given column family.
class global_column_family_ptr {
distributed<replica::database>& _db;
table_id _id;
private:
replica::column_family& get() const { return _db.local().find_column_family(_id); }
public:
global_column_family_ptr(distributed<replica::database>& db, sstring ks_name, sstring cf_name)
: _db(db)
, _id(_db.local().find_column_family(ks_name, cf_name).schema()->id()) {
}
replica::column_family* operator->() const {
return &get();
}
replica::column_family& operator*() const {
return get();
}
};
future<>
distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags) {
co_await dir.invoke_on(0, [] (const sstables::sstable_directory& d) {
@@ -449,7 +430,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
attr.sched_group = db.local().get_streaming_scheduling_group();
return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] {
global_column_family_ptr global_table(db, ks, cf);
auto global_table = get_table_on_all_shards(db, ks, cf).get0();
sharded<sstables::sstable_directory> directory;
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
@@ -516,7 +497,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg) {
return seastar::async([&db, ks = std::move(ks), cf = std::move(cf), cfg] {
global_column_family_ptr global_table(db, ks, cf);
auto global_table = get_table_on_all_shards(db, ks, cf).get0();
sharded<sstables::sstable_directory> directory;
auto table_id = global_table->schema()->id();
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
@@ -595,18 +576,18 @@ class table_populator {
distributed<replica::database>& _db;
sstring _ks;
sstring _cf;
global_column_family_ptr _global_table;
global_table_ptr _global_table;
fs::path _base_path;
std::unordered_map<sstring, lw_shared_ptr<sharded<sstables::sstable_directory>>> _sstable_directories;
sstables::sstable_version_types _highest_version = sstables::oldest_writable_sstable_format;
std::optional<sstables::generation_type> _highest_generation;
public:
table_populator(distributed<replica::database>& db, sstring ks, sstring cf)
table_populator(global_table_ptr ptr, distributed<replica::database>& db, sstring ks, sstring cf)
: _db(db)
, _ks(std::move(ks))
, _cf(std::move(cf))
, _global_table(_db, _ks, _cf)
, _global_table(std::move(ptr))
, _base_path(_global_table->dir())
{}
@@ -788,7 +769,8 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf->get_storage_options().type_string());
auto metadata = table_populator(db, ks_name, cfname);
auto gtable = co_await get_table_on_all_shards(db, ks_name, cfname);
auto metadata = table_populator(std::move(gtable), db, ks_name, cfname);
std::exception_ptr ex;
try {

View File

@@ -28,5 +28,6 @@ public:
};
future<global_table_ptr> get_table_on_all_shards(sharded<database>& db, table_id uuid);
future<global_table_ptr> get_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name);
} // replica namespace