replica: Make global_table_ptr a class
Right now all users of global_table know it's a vector and reference its elements with this_shard_id() index. Making the global_table_ptr a class makes it possible to stop using operator[] and "index" this_shard_id() in its -> and * operators. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -1039,12 +1039,28 @@ future<> database::detach_column_family(table& cf) {
|
||||
}
|
||||
}
|
||||
|
||||
global_table_ptr::global_table_ptr() {
|
||||
_p.resize(smp::count);
|
||||
}
|
||||
|
||||
global_table_ptr::global_table_ptr(global_table_ptr&& o) noexcept
|
||||
: _p(std::move(o._p))
|
||||
{ }
|
||||
|
||||
global_table_ptr::~global_table_ptr() {}
|
||||
|
||||
void global_table_ptr::assign(table& t) {
|
||||
_p[this_shard_id()] = make_foreign(t.shared_from_this());
|
||||
}
|
||||
|
||||
table* global_table_ptr::operator->() const noexcept { return &*_p[this_shard_id()]; }
|
||||
table& global_table_ptr::operator*() const noexcept { return *_p[this_shard_id()]; }
|
||||
|
||||
future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db, table_id uuid) {
|
||||
global_table_ptr table_shards;
|
||||
table_shards.resize(smp::count);
|
||||
co_await sharded_db.invoke_on_all([&] (auto& db) {
|
||||
try {
|
||||
table_shards[this_shard_id()] = make_foreign(db.find_column_family(uuid).shared_from_this());
|
||||
table_shards.assign(db.find_column_family(uuid));
|
||||
} catch (no_such_column_family&) {
|
||||
on_internal_error(dblog, fmt::format("Table UUID={} not found", uuid));
|
||||
}
|
||||
@@ -1058,13 +1074,13 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
|
||||
|
||||
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
|
||||
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
|
||||
auto table_dir = fs::path(table_shards[this_shard_id()]->dir());
|
||||
auto table_dir = fs::path(table_shards->dir());
|
||||
std::optional<sstring> snapshot_name_opt;
|
||||
if (with_snapshot) {
|
||||
snapshot_name_opt = format("pre-drop-{}", db_clock::now().time_since_epoch().count());
|
||||
}
|
||||
co_await sharded_db.invoke_on_all([&] (database& db) {
|
||||
return db.detach_column_family(*table_shards[this_shard_id()]);
|
||||
return db.detach_column_family(*table_shards);
|
||||
});
|
||||
// Use a time point in the far future (9999-12-31T00:00:00+0000)
|
||||
// to ensure all sstables are truncated,
|
||||
@@ -1072,7 +1088,7 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
|
||||
constexpr db_clock::time_point truncated_at(std::chrono::seconds(253402214400));
|
||||
auto f = co_await coroutine::as_future(truncate_table_on_all_shards(sharded_db, table_shards, truncated_at, with_snapshot, std::move(snapshot_name_opt)));
|
||||
co_await smp::invoke_on_all([&] {
|
||||
return table_shards[this_shard_id()]->stop();
|
||||
return table_shards->stop();
|
||||
});
|
||||
f.get(); // re-throw exception from truncate() if any
|
||||
co_await sstables::remove_table_directory_if_has_no_snapshots(table_dir);
|
||||
@@ -2321,7 +2337,7 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, s
|
||||
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
|
||||
if (snap_views) {
|
||||
for (const auto& vp : table_shards[this_shard_id()]->views()) {
|
||||
for (const auto& vp : table_shards->views()) {
|
||||
co_await snapshot_table_on_all_shards(sharded_db, ks_name, vp->cf_name(), tag, db::snapshot_ctl::snap_views::no, skip_flush);
|
||||
}
|
||||
}
|
||||
@@ -2360,7 +2376,7 @@ struct database::table_truncate_state {
|
||||
};
|
||||
|
||||
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
|
||||
auto& cf = *table_shards[this_shard_id()];
|
||||
auto& cf = *table_shards;
|
||||
auto s = cf.schema();
|
||||
|
||||
// Schema tables changed commitlog domain at some point and this node will refuse to boot with
|
||||
@@ -2378,7 +2394,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
|
||||
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
|
||||
table_states[shard] = co_await smp::submit_to(shard, [&] () -> future<foreign_ptr<std::unique_ptr<table_truncate_state>>> {
|
||||
auto& cf = *table_shards[this_shard_id()];
|
||||
auto& cf = *table_shards;
|
||||
auto st = std::make_unique<table_truncate_state>();
|
||||
|
||||
st->holder = cf.async_gate().hold();
|
||||
@@ -2423,7 +2439,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
};
|
||||
co_await sharded_db.invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
unsigned shard = this_shard_id();
|
||||
auto& cf = *table_shards[shard];
|
||||
auto& cf = *table_shards;
|
||||
auto& st = *table_states[shard];
|
||||
|
||||
co_await flush_or_clear(cf);
|
||||
@@ -2444,7 +2460,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
|
||||
co_await sharded_db.invoke_on_all([&] (database& db) {
|
||||
auto shard = this_shard_id();
|
||||
auto& cf = *table_shards[shard];
|
||||
auto& cf = *table_shards;
|
||||
auto& st = *table_states[shard];
|
||||
|
||||
return db.truncate(cf, st, truncated_at);
|
||||
|
||||
@@ -66,7 +66,6 @@
|
||||
#include "compaction/compaction_fwd.hh"
|
||||
#include "utils/disk-error-handler.hh"
|
||||
#include "rust/wasmtime_bindings.hh"
|
||||
#include "replica/global_table_ptr.hh" // temporary -- replace with fwd decl
|
||||
|
||||
class cell_locker;
|
||||
class cell_locker_stats;
|
||||
@@ -140,6 +139,7 @@ extern logging::logger dblog;
|
||||
namespace replica {
|
||||
|
||||
using shared_memtable = lw_shared_ptr<memtable>;
|
||||
class global_table_ptr;
|
||||
|
||||
// We could just add all memtables, regardless of types, to a single list, and
|
||||
// then filter them out when we read them. Here's why I have chosen not to do
|
||||
|
||||
@@ -16,7 +16,17 @@ namespace replica {
|
||||
class database;
|
||||
class table;
|
||||
|
||||
using global_table_ptr = std::vector<foreign_ptr<lw_shared_ptr<table>>>;
|
||||
class global_table_ptr {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<table>>> _p;
|
||||
public:
|
||||
global_table_ptr();
|
||||
global_table_ptr(global_table_ptr&&) noexcept;
|
||||
~global_table_ptr();
|
||||
void assign(table& t);
|
||||
table* operator->() const noexcept;
|
||||
table& operator*() const noexcept;
|
||||
};
|
||||
|
||||
future<global_table_ptr> get_table_on_all_shards(sharded<database>& db, table_id uuid);
|
||||
|
||||
} // replica namespace
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
#include "utils/lister.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "replica/global_table_ptr.hh"
|
||||
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <boost/range/algorithm.hpp>
|
||||
@@ -1682,11 +1683,11 @@ future<> table::write_schema_as_cql(database& db, sstring dir) const {
|
||||
|
||||
// Runs the orchestration code on an arbitrary shard to balance the load.
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
|
||||
auto jsondir = table_shards[this_shard_id()]->_config.datadir + "/snapshots/" + name;
|
||||
auto jsondir = table_shards->_config.datadir + "/snapshots/" + name;
|
||||
auto orchestrator = std::hash<sstring>()(jsondir) % smp::count;
|
||||
|
||||
co_await smp::submit_to(orchestrator, [&] () -> future<> {
|
||||
auto& t = *table_shards[this_shard_id()];
|
||||
auto& t = *table_shards;
|
||||
auto s = t.schema();
|
||||
tlogger.debug("Taking snapshot of {}.{}: directory={}", s->ks_name(), s->cf_name(), jsondir);
|
||||
|
||||
@@ -1695,7 +1696,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
|
||||
|
||||
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
|
||||
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
|
||||
return table_shards[this_shard_id()]->take_snapshot(sharded_db.local(), jsondir);
|
||||
return table_shards->take_snapshot(sharded_db.local(), jsondir);
|
||||
}));
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user