From 395ce87eff712003ceb46c533b38a22de31140a5 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Mon, 10 Jul 2023 11:42:00 +0200 Subject: [PATCH 01/10] replica: futurize database::add_column_family and database::remove As a preparation for further changes, database::add_column_family and database::remove return future<>. --- replica/database.cc | 14 ++++++++------ replica/database.hh | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 2983b3cfc9..93dc5cf335 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -990,10 +990,10 @@ future<> database::create_local_system_table( cfg.memtable_scheduling_group = default_scheduling_group(); cfg.memtable_to_cache_scheduling_group = default_scheduling_group(); } - add_column_family(ks, table, std::move(cfg)); + co_await add_column_family(ks, table, std::move(cfg)); } -void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) { +future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) { schema = local_schema_registry().learn(schema); schema->registry_entry()->mark_synced(); auto&& rs = ks.get_replication_strategy(); @@ -1032,14 +1032,15 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family: if (schema->is_view()) { find_column_family(schema->view_info()->base_id()).add_or_update_view(view_ptr(schema)); } + return make_ready_future(); } future<> database::add_column_family_and_make_directory(schema_ptr schema) { auto& ks = find_keyspace(schema->ks_name()); - add_column_family(ks, schema, ks.make_column_family_config(*schema, *this)); + co_await add_column_family(ks, schema, ks.make_column_family_config(*schema, *this)); auto& cf = find_column_family(schema); cf.get_index_manager().reload(); - return cf.init_storage(); + co_await cf.init_storage(); } bool database::update_column_family(schema_ptr new_schema) { @@ -1060,7 +1061,7 @@ bool 
database::update_column_family(schema_ptr new_schema) { return columns_changed; } -void database::remove(table& cf) noexcept { +future<> database::remove(table& cf) noexcept { auto s = cf.schema(); auto& ks = find_keyspace(s->ks_name()); cf.deregister_metrics(); @@ -1074,11 +1075,12 @@ void database::remove(table& cf) noexcept { // Drop view mutations received after base table drop. } } + return make_ready_future(); } future<> database::detach_column_family(table& cf) { auto uuid = cf.schema()->id(); - remove(cf); + co_await remove(cf); cf.clear_views(); co_await cf.await_pending_ops(); for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) { diff --git a/replica/database.hh b/replica/database.hh index e03a6aeba3..85bfcd1c82 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1449,7 +1449,7 @@ private: Future update_write_metrics(Future&& f); void update_write_metrics_for_timed_out_write(); future<> create_keyspace(const lw_shared_ptr&, locator::effective_replication_map_factory& erm_factory, system_keyspace system); - void remove(table&) noexcept; + future<> remove(table&) noexcept; void drop_keyspace(const sstring& name); future<> update_keyspace(const keyspace_metadata& tmp_ksm); static future<> modify_keyspace_on_all_shards(sharded& sharded_db, std::function(replica::database&)> func, std::function(replica::database&)> notifier); @@ -1703,7 +1703,7 @@ public: public: bool update_column_family(schema_ptr s); private: - void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg); + future<> add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg); future<> detach_column_family(table& cf); struct table_truncate_state; From 52afd9d42d638220b3085ff671b992421253fae0 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Tue, 18 Jul 2023 13:06:24 +0200 Subject: [PATCH 02/10] replica: wrap column families related maps into 
tables_metadata In preparation for making access to the column-family-related maps safe, add a tables_metadata class whose members will be protected by an rwlock. --- api/column_family.cc | 8 ++--- api/column_family.hh | 2 +- api/compaction_manager.cc | 2 +- api/storage_service.cc | 4 +-- cdc/generation.cc | 2 +- db/commitlog/commitlog_replayer.cc | 4 +-- db/view/view_update_generator.cc | 2 +- db/virtual_tables.cc | 6 ++-- main.cc | 6 ++-- repair/repair.cc | 4 +-- repair/row_level.cc | 2 +- replica/data_dictionary_impl.hh | 2 +- replica/database.cc | 50 +++++++++++++++--------------- replica/database.hh | 27 ++++++++-------- replica/distributed_loader.cc | 2 +- scylla-gdb.py | 8 ++--- service/misc_services.cc | 8 ++--- service/storage_service.cc | 2 +- streaming/stream_session.cc | 2 +- test/boost/cql_query_large_test.cc | 2 +- test/lib/cql_test_env.cc | 2 +- 21 files changed, 74 insertions(+), 73 deletions(-) diff --git a/api/column_family.cc b/api/column_family.cc index d6b63af280..030ab20fe6 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -135,7 +135,7 @@ static future get_cf_histogram(http_context& ctx, const static future get_cf_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const replica::database& db) { utils::ihistogram res; - for (auto i : db.get_column_families()) { + for (auto i : db.get_tables_metadata()._column_families) { res += (i.second->get_stats().*f).hist; } return res; @@ -162,7 +162,7 @@ static future get_cf_rate_and_histogram(http_context& c static future get_cf_rate_and_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const replica::database& db) { utils::rate_moving_average_and_histogram res; - for (auto i : db.get_column_families()) { + for (auto i : db.get_tables_metadata()._column_families) { res +=
(i.second->get_stats().*f).rate(); } return res; @@ -306,7 +306,7 @@ ratio_holder filter_recent_false_positive_as_ratio_holder(const sstables::shared void set_column_family(http_context& ctx, routes& r, sharded& sys_ks) { cf::get_column_family_name.set(r, [&ctx] (const_req req){ std::vector res; - for (auto i: ctx.db.local().get_column_families_mapping()) { + for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { res.push_back(i.first.first + ":" + i.first.second); } return res; @@ -314,7 +314,7 @@ void set_column_family(http_context& ctx, routes& r, sharded req){ std::list res; - for (auto i: ctx.db.local().get_column_families_mapping()) { + for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { cf::column_family_info info; info.ks = i.first.first; info.cf = i.first.second; diff --git a/api/column_family.hh b/api/column_family.hh index 2730e7e1d0..0418c97fac 100644 --- a/api/column_family.hh +++ b/api/column_family.hh @@ -68,7 +68,7 @@ struct map_reduce_column_families_locally { std::function(std::unique_ptr, std::unique_ptr)> reducer; future> operator()(replica::database& db) const { auto res = seastar::make_lw_shared>(std::make_unique(init)); - return do_for_each(db.get_column_families(), [res, this](const std::pair>& i) { + return do_for_each(db.get_tables_metadata()._column_families, [res, this](const std::pair>& i) { *res = reducer(std::move(*res), mapper(*i.second.get())); }).then([res] { return std::move(*res); diff --git a/api/compaction_manager.cc b/api/compaction_manager.cc index 7a1aadc261..461b396f19 100644 --- a/api/compaction_manager.cc +++ b/api/compaction_manager.cc @@ -68,7 +68,7 @@ void set_compaction_manager(http_context& ctx, routes& r) { cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr req) { return ctx.db.map_reduce0([](replica::database& db) { return do_with(std::unordered_map, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map, uint64_t, utils::tuple_hash>& tasks) { - return 
do_for_each(db.get_column_families(), [&tasks](const std::pair>& i) -> future<> { + return do_for_each(db.get_tables_metadata()._column_families, [&tasks](const std::pair>& i) -> future<> { replica::table& cf = *i.second.get(); tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = cf.estimate_pending_compactions(); return make_ready_future<>(); diff --git a/api/storage_service.cc b/api/storage_service.cc index 7d81936dfe..2ef549646f 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -980,7 +980,7 @@ void set_storage_service(http_context& ctx, routes& r, shardedset_incremental_backups(value); } @@ -1258,7 +1258,7 @@ void set_storage_service(http_context& ctx, routes& r, shardedschema(); if ((ks.empty() || ks == schema->ks_name()) && (cf.empty() || cf == schema->cf_name())) { // at most Nsstables long diff --git a/cdc/generation.cc b/cdc/generation.cc index e72108b5da..530bcc14a4 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -641,7 +641,7 @@ future<> generation_service::maybe_rewrite_streams_descriptions() { // For each CDC log table get the TTL setting (from CDC options) and the table's creation time std::vector times_and_ttls; - for (auto& [_, cf] : _db.get_column_families()) { + for (auto& [_, cf] : _db.get_tables_metadata()._column_families) { auto& s = *cf->schema(); auto base = cdc::get_base_table(_db, s.ks_name(), s.cf_name()); if (!base) { diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index 9862e694e8..d582733614 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -126,7 +126,7 @@ future<> db::commitlog_replayer::impl::init() { } }, [this](replica::database& db) { return do_with(shard_rpm_map{}, [this, &db](shard_rpm_map& map) { - return parallel_for_each(db.get_column_families(), [this, &map](auto& cfp) { + return parallel_for_each(db.get_tables_metadata()._column_families, [this, &map](auto& cfp) { auto uuid = cfp.first; // We do 
this on each cpu, for each CF, which technically is a little wasteful, but the values are // cached, this is only startup, and it makes the code easier. @@ -156,7 +156,7 @@ future<> db::commitlog_replayer::impl::init() { // existing sstables-per-shard. // So, go through all CF:s and check, if a shard mapping does not // have data for it, assume we must set global pos to zero. - for (auto&p : _db.local().get_column_families()) { + for (auto&p : _db.local().get_tables_metadata()._column_families) { for (auto&p1 : _rpm) { // for each shard if (!p1.second.contains(p.first)) { _min_pos[p1.first] = replay_position(); diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index bbb703200d..d062aff210 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -265,7 +265,7 @@ void view_update_generator::setup_metrics() { } void view_update_generator::discover_staging_sstables() { - for (auto& x : _db.get_column_families()) { + for (auto& x : _db.get_tables_metadata()._column_families) { auto t = x.second->shared_from_this(); for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { if (sst->requires_view_building()) { diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index fd8c3432d6..44a1aea1e1 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -283,7 +283,7 @@ public: const auto snapshots_by_tables = co_await _db.map_reduce(snapshot_reducer(), [ks_name_ = ks_data.name] (replica::database& db) mutable -> future { auto ks_name = std::move(ks_name_); snapshots_by_tables_map snapshots_by_tables; - for (auto& [_, table] : db.get_column_families()) { + for (auto& [_, table] : db.get_tables_metadata()._column_families) { if (table->schema()->ks_name() != ks_name) { continue; } @@ -433,7 +433,7 @@ private: }; co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) { T val = {}; - for (auto& [_, table] : db.get_column_families()) { + for 
(auto& [_, table] : db.get_tables_metadata()._column_families) { val = reduce(val, map(*table)); } return val; @@ -560,7 +560,7 @@ public: res.total = occupancy.total_space(); res.free = occupancy.free_space(); res.entries = db.row_cache_tracker().partitions(); - for (const auto& [_, t] : db.get_column_families()) { + for (const auto& [_, t] : db.get_tables_metadata()._column_families) { auto& cache_stats = t->get_row_cache().stats(); res.hits += cache_stats.hits.count(); res.misses += cache_stats.misses.count(); diff --git a/main.cc b/main.cc index 2da58da815..b73c9cfd45 100644 --- a/main.cc +++ b/main.cc @@ -1346,7 +1346,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Needs to happen before replaying the schema commitlog, which interprets // replay position in the truncation record. // Needs to happen before system_keyspace::setup(), which reads truncation records. - for (auto&& e : db.local().get_column_families()) { + for (auto&& e : db.local().get_tables_metadata()._column_families) { auto table_ptr = e.second; if (table_ptr->schema()->ks_name() == db::schema_tables::NAME) { if (table_ptr->get_truncation_record() != db_clock::time_point::min()) { @@ -1405,7 +1405,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl } db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_column_families()) { + for (auto& x : db.get_tables_metadata()._column_families) { replica::table& t = *(x.second); t.enable_auto_compaction(); } @@ -1423,7 +1423,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // streaming db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_column_families()) { + for (auto& x : db.get_tables_metadata()._column_families) { replica::column_family& cf = *(x.second); cf.trigger_compaction(); } diff --git a/repair/repair.cc b/repair/repair.cc index 27a5ccb54f..bfad21f924 100644 --- a/repair/repair.cc +++ 
b/repair/repair.cc @@ -127,7 +127,7 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo } static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) { - auto& m = db.get_column_families_mapping(); + auto& m = db.get_tables_metadata()._ks_cf_to_uuid; return std::count_if(m.begin(), m.end(), [&keyspace] (auto& e) { return e.first.first == keyspace; }); @@ -135,7 +135,7 @@ static size_t get_nr_tables(const replica::database& db, const sstring& keyspace static std::vector list_column_families(const replica::database& db, const sstring& keyspace) { std::vector ret; - for (auto &&e : db.get_column_families_mapping()) { + for (auto &&e : db.get_tables_metadata()._ks_cf_to_uuid) { if (e.first.first == keyspace) { ret.push_back(e.first.second); } diff --git a/repair/row_level.cc b/repair/row_level.cc index 20fe2f7f8b..7419ba9c84 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -3050,7 +3050,7 @@ future<> repair_service::cleanup_history(tasks::task_id repair_id) { } future<> repair_service::load_history() { - auto tables = get_db().local().get_column_families(); + auto tables = get_db().local().get_tables_metadata()._column_families; for (const auto& x : tables) { auto& table_uuid = x.first; auto& table = x.second; diff --git a/replica/data_dictionary_impl.hh b/replica/data_dictionary_impl.hh index 1ea8767932..de84aa16f5 100644 --- a/replica/data_dictionary_impl.hh +++ b/replica/data_dictionary_impl.hh @@ -66,7 +66,7 @@ public: } virtual std::vector get_tables(data_dictionary::database db) const override { std::vector ret; - auto&& tables = unwrap(db).get_column_families(); + auto&& tables = unwrap(db).get_tables_metadata()._column_families; ret.reserve(tables.size()); for (auto&& [uuid, cf] : tables) { ret.push_back(wrap(*cf)); diff --git a/replica/database.cc b/replica/database.cc index 93dc5cf335..72026d79c2 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -272,7 +272,7 @@ void 
database::setup_scylla_memory_diagnostics_producer() { for (const auto& [name, op_count_getter] : phased_barriers) { writeln(" {} (top 10):\n", name); auto total = 0; - for (const auto& [count, table_list] : phased_barrier_top_10_counts(_column_families, op_count_getter)) { + for (const auto& [count, table_list] : phased_barrier_top_10_counts(_tables_metadata._column_families, op_count_getter)) { total += count; writeln(" {}", count); if (table_list.empty()) { @@ -863,13 +863,13 @@ database::init_commitlog() { return db::commitlog::create_commitlog(db::commitlog::config::from_db_config(_cfg, _dbcfg.commitlog_scheduling_group, _dbcfg.available_memory)).then([this](db::commitlog&& log) { _commitlog = std::make_unique(std::move(log)); _commitlog->add_flush_handler([this](db::cf_id_type id, db::replay_position pos) { - if (!_column_families.contains(id)) { + if (!_tables_metadata._column_families.contains(id)) { // the CF has been removed. _commitlog->discard_completed_segments(id); return; } // Initiate a background flush. Waited upon in `stop()`. - (void)_column_families[id]->flush(pos); + (void)_tables_metadata._column_families[id]->flush(pos); }).release(); // we have longer life time than CL. Ignore reg anchor }); } @@ -959,13 +959,13 @@ void database::maybe_init_schema_commitlog() { _schema_commitlog = std::make_unique(db::commitlog::create_commitlog(std::move(c)).get0()); _schema_commitlog->add_flush_handler([this] (db::cf_id_type id, db::replay_position pos) { - if (!_column_families.contains(id)) { + if (!_tables_metadata._column_families.contains(id)) { // the CF has been removed. _schema_commitlog->discard_completed_segments(id); return; } // Initiate a background flush. Waited upon in `stop()`. 
- (void)_column_families[id]->flush(pos); + (void)_tables_metadata._column_families[id]->flush(pos); }).release(); } @@ -1017,18 +1017,18 @@ future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_fam cf->set_durable_writes(ks.metadata()->durable_writes()); auto uuid = schema->id(); - if (_column_families.contains(uuid)) { + if (_tables_metadata._column_families.contains(uuid)) { throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped"); } auto kscf = std::make_pair(schema->ks_name(), schema->cf_name()); - if (_ks_cf_to_uuid.contains(kscf)) { + if (_tables_metadata._ks_cf_to_uuid.contains(kscf)) { throw std::invalid_argument("Column family " + schema->cf_name() + " exists"); } ks.add_or_update_column_family(schema); cf->start(); schema->registry_entry()->set_table(cf->weak_from_this()); - _column_families.emplace(uuid, std::move(cf)); - _ks_cf_to_uuid.emplace(std::move(kscf), uuid); + _tables_metadata._column_families.emplace(uuid, std::move(cf)); + _tables_metadata._ks_cf_to_uuid.emplace(std::move(kscf), uuid); if (schema->is_view()) { find_column_family(schema->view_info()->base_id()).add_or_update_view(view_ptr(schema)); } @@ -1065,9 +1065,9 @@ future<> database::remove(table& cf) noexcept { auto s = cf.schema(); auto& ks = find_keyspace(s->ks_name()); cf.deregister_metrics(); - _column_families.erase(s->id()); + _tables_metadata._column_families.erase(s->id()); ks.metadata()->remove_column_family(s); - _ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name())); + _tables_metadata._ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name())); if (s->is_view()) { try { find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s)); @@ -1149,7 +1149,7 @@ future<> database::drop_table_on_all_shards(sharded& sharded_db, sstri const table_id& database::find_uuid(std::string_view ks, std::string_view cf) const { try { - return _ks_cf_to_uuid.at(std::make_pair(ks, cf)); + return 
_tables_metadata._ks_cf_to_uuid.at(std::make_pair(ks, cf)); } catch (std::out_of_range&) { throw no_such_column_family(ks, cf); } @@ -1245,7 +1245,7 @@ std::unordered_map databa std::vector> database::get_non_system_column_families() const { return boost::copy_range>>( - get_column_families() + get_tables_metadata()._column_families | boost::adaptors::map_values | boost::adaptors::filtered([](const lw_shared_ptr& cf) { return !is_system_keyspace(cf->schema()->ks_name()); @@ -1272,7 +1272,7 @@ const column_family& database::find_column_family(std::string_view ks_name, std: column_family& database::find_column_family(const table_id& uuid) { try { - return *_column_families.at(uuid); + return *_tables_metadata._column_families.at(uuid); } catch (...) { throw no_such_column_family(uuid); } @@ -1280,14 +1280,14 @@ column_family& database::find_column_family(const table_id& uuid) { const column_family& database::find_column_family(const table_id& uuid) const { try { - return *_column_families.at(uuid); + return *_tables_metadata._column_families.at(uuid); } catch (...) 
{ throw no_such_column_family(uuid); } } bool database::column_family_exists(const table_id& uuid) const { - return _column_families.contains(uuid); + return _tables_metadata._column_families.contains(uuid); } future<> @@ -1411,7 +1411,7 @@ schema_ptr database::find_schema(const table_id& uuid) const { } bool database::has_schema(std::string_view ks_name, std::string_view cf_name) const { - return _ks_cf_to_uuid.contains(std::make_pair(ks_name, cf_name)); + return _tables_metadata._ks_cf_to_uuid.contains(std::make_pair(ks_name, cf_name)); } std::vector database::get_views() const { @@ -1456,7 +1456,7 @@ future<> database::create_keyspace_on_all_shards(sharded& sharded_db, future<> database::drop_caches() const { - std::unordered_map> tables = get_column_families(); + std::unordered_map> tables = get_tables_metadata()._column_families; for (auto&& e : tables) { table& t = *e.second; co_await t.get_row_cache().invalidate(row_cache::external_updater([] {})); @@ -1806,7 +1806,7 @@ std::ostream& operator<<(std::ostream& out, const column_family& cf) { std::ostream& operator<<(std::ostream& out, const database& db) { out << "{\n"; - for (auto&& e : db._column_families) { + for (auto&& e : db._tables_metadata._column_families) { auto&& cf = *e.second; out << "(" << e.first.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; } @@ -2314,7 +2314,7 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i future<> database::close_tables(table_kind kind_to_close) { auto b = defer([this] { _stop_barrier.abort(); }); - co_await coroutine::parallel_for_each(_column_families, [this, kind_to_close](auto& val_pair) -> future<> { + co_await coroutine::parallel_for_each(_tables_metadata._column_families, [this, kind_to_close](auto& val_pair) -> future<> { auto& s = val_pair.second->schema(); table_kind k = is_system_table(*s) || _cfg.extensions().is_extension_internal_keyspace(s->ks_name()) ? 
table_kind::system : table_kind::user; if (k == kind_to_close) { @@ -2403,7 +2403,7 @@ future<> database::stop() { } future<> database::flush_all_memtables() { - return parallel_for_each(_column_families, [] (auto& cfp) { + return parallel_for_each(_tables_metadata._column_families, [] (auto& cfp) { return cfp.second->flush(); }); } @@ -2790,8 +2790,8 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam // and has no remaining snapshots if (!has_snapshots) { auto [cf_name, cf_uuid] = extract_cf_name_and_uuid(table_ent->name); - const auto& it = _ks_cf_to_uuid.find(std::make_pair(ks_name, cf_name)); - auto dropped = (it == _ks_cf_to_uuid.cend()) || (cf_uuid != it->second); + const auto& it = _tables_metadata._ks_cf_to_uuid.find(std::make_pair(ks_name, cf_name)); + auto dropped = (it == _tables_metadata._ks_cf_to_uuid.cend()) || (cf_uuid != it->second); if (dropped) { dblog.info("Removing dropped table dir {}", table_dir); sstables::remove_table_directory_if_has_no_snapshots(table_dir).get(); @@ -2804,7 +2804,7 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam } future<> database::flush_non_system_column_families() { - auto non_system_cfs = get_column_families() | boost::adaptors::filtered([this] (auto& uuid_and_cf) { + auto non_system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) { auto cf = uuid_and_cf.second; auto& ks = cf->schema()->ks_name(); return !is_system_keyspace(ks) && !_cfg.extensions().is_extension_internal_keyspace(ks); @@ -2826,7 +2826,7 @@ future<> database::flush_non_system_column_families() { } future<> database::flush_system_column_families() { - auto system_cfs = get_column_families() | boost::adaptors::filtered([this] (auto& uuid_and_cf) { + auto system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) { auto cf = uuid_and_cf.second; auto& ks = cf->schema()->ks_name(); return 
is_system_keyspace(ks) || _cfg.extensions().is_extension_internal_keyspace(ks); diff --git a/replica/database.hh b/replica/database.hh index 85bfcd1c82..a30032837c 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1300,6 +1300,15 @@ public: } }; + using ks_cf_t = std::pair; + using ks_cf_to_uuid_t = + flat_hash_map; + class tables_metadata { + rwlock _cf_lock; + public: // FIXME: change member access to private. + std::unordered_map> _column_families; + ks_cf_to_uuid_t _ks_cf_to_uuid; + }; private: replica::cf_stats _cf_stats; static constexpr size_t max_count_concurrent_reads{100}; @@ -1366,10 +1375,7 @@ private: db::per_partition_rate_limit::info> _apply_stage; flat_hash_map _keyspaces; - std::unordered_map> _column_families; - using ks_cf_to_uuid_t = - flat_hash_map, table_id, utils::tuple_hash, string_pair_eq>; - ks_cf_to_uuid_t _ks_cf_to_uuid; + tables_metadata _tables_metadata; std::unique_ptr _commitlog; std::unique_ptr _schema_commitlog; utils::updateable_value_source _version; @@ -1646,23 +1652,18 @@ public: return _keyspaces; } - const std::unordered_map>& get_column_families() const { - return _column_families; + const tables_metadata& get_tables_metadata() const { + return _tables_metadata; } - std::unordered_map>& get_column_families() { - return _column_families; + tables_metadata& get_tables_metadata() { + return _tables_metadata; } std::vector> get_non_system_column_families() const; std::vector get_views() const; - const ks_cf_to_uuid_t& - get_column_families_mapping() const { - return _ks_cf_to_uuid; - } - const db::config& get_config() const { return _cfg; } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index a7ac85aaee..45332f6f5e 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -472,7 +472,7 @@ future<> distributed_loader::populate_keyspace(distributed& d dblog.info("Populating Keyspace {}", ks_name); auto& ks = i->second; - auto& column_families = 
db.local().get_column_families(); + auto& column_families = db.local().get_tables_metadata()._column_families; co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> { auto uuid = s->id(); diff --git a/scylla-gdb.py b/scylla-gdb.py index 69e3aec75b..45c227f286 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -1209,7 +1209,7 @@ def find_dbs(): def for_each_table(db=None): if not db: db = find_db() - cfs = db['_column_families'] + cfs = db['_tables_metadata']['_column_families'] for (key, value) in unordered_map(cfs): yield value['_p'].reinterpret_cast(lookup_type(['replica::table', 'column_family'])[1].pointer()).dereference() # it's a lw_shared_ptr @@ -1511,7 +1511,7 @@ class scylla_tables(gdb.Command): for shard in shards: db = find_db(shard) - cfs = db['_column_families'] + cfs = db['_tables_metadata']['_column_families'] for (key, value) in unordered_map(cfs): value = seastar_lw_shared_ptr(value).get().dereference() schema = schema_ptr(value['_schema']) @@ -1533,7 +1533,7 @@ class scylla_table(gdb.Command): def _find_table(self, ks, cf): db = find_db() - cfs = db['_column_families'] + cfs = db['_tables_metadata']['_column_families'] for (key, value) in unordered_map(cfs): value = seastar_lw_shared_ptr(value).get().dereference() schema = schema_ptr(value['_schema']) @@ -1900,7 +1900,7 @@ class seastar_lw_shared_ptr(): def all_tables(db): """Returns pointers to table objects which exist on current shard""" - for (key, value) in unordered_map(db['_column_families']): + for (key, value) in unordered_map(db['_tables_metadata']['_column_families']): yield seastar_lw_shared_ptr(value).get() diff --git a/service/misc_services.cc b/service/misc_services.cc index a4722413fd..cc72f9e9b0 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -78,7 +78,7 @@ void load_broadcaster::start_broadcasting() { llogger.debug("Disseminating load info ..."); _done = 
_db.map_reduce0([](replica::database& db) { int64_t res = 0; - for (auto i : db.get_column_families()) { + for (auto i : db.get_tables_metadata()._column_families) { res += i.second->get_stats().live_disk_space_used; } return res; @@ -137,7 +137,7 @@ future cache_hitrate_calculator::recalculate_hitrates() }; auto cf_to_cache_hit_stats = [non_system_filter] (replica::database& db) { - return boost::copy_range>(db.get_column_families() | boost::adaptors::filtered(non_system_filter) | + return boost::copy_range>(db.get_tables_metadata()._column_families | boost::adaptors::filtered(non_system_filter) | boost::adaptors::transformed([] (const std::pair>& cf) { auto& stats = cf.second->get_row_cache().stats(); return std::make_pair(cf.first, stat{float(stats.reads_with_no_misses.rate().rates[0]), float(stats.reads_with_misses.rate().rates[0])}); @@ -159,8 +159,8 @@ future cache_hitrate_calculator::recalculate_hitrates() // set calculated rates on all shards return _db.invoke_on_all([this, cpuid = this_shard_id()] (replica::database& db) { return do_for_each(_rates, [this, cpuid, &db] (auto&& r) mutable { - auto it = db.get_column_families().find(r.first); - if (it == db.get_column_families().end()) { // a table may be added before map/reduce completes and this code runs + auto it = db.get_tables_metadata()._column_families.find(r.first); + if (it == db.get_tables_metadata()._column_families.end()) { // a table may be added before map/reduce completes and this code runs return; } auto& cf = *it; diff --git a/service/storage_service.cc b/service/storage_service.cc index 67aa6cc433..e249e52d22 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3093,7 +3093,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt co_await container().invoke_on_all([&] (storage_service& ss) { auto& db = ss._db.local(); auto tmptr = pending_token_metadata_ptr[this_shard_id()]; - for (auto&& [id, cf] : db.get_column_families()) { // Safe 
because we iterate without preemption + for (auto&& [id, cf] : db.get_tables_metadata()._column_families) { // Safe because we iterate without preemption auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr(); locator::effective_replication_map_ptr erm; if (auto pt_rs = rs->maybe_as_per_table()) { diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6fc704ebd0..4b27f40e13 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -462,7 +462,7 @@ std::vector stream_session::get_column_family_stores(co std::vector stores; auto& db = manager().db(); if (column_families.empty()) { - for (auto& x : db.get_column_families()) { + for (auto& x : db.get_tables_metadata()._column_families) { replica::column_family& cf = *(x.second); auto cf_name = cf.schema()->cf_name(); auto ks_name = cf.schema()->ks_name(); diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index ca8764e57f..36ebf52a81 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -116,7 +116,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { // and the old sstable is deleted. 
flush(e); e.db().invoke_on_all([] (replica::database& dbi) { - return parallel_for_each(dbi.get_column_families(), [&dbi] (auto& table) { + return parallel_for_each(dbi.get_tables_metadata()._column_families, [&dbi] (auto& table) { return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state()); }); }).get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 4ae2cb2fa1..f0ae855cce 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -860,7 +860,7 @@ public: replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get(); db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_column_families()) { + for (auto& x : db.get_tables_metadata()._column_families) { replica::table& t = *(x.second); t.enable_auto_compaction(); } From 8842bd87c3cf5af5987c71c8a68ca4c7ef63381b Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Tue, 18 Jul 2023 13:24:57 +0200 Subject: [PATCH 03/10] replica: add methods to safely add and remove table --- replica/database.cc | 32 ++++++++++++++++++++++++++------ replica/database.hh | 3 +++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 72026d79c2..787ebfe126 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1027,12 +1027,10 @@ future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_fam ks.add_or_update_column_family(schema); cf->start(); schema->registry_entry()->set_table(cf->weak_from_this()); - _tables_metadata._column_families.emplace(uuid, std::move(cf)); - _tables_metadata._ks_cf_to_uuid.emplace(std::move(kscf), uuid); + co_await _tables_metadata.add_table(schema); if (schema->is_view()) { find_column_family(schema->view_info()->base_id()).add_or_update_view(view_ptr(schema)); } - return make_ready_future(); } future<> database::add_column_family_and_make_directory(schema_ptr schema) { @@ -1065,9 +1063,8 @@ future<> database::remove(table& 
cf) noexcept { auto s = cf.schema(); auto& ks = find_keyspace(s->ks_name()); cf.deregister_metrics(); - _tables_metadata._column_families.erase(s->id()); + co_await _tables_metadata.remove_table(s); ks.metadata()->remove_column_family(s); - _tables_metadata._ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name())); if (s->is_view()) { try { find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s)); @@ -1075,7 +1072,6 @@ future<> database::remove(table& cf) noexcept { // Drop view mutations received after base table drop. } } - return make_ready_future(); } future<> database::detach_column_family(table& cf) { @@ -2859,6 +2855,30 @@ future<> database::drain() { b.cancel(); } +future<> database::tables_metadata::add_table(schema_ptr schema) { + auto holder = co_await _cf_lock.hold_write_lock(); + auto id = schema->id(); + auto kscf = std::make_pair(schema->ks_name(), schema->cf_name()); + try { + _column_families.emplace(id, schema->table().shared_from_this()); + _ks_cf_to_uuid.emplace(kscf, id); + } catch (...) { + _ks_cf_to_uuid.erase(std::move(kscf)); + _column_families.erase(id); + throw; + } +} + +future<> database::tables_metadata::remove_table(schema_ptr schema) noexcept { + try { + auto holder = co_await _cf_lock.hold_write_lock(); + _column_families.erase(schema->id()); + _ks_cf_to_uuid.erase(std::make_pair(schema->ks_name(), schema->cf_name())); + } catch (...) { + on_fatal_internal_error(dblog, format("tables_metadata::remove_cf: {}", std::current_exception())); + } +} + data_dictionary::database database::as_data_dictionary() const { static constinit data_dictionary_impl _impl; diff --git a/replica/database.hh b/replica/database.hh index a30032837c..0cf1a357ac 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1308,6 +1308,9 @@ public: public: // FIXME: change member access to private. 
std::unordered_map> _column_families; ks_cf_to_uuid_t _ks_cf_to_uuid; + + future<> add_table(schema_ptr schema); + future<> remove_table(schema_ptr schema) noexcept; }; private: replica::cf_stats _cf_stats; From a21d3357c36fd321872367d43ec6b8fc85235fbe Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 19 Jul 2023 14:02:14 +0200 Subject: [PATCH 04/10] replica: pass tables_metadata to phased_barrier_top_10_counts --- replica/database.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 787ebfe126..6da0b7bc13 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -144,7 +144,7 @@ public: }; const boost::container::static_vector>, 10> -phased_barrier_top_10_counts(const std::unordered_map>& tables, std::function op_count_getter) { +phased_barrier_top_10_counts(const database::tables_metadata& tables_metadata, std::function op_count_getter) { using table_list = boost::container::static_vector; using count_and_tables = std::pair; const auto less = [] (const count_and_tables& a, const count_and_tables& b) { @@ -154,7 +154,7 @@ phased_barrier_top_10_counts(const std::unordered_map res; count_and_tables* min_element = nullptr; - for (const auto& [tid, table] : tables) { + for (const auto& [tid, table] : tables_metadata._column_families) { const auto count = op_count_getter(*table); if (!count) { continue; @@ -272,7 +272,7 @@ void database::setup_scylla_memory_diagnostics_producer() { for (const auto& [name, op_count_getter] : phased_barriers) { writeln(" {} (top 10):\n", name); auto total = 0; - for (const auto& [count, table_list] : phased_barrier_top_10_counts(_tables_metadata._column_families, op_count_getter)) { + for (const auto& [count, table_list] : phased_barrier_top_10_counts(_tables_metadata, op_count_getter)) { total += count; writeln(" {}", count); if (table_list.empty()) { From cdbfa0b2f5eccbec129823842213753313fa6193 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk 
Date: Tue, 18 Jul 2023 18:12:42 +0200 Subject: [PATCH 05/10] replica: iterate safely over tables related maps Loops over _column_families and _ks_cf_to_uuid that may preempt are now protected by the read mode of an rwlock, so that their iterators are not invalidated. --- api/column_family.cc | 28 +++++++------- api/column_family.hh | 7 ++-- api/compaction_manager.cc | 4 +- api/storage_service.cc | 11 +++--- cdc/generation.cc | 10 ++--- db/commitlog/commitlog_replayer.cc | 9 ++--- db/view/view_update_generator.cc | 6 +-- db/virtual_tables.cc | 14 +++---- main.cc | 17 ++++---- repair/repair.cc | 15 ++++---- repair/row_level.cc | 10 ++--- replica/data_dictionary_impl.hh | 10 ++--- replica/database.cc | 62 ++++++++++++++++++++++-------- replica/database.hh | 6 +++ service/misc_services.cc | 6 +-- service/storage_service.cc | 8 ++-- streaming/stream_session.cc | 6 +-- test/boost/cql_query_large_test.cc | 4 +- test/lib/cql_test_env.cc | 6 +-- 19 files changed, 135 insertions(+), 104 deletions(-) diff --git a/api/column_family.cc b/api/column_family.cc index 030ab20fe6..22b5e36fad 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -135,9 +135,9 @@ static future get_cf_histogram(http_context& ctx, const static future get_cf_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const replica::database& db) { utils::ihistogram res; - for (auto i : db.get_tables_metadata()._column_families) { - res += (i.second->get_stats().*f).hist; - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) mutable { + res += (table->get_stats().*f).hist; + }); return res; }; return ctx.db.map(fun).then([](const std::vector &res) { @@ -162,9 +162,9 @@ static future get_cf_rate_and_histogram(http_context& c static future get_cf_rate_and_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const
replica::database& db) { utils::rate_moving_average_and_histogram res; - for (auto i : db.get_tables_metadata()._column_families) { - res += (i.second->get_stats().*f).rate(); - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + res += (table->get_stats().*f).rate(); + }); return res; }; return ctx.db.map(fun).then([](const std::vector &res) { @@ -306,21 +306,21 @@ ratio_holder filter_recent_false_positive_as_ratio_holder(const sstables::shared void set_column_family(http_context& ctx, routes& r, sharded& sys_ks) { cf::get_column_family_name.set(r, [&ctx] (const_req req){ std::vector res; - for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { - res.push_back(i.first.first + ":" + i.first.second); - } + ctx.db.local().get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { + res.push_back(kscf.first + ":" + kscf.second); + }); return res; }); cf::get_column_family.set(r, [&ctx] (std::unique_ptr req){ - std::list res; - for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { + std::list res; + ctx.db.local().get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { cf::column_family_info info; - info.ks = i.first.first; - info.cf = i.first.second; + info.ks = kscf.first; + info.cf = kscf.second; info.type = "ColumnFamilies"; res.push_back(info); - } + }); return make_ready_future(json::stream_range_as_array(std::move(res), std::identity())); }); diff --git a/api/column_family.hh b/api/column_family.hh index 0418c97fac..a90229e545 100644 --- a/api/column_family.hh +++ b/api/column_family.hh @@ -68,9 +68,10 @@ struct map_reduce_column_families_locally { std::function(std::unique_ptr, std::unique_ptr)> reducer; future> operator()(replica::database& db) const { auto res = seastar::make_lw_shared>(std::make_unique(init)); - return do_for_each(db.get_tables_metadata()._column_families, [res, this](const std::pair>& i) { - *res = reducer(std::move(*res), 
mapper(*i.second.get())); - }).then([res] { + return db.get_tables_metadata().for_each_table_gently([res, this] (table_id, seastar::lw_shared_ptr table) { + *res = reducer(std::move(*res), mapper(*table.get())); + return make_ready_future(); + }).then([res] () { return std::move(*res); }); } diff --git a/api/compaction_manager.cc b/api/compaction_manager.cc index 461b396f19..0f2d0ec184 100644 --- a/api/compaction_manager.cc +++ b/api/compaction_manager.cc @@ -68,8 +68,8 @@ void set_compaction_manager(http_context& ctx, routes& r) { cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr req) { return ctx.db.map_reduce0([](replica::database& db) { return do_with(std::unordered_map, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map, uint64_t, utils::tuple_hash>& tasks) { - return do_for_each(db.get_tables_metadata()._column_families, [&tasks](const std::pair>& i) -> future<> { - replica::table& cf = *i.second.get(); + return db.get_tables_metadata().for_each_table_gently([&tasks] (table_id, lw_shared_ptr table) { + replica::table& cf = *table.get(); tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = cf.estimate_pending_compactions(); return make_ready_future<>(); }).then([&tasks] { diff --git a/api/storage_service.cc b/api/storage_service.cc index 2ef549646f..e0fec01544 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -980,10 +980,9 @@ void set_storage_service(http_context& ctx, routes& r, shardedset_incremental_backups(value); - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + table->set_incremental_backups(value); + }); }).then([] { return make_ready_future(json_void()); }); @@ -1258,7 +1257,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded t) { auto& schema = t->schema(); if ((ks.empty() || ks == schema->ks_name()) && (cf.empty() || cf == schema->cf_name())) { // at most Nsstables long @@ -1339,7 +1338,7 @@ void set_storage_service(http_context& ctx, routes& 
r, sharded generation_service::maybe_rewrite_streams_descriptions() { // For each CDC log table get the TTL setting (from CDC options) and the table's creation time std::vector times_and_ttls; - for (auto& [_, cf] : _db.get_tables_metadata()._column_families) { - auto& s = *cf->schema(); + _db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr t) { + auto& s = *t->schema(); auto base = cdc::get_base_table(_db, s.ks_name(), s.cf_name()); if (!base) { // Not a CDC log table. - continue; + return; } auto& cdc_opts = base->cdc_options(); if (!cdc_opts.enabled()) { // This table is named like a CDC log table but it's not one. - continue; + return; } times_and_ttls.push_back(time_and_ttl{as_timepoint(s.id().uuid()), cdc_opts.ttl()}); - } + }); if (times_and_ttls.empty()) { // There's no point in rewriting old generations' streams (they don't contain any data). diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index d582733614..35f97c856e 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -126,8 +126,7 @@ future<> db::commitlog_replayer::impl::init() { } }, [this](replica::database& db) { return do_with(shard_rpm_map{}, [this, &db](shard_rpm_map& map) { - return parallel_for_each(db.get_tables_metadata()._column_families, [this, &map](auto& cfp) { - auto uuid = cfp.first; + return db.get_tables_metadata().parallel_for_each_table([this, &map] (table_id uuid, lw_shared_ptr) { // We do this on each cpu, for each CF, which technically is a little wasteful, but the values are // cached, this is only startup, and it makes the code easier. // Get all truncation records for the CF and initialize max rps if @@ -156,13 +155,13 @@ future<> db::commitlog_replayer::impl::init() { // existing sstables-per-shard. // So, go through all CF:s and check, if a shard mapping does not // have data for it, assume we must set global pos to zero. 
- for (auto&p : _db.local().get_tables_metadata()._column_families) { + _db.local().get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr) { for (auto&p1 : _rpm) { // for each shard - if (!p1.second.contains(p.first)) { + if (!p1.second.contains(id)) { _min_pos[p1.first] = replay_position(); } } - } + }); for (auto&p : _min_pos) { rlogger.debug("minimum position for shard {}: {}", p.first, p.second); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index d062aff210..ec50f31583 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -265,8 +265,8 @@ void view_update_generator::setup_metrics() { } void view_update_generator::discover_staging_sstables() { - for (auto& x : _db.get_tables_metadata()._column_families) { - auto t = x.second->shared_from_this(); + _db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + auto t = table->shared_from_this(); for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { if (sst->requires_view_building()) { _progress_tracker->on_sstable_registration(sst); @@ -276,7 +276,7 @@ void view_update_generator::discover_staging_sstables() { _registration_sem.consume(1); } } - } + }); } } diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index 44a1aea1e1..90241431d7 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -283,13 +283,13 @@ public: const auto snapshots_by_tables = co_await _db.map_reduce(snapshot_reducer(), [ks_name_ = ks_data.name] (replica::database& db) mutable -> future { auto ks_name = std::move(ks_name_); snapshots_by_tables_map snapshots_by_tables; - for (auto& [_, table] : db.get_tables_metadata()._column_families) { + co_await db.get_tables_metadata().for_each_table_gently(coroutine::lambda([&] (table_id, lw_shared_ptr table) -> future<> { if (table->schema()->ks_name() != ks_name) { - continue; + co_return; } const auto unordered_snapshots = co_await 
table->get_snapshot_details(); snapshots_by_tables.emplace(table->schema()->cf_name(), std::map(unordered_snapshots.begin(), unordered_snapshots.end())); - } + })); co_return snapshots_by_tables; }); @@ -433,9 +433,9 @@ private: }; co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) { T val = {}; - for (auto& [_, table] : db.get_tables_metadata()._column_families) { + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { val = reduce(val, map(*table)); - } + }); return val; }); } @@ -560,13 +560,13 @@ public: res.total = occupancy.total_space(); res.free = occupancy.free_space(); res.entries = db.row_cache_tracker().partitions(); - for (const auto& [_, t] : db.get_tables_metadata()._column_families) { + db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr t) { auto& cache_stats = t->get_row_cache().stats(); res.hits += cache_stats.hits.count(); res.misses += cache_stats.misses.count(); res.hits_moving_average += cache_stats.hits.rate(); res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate()); - } + }); return res; }, stats{}, stats::reduce).then([] (stats s) { return std::vector>{ diff --git a/main.cc b/main.cc index b73c9cfd45..8c3c68e7d5 100644 --- a/main.cc +++ b/main.cc @@ -1346,8 +1346,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Needs to happen before replaying the schema commitlog, which interprets // replay position in the truncation record. // Needs to happen before system_keyspace::setup(), which reads truncation records. 
- for (auto&& e : db.local().get_tables_metadata()._column_families) { - auto table_ptr = e.second; + db.local().get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table_ptr) { if (table_ptr->schema()->ks_name() == db::schema_tables::NAME) { if (table_ptr->get_truncation_record() != db_clock::time_point::min()) { // replay_position stored in the truncation record may belong to @@ -1360,7 +1359,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl table_ptr->schema()->ks_name(), table_ptr->schema()->cf_name())); } } - } + }); auto sch_cl = db.local().schema_commitlog(); if (sch_cl != nullptr) { @@ -1405,10 +1404,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl } db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::table& t = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::table& t = *table; t.enable_auto_compaction(); - } + }); }).get(); // If the same sstable is shared by several shards, it cannot be @@ -1423,10 +1422,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // streaming db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::column_family& cf = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::column_family& cf = *table; cf.trigger_compaction(); - } + }); }).get(); api::set_server_gossip(ctx, gossiper).get(); api::set_server_snitch(ctx, snitch).get(); diff --git a/repair/repair.cc b/repair/repair.cc index bfad21f924..79af1dbb9a 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -127,19 +127,20 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo } static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) { - auto& m = 
db.get_tables_metadata()._ks_cf_to_uuid; - return std::count_if(m.begin(), m.end(), [&keyspace] (auto& e) { - return e.first.first == keyspace; + size_t tables = 0; + db.get_tables_metadata().for_each_table_id([&keyspace, &tables] (const std::pair& kscf, table_id) { + tables += kscf.first == keyspace; }); + return tables; } static std::vector list_column_families(const replica::database& db, const sstring& keyspace) { std::vector ret; - for (auto &&e : db.get_tables_metadata()._ks_cf_to_uuid) { - if (e.first.first == keyspace) { - ret.push_back(e.first.second); + db.get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { + if (kscf.first == keyspace) { + ret.push_back(kscf.second); } - } + }); return ret; } diff --git a/repair/row_level.cc b/repair/row_level.cc index 7419ba9c84..a9afa3fc6b 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -3050,13 +3050,10 @@ future<> repair_service::cleanup_history(tasks::task_id repair_id) { } future<> repair_service::load_history() { - auto tables = get_db().local().get_tables_metadata()._column_families; - for (const auto& x : tables) { - auto& table_uuid = x.first; - auto& table = x.second; + co_await get_db().local().get_tables_metadata().for_each_table_gently(coroutine::lambda([&] (table_id table_uuid, lw_shared_ptr table) -> future<> { auto shard = unsigned(table_uuid.uuid().get_most_significant_bits()) % smp::count; if (shard != this_shard_id()) { - continue; + co_return; } rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}", table->schema()->ks_name(), table->schema()->cf_name(), table_uuid); @@ -3077,8 +3074,7 @@ future<> repair_service::load_history() { entry.ks, entry.cf, range, repair_time); } }); - } - co_return; + })); } repair_meta_ptr repair_service::get_repair_meta(gms::inet_address from, uint32_t repair_meta_id) { diff --git a/replica/data_dictionary_impl.hh b/replica/data_dictionary_impl.hh index de84aa16f5..58ce6e72ad 100644 --- 
a/replica/data_dictionary_impl.hh +++ b/replica/data_dictionary_impl.hh @@ -66,11 +66,11 @@ public: } virtual std::vector get_tables(data_dictionary::database db) const override { std::vector ret; - auto&& tables = unwrap(db).get_tables_metadata()._column_families; - ret.reserve(tables.size()); - for (auto&& [uuid, cf] : tables) { - ret.push_back(wrap(*cf)); - } + auto& tmd = unwrap(db).get_tables_metadata(); + ret.reserve(tmd.size()); + tmd.for_each_table([&] (table_id, const lw_shared_ptr table) { + ret.push_back(wrap(*table)); + }); return ret; } virtual std::optional try_find_table(data_dictionary::database db, std::string_view ks, std::string_view table) const override { diff --git a/replica/database.cc b/replica/database.cc index 6da0b7bc13..80cad24dc8 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -154,20 +154,20 @@ phased_barrier_top_10_counts(const database::tables_metadata& tables_metadata, s boost::container::static_vector res; count_and_tables* min_element = nullptr; - for (const auto& [tid, table] : tables_metadata._column_families) { + tables_metadata.for_each_table([&] (table_id tid, lw_shared_ptr
table) { const auto count = op_count_getter(*table); if (!count) { - continue; + return; } if (res.size() < res.capacity()) { auto& elem = res.emplace_back(count, table_list({table.get()})); if (!min_element || min_element->first > count) { min_element = &elem; } - continue; + return; } if (min_element->first > count) { - continue; + return; } auto it = boost::find_if(res, [count] (const count_and_tables& x) { @@ -175,13 +175,13 @@ phased_barrier_top_10_counts(const database::tables_metadata& tables_metadata, s }); if (it != res.end()) { it->second.push_back(table.get()); - continue; + return; } // If we are here, min_element->first < count *min_element = {count, table_list({table.get()})}; min_element = &*boost::min_element(res, less); - } + }); boost::sort(res, less); @@ -1802,10 +1802,10 @@ std::ostream& operator<<(std::ostream& out, const column_family& cf) { std::ostream& operator<<(std::ostream& out, const database& db) { out << "{\n"; - for (auto&& e : db._tables_metadata._column_families) { - auto&& cf = *e.second; - out << "(" << e.first.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; - } + db._tables_metadata.for_each_table([&] (table_id id, const lw_shared_ptr
tp) { + auto&& cf = *tp; + out << "(" << id.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; + }); out << "}"; return out; } @@ -2310,13 +2310,13 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i future<> database::close_tables(table_kind kind_to_close) { auto b = defer([this] { _stop_barrier.abort(); }); - co_await coroutine::parallel_for_each(_tables_metadata._column_families, [this, kind_to_close](auto& val_pair) -> future<> { - auto& s = val_pair.second->schema(); + co_await _tables_metadata.parallel_for_each_table(coroutine::lambda([this, kind_to_close] (table_id, lw_shared_ptr
table) -> future<> { + auto& s = table->schema(); table_kind k = is_system_table(*s) || _cfg.extensions().is_extension_internal_keyspace(s->ks_name()) ? table_kind::system : table_kind::user; if (k == kind_to_close) { - co_await val_pair.second->stop(); + co_await table->stop(); } - }); + })); co_await _stop_barrier.arrive_and_wait(); b.cancel(); } @@ -2399,8 +2399,8 @@ future<> database::stop() { } future<> database::flush_all_memtables() { - return parallel_for_each(_tables_metadata._column_families, [] (auto& cfp) { - return cfp.second->flush(); + return _tables_metadata.parallel_for_each_table([] (table_id, lw_shared_ptr
table) { + return table->flush(); }); } @@ -2855,6 +2855,10 @@ future<> database::drain() { b.cancel(); } +size_t database::tables_metadata::size() const noexcept { + return _column_families.size(); +} + future<> database::tables_metadata::add_table(schema_ptr schema) { auto holder = co_await _cf_lock.hold_write_lock(); auto id = schema->id(); @@ -2879,6 +2883,32 @@ future<> database::tables_metadata::remove_table(schema_ptr schema) noexcept { } } +void database::tables_metadata::for_each_table(std::function)> f) const { + for (auto& [id, table]: _column_families) { + f(id, table); + } +} + +void database::tables_metadata::for_each_table_id(std::function f) const { + for (auto& [kscf, id]: _ks_cf_to_uuid) { + f(kscf, id); + } +} + +future<> database::tables_metadata::for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f) { + auto holder = co_await _cf_lock.hold_read_lock(); + for (auto& [id, table]: _column_families) { + co_await f(id, table); + } +} + +future<> database::tables_metadata::parallel_for_each_table(std::function(table_id, lw_shared_ptr
)> f) { + auto holder = co_await _cf_lock.hold_read_lock(); + co_await coroutine::parallel_for_each(_column_families, [f = std::move(f)] (auto& table) { + return f(table.first, table.second); + }); +} + data_dictionary::database database::as_data_dictionary() const { static constinit data_dictionary_impl _impl; diff --git a/replica/database.hh b/replica/database.hh index 0cf1a357ac..554d6b57cc 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1309,8 +1309,14 @@ public: std::unordered_map> _column_families; ks_cf_to_uuid_t _ks_cf_to_uuid; + size_t size() const noexcept; + future<> add_table(schema_ptr schema); future<> remove_table(schema_ptr schema) noexcept; + void for_each_table(std::function)> f) const; + void for_each_table_id(std::function f) const; + future<> for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f); + future<> parallel_for_each_table(std::function(table_id, lw_shared_ptr
)> f); }; private: replica::cf_stats _cf_stats; diff --git a/service/misc_services.cc b/service/misc_services.cc index cc72f9e9b0..ed9497bb80 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -78,9 +78,9 @@ void load_broadcaster::start_broadcasting() { llogger.debug("Disseminating load info ..."); _done = _db.map_reduce0([](replica::database& db) { int64_t res = 0; - for (auto i : db.get_tables_metadata()._column_families) { - res += i.second->get_stats().live_disk_space_used; - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + res += table->get_stats().live_disk_space_used; + }); return res; }, int64_t(0), std::plus()).then([this] (int64_t size) { return _gossiper.add_local_application_state(gms::application_state::LOAD, diff --git a/service/storage_service.cc b/service/storage_service.cc index e249e52d22..4100a1e8d1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3093,16 +3093,16 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt co_await container().invoke_on_all([&] (storage_service& ss) { auto& db = ss._db.local(); auto tmptr = pending_token_metadata_ptr[this_shard_id()]; - for (auto&& [id, cf] : db.get_tables_metadata()._column_families) { // Safe because we iterate without preemption - auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr(); + db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr table) { + auto rs = db.find_keyspace(table->schema()->keypace_name()).get_replication_strategy_ptr(); locator::effective_replication_map_ptr erm; if (auto pt_rs = rs->maybe_as_per_table()) { erm = pt_rs->make_replication_map(id, tmptr); } else { - erm = pending_effective_replication_maps[this_shard_id()][cf->schema()->keypace_name()]; + erm = pending_effective_replication_maps[this_shard_id()][table->schema()->keypace_name()]; } pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); - } + 
}); }); } catch (...) { ex = std::current_exception(); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4b27f40e13..e5764353f9 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -462,15 +462,15 @@ std::vector stream_session::get_column_family_stores(co std::vector stores; auto& db = manager().db(); if (column_families.empty()) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::column_family& cf = *(x.second); + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr tp) { + replica::column_family& cf = *tp; auto cf_name = cf.schema()->cf_name(); auto ks_name = cf.schema()->ks_name(); if (ks_name == keyspace) { sslog.debug("Find ks={} cf={}", ks_name, cf_name); stores.push_back(&cf); } - } + }); } else { // TODO: We can move this to database class and use shared_ptr instead for (auto& cf_name : column_families) { diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 36ebf52a81..c08f2fa8b9 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -116,8 +116,8 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { // and the old sstable is deleted. 
flush(e); e.db().invoke_on_all([] (replica::database& dbi) { - return parallel_for_each(dbi.get_tables_metadata()._column_families, [&dbi] (auto& table) { - return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state()); + return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { + return dbi.get_compaction_manager().perform_major_compaction(t->as_table_state()); }); }).get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index f0ae855cce..2c69de282a 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -860,10 +860,10 @@ public: replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get(); db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::table& t = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::table& t = *table; t.enable_auto_compaction(); - } + }); }).get(); if (raft_gr.local().is_enabled()) { From e072a2341dcfcb9b661900fbc447e7dd14be577b Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Thu, 20 Jul 2023 12:26:44 +0200 Subject: [PATCH 06/10] replica: api: return table_id instead of const table_id& Return table_id instead of const table_id& from database::find_uuid, as copying a table_id incurs little overhead and doing so simplifies the method signatures.
--- api/column_family.cc | 4 ++-- api/column_family.hh | 2 +- replica/database.cc | 4 ++-- replica/database.hh | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/api/column_family.cc b/api/column_family.cc index 22b5e36fad..d423f06cd1 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -43,7 +43,7 @@ std::tuple parse_fully_qualified_cf_name(sstring name) { return std::make_tuple(name.substr(0, pos), name.substr(end)); } -const table_id& get_uuid(const sstring& ks, const sstring& cf, const replica::database& db) { +table_id get_uuid(const sstring& ks, const sstring& cf, const replica::database& db) { try { return db.find_uuid(ks, cf); } catch (replica::no_such_column_family& e) { @@ -51,7 +51,7 @@ const table_id& get_uuid(const sstring& ks, const sstring& cf, const replica::da } } -const table_id& get_uuid(const sstring& name, const replica::database& db) { +table_id get_uuid(const sstring& name, const replica::database& db) { auto [ks, cf] = parse_fully_qualified_cf_name(name); return get_uuid(ks, cf, db); } diff --git a/api/column_family.hh b/api/column_family.hh index a90229e545..ea16a17672 100644 --- a/api/column_family.hh +++ b/api/column_family.hh @@ -23,7 +23,7 @@ namespace api { void set_column_family(http_context& ctx, httpd::routes& r, sharded& sys_ks); void unset_column_family(http_context& ctx, httpd::routes& r); -const table_id& get_uuid(const sstring& name, const replica::database& db); +table_id get_uuid(const sstring& name, const replica::database& db); future<> foreach_column_family(http_context& ctx, const sstring& name, std::function f); diff --git a/replica/database.cc b/replica/database.cc index 80cad24dc8..89173cf979 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1143,7 +1143,7 @@ future<> database::drop_table_on_all_shards(sharded& sharded_db, sstri co_await table_shards->destroy_storage(); } -const table_id& database::find_uuid(std::string_view ks, std::string_view cf) const { +table_id 
database::find_uuid(std::string_view ks, std::string_view cf) const { try { return _tables_metadata._ks_cf_to_uuid.at(std::make_pair(ks, cf)); } catch (std::out_of_range&) { @@ -1151,7 +1151,7 @@ const table_id& database::find_uuid(std::string_view ks, std::string_view cf) co } } -const table_id& database::find_uuid(const schema_ptr& schema) const { +table_id database::find_uuid(const schema_ptr& schema) const { return find_uuid(schema->ks_name(), schema->cf_name()); } diff --git a/replica/database.hh b/replica/database.hh index 554d6b57cc..ba18234017 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1551,8 +1551,8 @@ public: future<> add_column_family_and_make_directory(schema_ptr schema); /* throws no_such_column_family if missing */ - const table_id& find_uuid(std::string_view ks, std::string_view cf) const; - const table_id& find_uuid(const schema_ptr&) const; + table_id find_uuid(std::string_view ks, std::string_view cf) const; + table_id find_uuid(const schema_ptr&) const; /** * Creates a keyspace for a given metadata if it still doesn't exist. From 6796721c3d487723e48272379f60a5a29dcd3734 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 19 Jul 2023 11:38:31 +0200 Subject: [PATCH 07/10] replica: add methods to get table or table id --- replica/database.cc | 36 ++++++++++++++++++++++++++++------- replica/database.hh | 4 ++++ replica/distributed_loader.cc | 4 ++-- service/misc_services.cc | 12 ++++++------ 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 89173cf979..b5b5989948 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -869,7 +869,7 @@ database::init_commitlog() { return; } // Initiate a background flush. Waited upon in `stop()`. - (void)_tables_metadata._column_families[id]->flush(pos); + (void)_tables_metadata.get_table(id).flush(pos); }).release(); // we have longer life time than CL. 
Ignore reg anchor }); } @@ -965,7 +965,7 @@ void database::maybe_init_schema_commitlog() { return; } // Initiate a background flush. Waited upon in `stop()`. - (void)_tables_metadata._column_families[id]->flush(pos); + (void)_tables_metadata.get_table(id).flush(pos); }).release(); } @@ -1145,7 +1145,7 @@ future<> database::drop_table_on_all_shards(sharded& sharded_db, sstri table_id database::find_uuid(std::string_view ks, std::string_view cf) const { try { - return _tables_metadata._ks_cf_to_uuid.at(std::make_pair(ks, cf)); + return _tables_metadata.get_table_id(std::make_pair(ks, cf)); } catch (std::out_of_range&) { throw no_such_column_family(ks, cf); } @@ -1268,7 +1268,7 @@ const column_family& database::find_column_family(std::string_view ks_name, std: column_family& database::find_column_family(const table_id& uuid) { try { - return *_tables_metadata._column_families.at(uuid); + return _tables_metadata.get_table(uuid); } catch (...) { throw no_such_column_family(uuid); } @@ -1276,7 +1276,7 @@ column_family& database::find_column_family(const table_id& uuid) { const column_family& database::find_column_family(const table_id& uuid) const { try { - return *_tables_metadata._column_families.at(uuid); + return _tables_metadata.get_table(uuid); } catch (...) 
{ throw no_such_column_family(uuid); } @@ -2786,8 +2786,8 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam // and has no remaining snapshots if (!has_snapshots) { auto [cf_name, cf_uuid] = extract_cf_name_and_uuid(table_ent->name); - const auto& it = _tables_metadata._ks_cf_to_uuid.find(std::make_pair(ks_name, cf_name)); - auto dropped = (it == _tables_metadata._ks_cf_to_uuid.cend()) || (cf_uuid != it->second); + auto id_opt = _tables_metadata.get_table_id_if_exists(std::make_pair(ks_name, cf_name)); + auto dropped = !id_opt || (cf_uuid != id_opt); if (dropped) { dblog.info("Removing dropped table dir {}", table_dir); sstables::remove_table_directory_if_has_no_snapshots(table_dir).get(); @@ -2883,6 +2883,28 @@ future<> database::tables_metadata::remove_table(schema_ptr schema) noexcept { } } +table& database::tables_metadata::get_table(table_id id) const { + return *_column_families.at(id); +} + +table_id database::tables_metadata::get_table_id(const std::pair& kscf) const { + return _ks_cf_to_uuid.at(kscf); +} + +lw_shared_ptr
database::tables_metadata::get_table_if_exists(table_id id) const { + if (auto it = _column_families.find(id); it != _column_families.end()) { + return it->second; + } + return nullptr; +} + +table_id database::tables_metadata::get_table_id_if_exists(const std::pair& kscf) const { + if (auto it = _ks_cf_to_uuid.find(kscf); it != _ks_cf_to_uuid.end()) { + return it->second; + } + return table_id::create_null_id(); +} + void database::tables_metadata::for_each_table(std::function)> f) const { for (auto& [id, table]: _column_families) { f(id, table); diff --git a/replica/database.hh b/replica/database.hh index ba18234017..b66945c5ad 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1313,6 +1313,10 @@ public: future<> add_table(schema_ptr schema); future<> remove_table(schema_ptr schema) noexcept; + table& get_table(table_id id) const; + table_id get_table_id(const std::pair& kscf) const; + lw_shared_ptr
get_table_if_exists(table_id id) const; + table_id get_table_id_if_exists(const std::pair& kscf) const; void for_each_table(std::function)> f) const; void for_each_table_id(std::function f) const; future<> for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 45332f6f5e..eab14fc5ca 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -472,11 +472,11 @@ future<> distributed_loader::populate_keyspace(distributed& d dblog.info("Populating Keyspace {}", ks_name); auto& ks = i->second; - auto& column_families = db.local().get_tables_metadata()._column_families; + auto& tables_metadata = db.local().get_tables_metadata(); co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> { auto uuid = s->id(); - lw_shared_ptr cf = column_families[uuid]; + lw_shared_ptr cf = tables_metadata.get_table(uuid).shared_from_this(); // System tables (from system and system_schema keyspaces) are loaded in two phases. // The populate_keyspace function can be called in the second phase for tables that diff --git a/service/misc_services.cc b/service/misc_services.cc index ed9497bb80..8a2bbd8a98 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -159,11 +159,11 @@ future cache_hitrate_calculator::recalculate_hitrates() // set calculated rates on all shards return _db.invoke_on_all([this, cpuid = this_shard_id()] (replica::database& db) { return do_for_each(_rates, [this, cpuid, &db] (auto&& r) mutable { - auto it = db.get_tables_metadata()._column_families.find(r.first); - if (it == db.get_tables_metadata()._column_families.end()) { // a table may be added before map/reduce completes and this code runs + auto cf_opt = db.get_tables_metadata().get_table_if_exists(r.first); + if (!cf_opt) { // a table may be added before map/reduce completes and this code runs return; } - auto& cf = *it; + auto& cf = cf_opt; stat& s = r.second; float rate = 0; if (s.h) { @@ -171,10 +171,10 @@ future cache_hitrate_calculator::recalculate_hitrates() } if (this_shard_id() == cpuid) { // calculate max difference between old rate and new one for all cfs - _diff = 
std::max(_diff, std::abs(float(cf.second->get_global_cache_hit_rate()) - rate)); - _gstate += format("{}.{}:{:0.6f};", cf.second->schema()->ks_name(), cf.second->schema()->cf_name(), rate); + _diff = std::max(_diff, std::abs(float(cf->get_global_cache_hit_rate()) - rate)); + _gstate += format("{}.{}:{:0.6f};", cf->schema()->ks_name(), cf->schema()->cf_name(), rate); } - cf.second->set_global_cache_hit_rate(cache_temperature(rate)); + cf->set_global_cache_hit_rate(cache_temperature(rate)); }); }); }).then([this] { From ff26b2ba3f0c4dfa27dddd595bb4e988e483c86d Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 19 Jul 2023 11:50:16 +0200 Subject: [PATCH 08/10] replica: add methods to check if given table exists --- replica/database.cc | 20 ++++++++++++++------ replica/database.hh | 2 ++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index b5b5989948..28cb49a62d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -863,7 +863,7 @@ database::init_commitlog() { return db::commitlog::create_commitlog(db::commitlog::config::from_db_config(_cfg, _dbcfg.commitlog_scheduling_group, _dbcfg.available_memory)).then([this](db::commitlog&& log) { _commitlog = std::make_unique(std::move(log)); _commitlog->add_flush_handler([this](db::cf_id_type id, db::replay_position pos) { - if (!_tables_metadata._column_families.contains(id)) { + if (!_tables_metadata.contains(id)) { // the CF has been removed. _commitlog->discard_completed_segments(id); return; @@ -959,7 +959,7 @@ void database::maybe_init_schema_commitlog() { _schema_commitlog = std::make_unique(db::commitlog::create_commitlog(std::move(c)).get0()); _schema_commitlog->add_flush_handler([this] (db::cf_id_type id, db::replay_position pos) { - if (!_tables_metadata._column_families.contains(id)) { + if (!_tables_metadata.contains(id)) { // the CF has been removed. 
_schema_commitlog->discard_completed_segments(id); return; @@ -1017,11 +1017,11 @@ future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_fam cf->set_durable_writes(ks.metadata()->durable_writes()); auto uuid = schema->id(); - if (_tables_metadata._column_families.contains(uuid)) { + if (_tables_metadata.contains(uuid)) { throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped"); } auto kscf = std::make_pair(schema->ks_name(), schema->cf_name()); - if (_tables_metadata._ks_cf_to_uuid.contains(kscf)) { + if (_tables_metadata.contains(kscf)) { throw std::invalid_argument("Column family " + schema->cf_name() + " exists"); } ks.add_or_update_column_family(schema); @@ -1283,7 +1283,7 @@ const column_family& database::find_column_family(const table_id& uuid) const { } bool database::column_family_exists(const table_id& uuid) const { - return _tables_metadata._column_families.contains(uuid); + return _tables_metadata.contains(uuid); } future<> @@ -1407,7 +1407,7 @@ schema_ptr database::find_schema(const table_id& uuid) const { } bool database::has_schema(std::string_view ks_name, std::string_view cf_name) const { - return _tables_metadata._ks_cf_to_uuid.contains(std::make_pair(ks_name, cf_name)); + return _tables_metadata.contains(std::make_pair(ks_name, cf_name)); } std::vector database::get_views() const { @@ -2905,6 +2905,14 @@ table_id database::tables_metadata::get_table_id_if_exists(const std::pair kscf) const { + return _ks_cf_to_uuid.contains(kscf); +} + void database::tables_metadata::for_each_table(std::function)> f) const { for (auto& [id, table]: _column_families) { f(id, table); diff --git a/replica/database.hh b/replica/database.hh index b66945c5ad..929d2fd4c8 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1317,6 +1317,8 @@ public: table_id get_table_id(const std::pair& kscf) const; lw_shared_ptr
get_table_if_exists(table_id id) const; table_id get_table_id_if_exists(const std::pair& kscf) const; + bool contains(table_id id) const; + bool contains(std::pair kscf) const; void for_each_table(std::function)> f) const; void for_each_table_id(std::function f) const; future<> for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f); From c5cad803b309978cf74d138d49839ac706dd97a3 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 19 Jul 2023 12:52:35 +0200 Subject: [PATCH 09/10] replica: add methods to get a filtered copy of tables map --- replica/database.cc | 19 ++++++++++--------- replica/database.hh | 5 +++++ service/misc_services.cc | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 28cb49a62d..42e09ed6b7 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1241,11 +1241,9 @@ std::unordered_map databa std::vector> database::get_non_system_column_families() const { return boost::copy_range>>( - get_tables_metadata()._column_families - | boost::adaptors::map_values - | boost::adaptors::filtered([](const lw_shared_ptr& cf) { - return !is_system_keyspace(cf->schema()->ks_name()); - })); + get_tables_metadata().filter([] (auto uuid_and_cf) { + return !is_system_keyspace(uuid_and_cf.second->schema()->ks_name()); + }) | boost::adaptors::map_values); } column_family& database::find_column_family(std::string_view ks_name, std::string_view cf_name) { @@ -1452,11 +1450,10 @@ future<> database::create_keyspace_on_all_shards(sharded& sharded_db, future<> database::drop_caches() const { - std::unordered_map> tables = get_tables_metadata()._column_families; + std::unordered_map> tables = get_tables_metadata().get_column_families_copy(); for (auto&& e : tables) { table& t = *e.second; co_await t.get_row_cache().invalidate(row_cache::external_updater([] {})); - auto sstables = t.get_sstables(); for (sstables::shared_sstable sst : *sstables) { co_await sst->drop_caches(); @@ -2800,7 +2797,7 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam } future<> database::flush_non_system_column_families() { - auto non_system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) { + auto non_system_cfs = get_tables_metadata().filter([this] (auto 
uuid_and_cf) { auto cf = uuid_and_cf.second; auto& ks = cf->schema()->ks_name(); return !is_system_keyspace(ks) && !_cfg.extensions().is_extension_internal_keyspace(ks); @@ -2822,7 +2819,7 @@ future<> database::flush_non_system_column_families() { } future<> database::flush_system_column_families() { - auto system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) { + auto system_cfs = get_tables_metadata().filter([this] (auto uuid_and_cf) { auto cf = uuid_and_cf.second; auto& ks = cf->schema()->ks_name(); return is_system_keyspace(ks) || _cfg.extensions().is_extension_internal_keyspace(ks); @@ -2939,6 +2936,10 @@ future<> database::tables_metadata::parallel_for_each_table(std::function> database::tables_metadata::get_column_families_copy() const { + return _column_families; +} + data_dictionary::database database::as_data_dictionary() const { static constinit data_dictionary_impl _impl; diff --git a/replica/database.hh b/replica/database.hh index 929d2fd4c8..06159b63fb 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1323,6 +1323,11 @@ public: void for_each_table_id(std::function f) const; future<> for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f); future<> parallel_for_each_table(std::function(table_id, lw_shared_ptr
)> f); + const std::unordered_map> get_column_families_copy() const; + + const auto filter(std::function>)> f) const { + return _column_families | boost::adaptors::filtered(std::move(f)); + } }; private: replica::cf_stats _cf_stats; diff --git a/service/misc_services.cc b/service/misc_services.cc index 8a2bbd8a98..3d82534817 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -137,7 +137,7 @@ future cache_hitrate_calculator::recalculate_hitrates() }; auto cf_to_cache_hit_stats = [non_system_filter] (replica::database& db) { - return boost::copy_range>(db.get_tables_metadata()._column_families | boost::adaptors::filtered(non_system_filter) | + return boost::copy_range>(db.get_tables_metadata().filter(non_system_filter) | boost::adaptors::transformed([] (const std::pair>& cf) { auto& stats = cf.second->get_row_cache().stats(); return std::make_pair(cf.first, stat{float(stats.reads_with_no_misses.rate().rates[0]), float(stats.reads_with_misses.rate().rates[0])}); From 6e6ba7309e9bf521311f8f4f765923c7005174a0 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 19 Jul 2023 14:07:30 +0200 Subject: [PATCH 10/10] replica: make tables_metadata's attributes private Make _column_families and _ks_cf_to_uuid private to prevent unsafe access. The maps can be accessed only through methods which use locks if preemption is possible. --- replica/database.hh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 06159b63fb..f657fdf206 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1305,10 +1305,9 @@ public: flat_hash_map; class tables_metadata { rwlock _cf_lock; - public: // FIXME: change member access to private. std::unordered_map> _column_families; ks_cf_to_uuid_t _ks_cf_to_uuid; - + public: size_t size() const noexcept; future<> add_table(schema_ptr schema);