From cdbfa0b2f5eccbec129823842213753313fa6193 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Tue, 18 Jul 2023 18:12:42 +0200 Subject: [PATCH] replica: iterate safely over tables related maps Loops over _column_families and _ks_cf_to_uuid which may preempt are protected by reader mode of rwlock so that iterators won't get invalid. --- api/column_family.cc | 28 +++++++------- api/column_family.hh | 7 ++-- api/compaction_manager.cc | 4 +- api/storage_service.cc | 11 +++--- cdc/generation.cc | 10 ++--- db/commitlog/commitlog_replayer.cc | 9 ++--- db/view/view_update_generator.cc | 6 +-- db/virtual_tables.cc | 14 +++---- main.cc | 17 ++++---- repair/repair.cc | 15 ++++---- repair/row_level.cc | 10 ++--- replica/data_dictionary_impl.hh | 10 ++--- replica/database.cc | 62 ++++++++++++++++++++++-------- replica/database.hh | 6 +++ service/misc_services.cc | 6 +-- service/storage_service.cc | 8 ++-- streaming/stream_session.cc | 6 +-- test/boost/cql_query_large_test.cc | 4 +- test/lib/cql_test_env.cc | 6 +-- 19 files changed, 135 insertions(+), 104 deletions(-) diff --git a/api/column_family.cc b/api/column_family.cc index 030ab20fe6..22b5e36fad 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -135,9 +135,9 @@ static future get_cf_histogram(http_context& ctx, const static future get_cf_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const replica::database& db) { utils::ihistogram res; - for (auto i : db.get_tables_metadata()._column_families) { - res += (i.second->get_stats().*f).hist; - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) mutable { + res += (table->get_stats().*f).hist; + }); return res; }; return ctx.db.map(fun).then([](const std::vector &res) { @@ -162,9 +162,9 @@ static future get_cf_rate_and_histogram(http_context& c static future get_cf_rate_and_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) { std::function fun = [f] (const replica::database& db) { utils::rate_moving_average_and_histogram res; - for (auto i : db.get_tables_metadata()._column_families) { - res += (i.second->get_stats().*f).rate(); - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + res += (table->get_stats().*f).rate(); + }); return res; }; return ctx.db.map(fun).then([](const std::vector &res) { @@ -306,21 +306,21 @@ ratio_holder filter_recent_false_positive_as_ratio_holder(const sstables::shared void set_column_family(http_context& ctx, routes& r, sharded& sys_ks) { cf::get_column_family_name.set(r, [&ctx] (const_req req){ std::vector res; - for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { - res.push_back(i.first.first + ":" + i.first.second); - } + ctx.db.local().get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { + res.push_back(kscf.first + ":" + kscf.second); + }); return res; }); cf::get_column_family.set(r, [&ctx] (std::unique_ptr req){ - std::list res; - for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) { + std::list res; + ctx.db.local().get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { cf::column_family_info info; - info.ks = i.first.first; - info.cf = i.first.second; + info.ks = kscf.first; + info.cf = kscf.second; info.type = "ColumnFamilies"; res.push_back(info); - } + }); return make_ready_future(json::stream_range_as_array(std::move(res), std::identity())); }); diff --git a/api/column_family.hh b/api/column_family.hh index 0418c97fac..a90229e545 100644 --- a/api/column_family.hh +++ b/api/column_family.hh @@ -68,9 +68,10 @@ struct map_reduce_column_families_locally { std::function(std::unique_ptr, std::unique_ptr)> reducer; future> operator()(replica::database& db) const { auto res = seastar::make_lw_shared>(std::make_unique(init)); - return do_for_each(db.get_tables_metadata()._column_families, [res, this](const std::pair>& i) { - *res = reducer(std::move(*res), mapper(*i.second.get())); - }).then([res] { + return db.get_tables_metadata().for_each_table_gently([res, this] (table_id, seastar::lw_shared_ptr table) { + *res = reducer(std::move(*res), mapper(*table.get())); + return make_ready_future(); + }).then([res] () { return std::move(*res); }); } diff --git a/api/compaction_manager.cc b/api/compaction_manager.cc index 461b396f19..0f2d0ec184 100644 --- a/api/compaction_manager.cc +++ b/api/compaction_manager.cc @@ -68,8 +68,8 @@ void set_compaction_manager(http_context& ctx, routes& r) { cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr req) { return ctx.db.map_reduce0([](replica::database& db) { return do_with(std::unordered_map, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map, uint64_t, utils::tuple_hash>& tasks) { - return do_for_each(db.get_tables_metadata()._column_families, [&tasks](const std::pair>& i) -> future<> { - replica::table& cf = *i.second.get(); + return db.get_tables_metadata().for_each_table_gently([&tasks] (table_id, lw_shared_ptr table) { + replica::table& cf = *table.get(); tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = cf.estimate_pending_compactions(); return make_ready_future<>(); }).then([&tasks] { diff --git a/api/storage_service.cc b/api/storage_service.cc index 2ef549646f..e0fec01544 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -980,10 +980,9 @@ void set_storage_service(http_context& ctx, routes& r, shardedset_incremental_backups(value); - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + table->set_incremental_backups(value); + }); }).then([] { return make_ready_future(json_void()); }); @@ -1258,7 +1257,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded t) { auto& schema = t->schema(); if ((ks.empty() || ks == schema->ks_name()) && (cf.empty() || cf == schema->cf_name())) { // at most Nsstables long @@ -1339,7 +1338,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded generation_service::maybe_rewrite_streams_descriptions() { // For each CDC log table get the TTL setting (from CDC options) and the table's creation time std::vector times_and_ttls; - for (auto& [_, cf] : _db.get_tables_metadata()._column_families) { - auto& s = *cf->schema(); + _db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr t) { + auto& s = *t->schema(); auto base = cdc::get_base_table(_db, s.ks_name(), s.cf_name()); if (!base) { // Not a CDC log table. - continue; + return; } auto& cdc_opts = base->cdc_options(); if (!cdc_opts.enabled()) { // This table is named like a CDC log table but it's not one. - continue; + return; } times_and_ttls.push_back(time_and_ttl{as_timepoint(s.id().uuid()), cdc_opts.ttl()}); - } + }); if (times_and_ttls.empty()) { // There's no point in rewriting old generations' streams (they don't contain any data). diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index d582733614..35f97c856e 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -126,8 +126,7 @@ future<> db::commitlog_replayer::impl::init() { } }, [this](replica::database& db) { return do_with(shard_rpm_map{}, [this, &db](shard_rpm_map& map) { - return parallel_for_each(db.get_tables_metadata()._column_families, [this, &map](auto& cfp) { - auto uuid = cfp.first; + return db.get_tables_metadata().parallel_for_each_table([this, &map] (table_id uuid, lw_shared_ptr) { // We do this on each cpu, for each CF, which technically is a little wasteful, but the values are // cached, this is only startup, and it makes the code easier. // Get all truncation records for the CF and initialize max rps if @@ -156,13 +155,13 @@ future<> db::commitlog_replayer::impl::init() { // existing sstables-per-shard. // So, go through all CF:s and check, if a shard mapping does not // have data for it, assume we must set global pos to zero. - for (auto&p : _db.local().get_tables_metadata()._column_families) { + _db.local().get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr) { for (auto&p1 : _rpm) { // for each shard - if (!p1.second.contains(p.first)) { + if (!p1.second.contains(id)) { _min_pos[p1.first] = replay_position(); } } - } + }); for (auto&p : _min_pos) { rlogger.debug("minimum position for shard {}: {}", p.first, p.second); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index d062aff210..ec50f31583 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -265,8 +265,8 @@ void view_update_generator::setup_metrics() { } void view_update_generator::discover_staging_sstables() { - for (auto& x : _db.get_tables_metadata()._column_families) { - auto t = x.second->shared_from_this(); + _db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + auto t = table->shared_from_this(); for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { if (sst->requires_view_building()) { _progress_tracker->on_sstable_registration(sst); @@ -276,7 +276,7 @@ void view_update_generator::discover_staging_sstables() { _registration_sem.consume(1); } } - } + }); } } diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index 44a1aea1e1..90241431d7 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -283,13 +283,13 @@ public: const auto snapshots_by_tables = co_await _db.map_reduce(snapshot_reducer(), [ks_name_ = ks_data.name] (replica::database& db) mutable -> future { auto ks_name = std::move(ks_name_); snapshots_by_tables_map snapshots_by_tables; - for (auto& [_, table] : db.get_tables_metadata()._column_families) { + co_await db.get_tables_metadata().for_each_table_gently(coroutine::lambda([&] (table_id, lw_shared_ptr table) -> future<> { if (table->schema()->ks_name() != ks_name) { - continue; + co_return; } const auto unordered_snapshots = co_await table->get_snapshot_details(); snapshots_by_tables.emplace(table->schema()->cf_name(), std::map(unordered_snapshots.begin(), unordered_snapshots.end())); - } + })); co_return snapshots_by_tables; }); @@ -433,9 +433,9 @@ private: }; co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) { T val = {}; - for (auto& [_, table] : db.get_tables_metadata()._column_families) { + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { val = reduce(val, map(*table)); - } + }); return val; }); } @@ -560,13 +560,13 @@ public: res.total = occupancy.total_space(); res.free = occupancy.free_space(); res.entries = db.row_cache_tracker().partitions(); - for (const auto& [_, t] : db.get_tables_metadata()._column_families) { + db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr t) { auto& cache_stats = t->get_row_cache().stats(); res.hits += cache_stats.hits.count(); res.misses += cache_stats.misses.count(); res.hits_moving_average += cache_stats.hits.rate(); res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate()); - } + }); return res; }, stats{}, stats::reduce).then([] (stats s) { return std::vector>{ diff --git a/main.cc b/main.cc index b73c9cfd45..8c3c68e7d5 100644 --- a/main.cc +++ b/main.cc @@ -1346,8 +1346,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Needs to happen before replaying the schema commitlog, which interprets // replay position in the truncation record. // Needs to happen before system_keyspace::setup(), which reads truncation records. - for (auto&& e : db.local().get_tables_metadata()._column_families) { - auto table_ptr = e.second; + db.local().get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table_ptr) { if (table_ptr->schema()->ks_name() == db::schema_tables::NAME) { if (table_ptr->get_truncation_record() != db_clock::time_point::min()) { // replay_position stored in the truncation record may belong to @@ -1360,7 +1359,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl table_ptr->schema()->ks_name(), table_ptr->schema()->cf_name())); } } - } + }); auto sch_cl = db.local().schema_commitlog(); if (sch_cl != nullptr) { @@ -1405,10 +1404,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl } db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::table& t = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::table& t = *table; t.enable_auto_compaction(); - } + }); }).get(); // If the same sstable is shared by several shards, it cannot be @@ -1423,10 +1422,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // streaming db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::column_family& cf = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::column_family& cf = *table; cf.trigger_compaction(); - } + }); }).get(); api::set_server_gossip(ctx, gossiper).get(); api::set_server_snitch(ctx, snitch).get(); diff --git a/repair/repair.cc b/repair/repair.cc index bfad21f924..79af1dbb9a 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -127,19 +127,20 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo } static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) { - auto& m = db.get_tables_metadata()._ks_cf_to_uuid; - return std::count_if(m.begin(), m.end(), [&keyspace] (auto& e) { - return e.first.first == keyspace; + size_t tables = 0; + db.get_tables_metadata().for_each_table_id([&keyspace, &tables] (const std::pair& kscf, table_id) { + tables += kscf.first == keyspace; }); + return tables; } static std::vector list_column_families(const replica::database& db, const sstring& keyspace) { std::vector ret; - for (auto &&e : db.get_tables_metadata()._ks_cf_to_uuid) { - if (e.first.first == keyspace) { - ret.push_back(e.first.second); + db.get_tables_metadata().for_each_table_id([&] (const std::pair& kscf, table_id) { + if (kscf.first == keyspace) { + ret.push_back(kscf.second); } - } + }); return ret; } diff --git a/repair/row_level.cc b/repair/row_level.cc index 7419ba9c84..a9afa3fc6b 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -3050,13 +3050,10 @@ future<> repair_service::cleanup_history(tasks::task_id repair_id) { } future<> repair_service::load_history() { - auto tables = get_db().local().get_tables_metadata()._column_families; - for (const auto& x : tables) { - auto& table_uuid = x.first; - auto& table = x.second; + co_await get_db().local().get_tables_metadata().for_each_table_gently(coroutine::lambda([&] (table_id table_uuid, lw_shared_ptr table) -> future<> { auto shard = unsigned(table_uuid.uuid().get_most_significant_bits()) % smp::count; if (shard != this_shard_id()) { - continue; + co_return; } rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}", table->schema()->ks_name(), table->schema()->cf_name(), table_uuid); @@ -3077,8 +3074,7 @@ future<> repair_service::load_history() { entry.ks, entry.cf, range, repair_time); } }); - } - co_return; + })); } repair_meta_ptr repair_service::get_repair_meta(gms::inet_address from, uint32_t repair_meta_id) { diff --git a/replica/data_dictionary_impl.hh b/replica/data_dictionary_impl.hh index de84aa16f5..58ce6e72ad 100644 --- a/replica/data_dictionary_impl.hh +++ b/replica/data_dictionary_impl.hh @@ -66,11 +66,11 @@ public: } virtual std::vector get_tables(data_dictionary::database db) const override { std::vector ret; - auto&& tables = unwrap(db).get_tables_metadata()._column_families; - ret.reserve(tables.size()); - for (auto&& [uuid, cf] : tables) { - ret.push_back(wrap(*cf)); - } + auto& tmd = unwrap(db).get_tables_metadata(); + ret.reserve(tmd.size()); + tmd.for_each_table([&] (table_id, const lw_shared_ptr table) { + ret.push_back(wrap(*table)); + }); return ret; } virtual std::optional try_find_table(data_dictionary::database db, std::string_view ks, std::string_view table) const override { diff --git a/replica/database.cc b/replica/database.cc index 6da0b7bc13..80cad24dc8 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -154,20 +154,20 @@ phased_barrier_top_10_counts(const database::tables_metadata& tables_metadata, s boost::container::static_vector res; count_and_tables* min_element = nullptr; - for (const auto& [tid, table] : tables_metadata._column_families) { + tables_metadata.for_each_table([&] (table_id tid, lw_shared_ptr
table) { const auto count = op_count_getter(*table); if (!count) { - continue; + return; } if (res.size() < res.capacity()) { auto& elem = res.emplace_back(count, table_list({table.get()})); if (!min_element || min_element->first > count) { min_element = &elem; } - continue; + return; } if (min_element->first > count) { - continue; + return; } auto it = boost::find_if(res, [count] (const count_and_tables& x) { @@ -175,13 +175,13 @@ phased_barrier_top_10_counts(const database::tables_metadata& tables_metadata, s }); if (it != res.end()) { it->second.push_back(table.get()); - continue; + return; } // If we are here, min_element->first < count *min_element = {count, table_list({table.get()})}; min_element = &*boost::min_element(res, less); - } + }); boost::sort(res, less); @@ -1802,10 +1802,10 @@ std::ostream& operator<<(std::ostream& out, const column_family& cf) { std::ostream& operator<<(std::ostream& out, const database& db) { out << "{\n"; - for (auto&& e : db._tables_metadata._column_families) { - auto&& cf = *e.second; - out << "(" << e.first.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; - } + db._tables_metadata.for_each_table([&] (table_id id, const lw_shared_ptr
tp) { + auto&& cf = *tp; + out << "(" << id.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n"; + }); out << "}"; return out; } @@ -2310,13 +2310,13 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i future<> database::close_tables(table_kind kind_to_close) { auto b = defer([this] { _stop_barrier.abort(); }); - co_await coroutine::parallel_for_each(_tables_metadata._column_families, [this, kind_to_close](auto& val_pair) -> future<> { - auto& s = val_pair.second->schema(); + co_await _tables_metadata.parallel_for_each_table(coroutine::lambda([this, kind_to_close] (table_id, lw_shared_ptr
table) -> future<> { + auto& s = table->schema(); table_kind k = is_system_table(*s) || _cfg.extensions().is_extension_internal_keyspace(s->ks_name()) ? table_kind::system : table_kind::user; if (k == kind_to_close) { - co_await val_pair.second->stop(); + co_await table->stop(); } - }); + })); co_await _stop_barrier.arrive_and_wait(); b.cancel(); } @@ -2399,8 +2399,8 @@ future<> database::stop() { } future<> database::flush_all_memtables() { - return parallel_for_each(_tables_metadata._column_families, [] (auto& cfp) { - return cfp.second->flush(); + return _tables_metadata.parallel_for_each_table([] (table_id, lw_shared_ptr
table) { + return table->flush(); }); } @@ -2855,6 +2855,10 @@ future<> database::drain() { b.cancel(); } +size_t database::tables_metadata::size() const noexcept { + return _column_families.size(); +} + future<> database::tables_metadata::add_table(schema_ptr schema) { auto holder = co_await _cf_lock.hold_write_lock(); auto id = schema->id(); @@ -2879,6 +2883,32 @@ future<> database::tables_metadata::remove_table(schema_ptr schema) noexcept { } } +void database::tables_metadata::for_each_table(std::function)> f) const { + for (auto& [id, table]: _column_families) { + f(id, table); + } +} + +void database::tables_metadata::for_each_table_id(std::function f) const { + for (auto& [kscf, id]: _ks_cf_to_uuid) { + f(kscf, id); + } +} + +future<> database::tables_metadata::for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f) { + auto holder = co_await _cf_lock.hold_read_lock(); + for (auto& [id, table]: _column_families) { + co_await f(id, table); + } +} + +future<> database::tables_metadata::parallel_for_each_table(std::function(table_id, lw_shared_ptr
)> f) { + auto holder = co_await _cf_lock.hold_read_lock(); + co_await coroutine::parallel_for_each(_column_families, [f = std::move(f)] (auto& table) { + return f(table.first, table.second); + }); +} + data_dictionary::database database::as_data_dictionary() const { static constinit data_dictionary_impl _impl; diff --git a/replica/database.hh b/replica/database.hh index 0cf1a357ac..554d6b57cc 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1309,8 +1309,14 @@ public: std::unordered_map> _column_families; ks_cf_to_uuid_t _ks_cf_to_uuid; + size_t size() const noexcept; + future<> add_table(schema_ptr schema); future<> remove_table(schema_ptr schema) noexcept; + void for_each_table(std::function)> f) const; + void for_each_table_id(std::function f) const; + future<> for_each_table_gently(std::function(table_id, lw_shared_ptr
)> f); + future<> parallel_for_each_table(std::function(table_id, lw_shared_ptr
)> f); }; private: replica::cf_stats _cf_stats; diff --git a/service/misc_services.cc b/service/misc_services.cc index cc72f9e9b0..ed9497bb80 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -78,9 +78,9 @@ void load_broadcaster::start_broadcasting() { llogger.debug("Disseminating load info ..."); _done = _db.map_reduce0([](replica::database& db) { int64_t res = 0; - for (auto i : db.get_tables_metadata()._column_families) { - res += i.second->get_stats().live_disk_space_used; - } + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) { + res += table->get_stats().live_disk_space_used; + }); return res; }, int64_t(0), std::plus()).then([this] (int64_t size) { return _gossiper.add_local_application_state(gms::application_state::LOAD, diff --git a/service/storage_service.cc b/service/storage_service.cc index e249e52d22..4100a1e8d1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3093,16 +3093,16 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt co_await container().invoke_on_all([&] (storage_service& ss) { auto& db = ss._db.local(); auto tmptr = pending_token_metadata_ptr[this_shard_id()]; - for (auto&& [id, cf] : db.get_tables_metadata()._column_families) { // Safe because we iterate without preemption - auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr(); + db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr table) { + auto rs = db.find_keyspace(table->schema()->keypace_name()).get_replication_strategy_ptr(); locator::effective_replication_map_ptr erm; if (auto pt_rs = rs->maybe_as_per_table()) { erm = pt_rs->make_replication_map(id, tmptr); } else { - erm = pending_effective_replication_maps[this_shard_id()][cf->schema()->keypace_name()]; + erm = pending_effective_replication_maps[this_shard_id()][table->schema()->keypace_name()]; } pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); - } + }); }); } catch (...) { ex = std::current_exception(); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4b27f40e13..e5764353f9 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -462,15 +462,15 @@ std::vector stream_session::get_column_family_stores(co std::vector stores; auto& db = manager().db(); if (column_families.empty()) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::column_family& cf = *(x.second); + db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr tp) { + replica::column_family& cf = *tp; auto cf_name = cf.schema()->cf_name(); auto ks_name = cf.schema()->ks_name(); if (ks_name == keyspace) { sslog.debug("Find ks={} cf={}", ks_name, cf_name); stores.push_back(&cf); } - } + }); } else { // TODO: We can move this to database class and use shared_ptr instead for (auto& cf_name : column_families) { diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 36ebf52a81..c08f2fa8b9 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -116,8 +116,8 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { // and the old sstable is deleted. flush(e); e.db().invoke_on_all([] (replica::database& dbi) { - return parallel_for_each(dbi.get_tables_metadata()._column_families, [&dbi] (auto& table) { - return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state()); + return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { + return dbi.get_compaction_manager().perform_major_compaction(t->as_table_state()); }); }).get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index f0ae855cce..2c69de282a 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -860,10 +860,10 @@ public: replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get(); db.invoke_on_all([] (replica::database& db) { - for (auto& x : db.get_tables_metadata()._column_families) { - replica::table& t = *(x.second); + db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) { + replica::table& t = *table; t.enable_auto_compaction(); - } + }); }).get(); if (raft_gr.local().is_enabled()) {