replica: move ::database, ::keyspace, and ::table to replica namespace

Move replica-oriented classes to the replica namespace. The main
classes moved are ::database, ::keyspace, and ::table, but a few
ancillary classes are also moved. There are certainly classes that
should be moved but aren't (like distributed_loader) but we have
to start somewhere.

References are adjusted treewide. In many cases, it is obvious that
a call site should not access the replica (but the data_dictionary
instead), but that is left for separate work.

scylla-gdb.py is adjusted to look for both the new and old names.
This commit is contained in:
Avi Kivity
2022-01-03 19:01:09 +02:00
parent ae3a360725
commit bbad8f4677
160 changed files with 1100 additions and 982 deletions

View File

@@ -50,7 +50,7 @@ namespace db::view {
// the same as the previous one (as a result of trimming cpu_id),
// the duplicated fragment is ignored.
class build_progress_virtual_reader {
database& _db;
replica::database& _db;
struct build_progress_reader : flat_mutation_reader::impl {
column_id _scylla_next_token_col;
@@ -65,7 +65,7 @@ class build_progress_virtual_reader {
build_progress_reader(
schema_ptr legacy_schema,
reader_permit permit,
column_family& scylla_views_build_progress,
replica::column_family& scylla_views_build_progress,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -189,7 +189,7 @@ class build_progress_virtual_reader {
};
public:
build_progress_virtual_reader(database& db)
build_progress_virtual_reader(replica::database& db)
: _db(db) {
}

View File

@@ -1271,7 +1271,7 @@ future<> mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
@@ -1387,7 +1387,7 @@ future<> mutate_MV(
});
}
view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_notifier& mn)
view_builder::view_builder(replica::database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_notifier& mn)
: _db(db)
, _sys_dist_ks(sys_dist_ks)
, _mnotifier(mn)
@@ -1625,7 +1625,7 @@ void view_builder::setup_shard_build_step(
try {
_db.find_schema(view->view_info()->base_id());
return true;
} catch (const no_such_column_family&) {
} catch (const replica::no_such_column_family&) {
return false;
}
};
@@ -1640,7 +1640,7 @@ void view_builder::setup_shard_build_step(
}
// The view was dropped and a table was re-created with the same name,
// but the write to the view-related system tables didn't make it.
} catch (const no_such_column_family&) {
} catch (const replica::no_such_column_family&) {
// Fall-through
}
if (this_shard_id() == 0) {
@@ -1687,7 +1687,7 @@ future<> view_builder::calculate_shard_build_step(view_builder_init_state& vbi)
try {
_db.find_schema(view->view_info()->base_id());
return true;
} catch (const no_such_column_family&) {
} catch (const replica::no_such_column_family&) {
return false;
}
};
@@ -1749,7 +1749,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) {
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result();
}
static future<> flush_base(lw_shared_ptr<column_family> base, abort_source& as) {
static future<> flush_base(lw_shared_ptr<replica::column_family> base, abort_source& as) {
struct empty_state { };
return exponential_backoff_retry::do_until_value(1s, 1min, as, [base = std::move(base)] {
return base->flush().then_wrapped([base] (future<> f) -> std::optional<empty_state> {
@@ -1784,7 +1784,7 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
});
});
});
}).handle_exception_type([] (no_such_column_family&) { });
}).handle_exception_type([] (replica::no_such_column_family&) { });
}
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
@@ -1801,7 +1801,7 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
if (status_it != step_it->second.build_status.end()) {
status_it->view = std::move(view);
}
}).handle_exception_type([] (no_such_column_family&) { });
}).handle_exception_type([] (replica::no_such_column_family&) { });
}
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
@@ -2157,7 +2157,7 @@ future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_
});
}
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const table& t, streaming::stream_reason reason) {
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) {
if (is_internal_keyspace(t.schema()->ks_name())) {
return make_ready_future<bool>(false);
}
@@ -2206,7 +2206,7 @@ void view_updating_consumer::maybe_flush_buffer_mid_partition() {
}
}
view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle& staging_reader_handle)
: view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle,
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable {

View File

@@ -29,7 +29,10 @@
#include "frozen_mutation.hh"
class frozen_mutation_and_schema;
namespace replica {
struct cf_stats;
}
namespace service {
struct allow_hints_tag;
@@ -232,7 +235,7 @@ future<> mutate_MV(
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
replica::cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,

View File

@@ -57,7 +57,10 @@ namespace service {
class migration_manager;
} // namespace service
namespace replica {
class database;
}
class exponential_backoff_retry;
namespace db::view {
@@ -142,7 +145,7 @@ class view_builder final : public service::migration_listener::only_view_notific
struct build_step final {
// Ensure we pin the column_family. It may happen that all views are removed,
// and that the base table is too before we can detect it.
lw_shared_ptr<column_family> base;
lw_shared_ptr<replica::column_family> base;
query::partition_slice pslice;
dht::partition_range prange;
flat_mutation_reader reader{nullptr};
@@ -156,7 +159,7 @@ class view_builder final : public service::migration_listener::only_view_notific
using base_to_build_step_type = std::unordered_map<utils::UUID, build_step>;
database& _db;
replica::database& _db;
db::system_distributed_keyspace& _sys_dist_ks;
service::migration_notifier& _mnotifier;
reader_permit _permit;
@@ -195,7 +198,7 @@ public:
static constexpr size_t batch_memory_max = 1024*1024;
public:
view_builder(database&, db::system_distributed_keyspace&, service::migration_notifier&);
view_builder(replica::database&, db::system_distributed_keyspace&, service::migration_notifier&);
view_builder(view_builder&&) = delete;
/**

View File

@@ -25,7 +25,9 @@
#include "streaming/stream_reason.hh"
#include "seastarx.hh"
namespace replica {
class table;
}
namespace db {
@@ -36,6 +38,6 @@ class system_distributed_keyspace;
namespace db::view {
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name);
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const table& t, streaming::stream_reason reason);
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason);
}

View File

@@ -149,7 +149,7 @@ bool view_update_generator::should_throttle() const {
return !_started.available();
}
future<> view_update_generator::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table) {
future<> view_update_generator::register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table) {
if (_as.abort_requested()) {
return make_ready_future<>();
}
@@ -184,7 +184,7 @@ void view_update_generator::setup_metrics() {
void view_update_generator::discover_staging_sstables() {
for (auto& x : _db.get_column_families()) {
table& t = *(x.second);
replica::table& t = *(x.second);
for (auto sstables = t.get_sstables(); sstables::shared_sstable sst : *sstables) {
if (sst->requires_view_building()) {
_sstables_with_tables[t.shared_from_this()].push_back(std::move(sst));

View File

@@ -35,23 +35,23 @@ public:
static constexpr size_t registration_queue_size = 5;
private:
database& _db;
replica::database& _db;
seastar::abort_source _as;
future<> _started = make_ready_future<>();
seastar::condition_variable _pending_sstables;
named_semaphore _registration_sem{registration_queue_size, named_semaphore_exception_factory{"view update generator"}};
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
metrics::metric_groups _metrics;
public:
view_update_generator(database& db) : _db(db) {
view_update_generator(replica::database& db) : _db(db) {
setup_metrics();
discover_staging_sstables();
}
future<> start();
future<> stop();
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
ssize_t available_register_units() const { return _registration_sem.available_units(); }
private:

View File

@@ -71,7 +71,7 @@ public:
, _view_update_pusher(std::move(view_update_pusher))
{ }
view_updating_consumer(schema_ptr schema, reader_permit permit, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle& staging_reader_handle);
view_updating_consumer(view_updating_consumer&&) = default;