From f3528ede6541407bfb8c211ecf2efe84d44c457b Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 2 Nov 2016 14:33:27 -0400 Subject: [PATCH] database: change find_column_families signature so it returns a lw_shared_ptr There are places in which we need to use the column family object many times, with deferring points in between. Because the column family may have been destroyed in the deferring point, we need to go and find it again. If we use lw_shared_ptr, however, we'll be able to at least guarantee that the object will be alive. Some users will still need to check, if they want to guarantee that the column family wasn't removed. But others that only need to make sure we don't access an invalid object will be able to avoid the cost of re-finding it just fine. Signed-off-by: Glauber Costa Message-Id: <722bf49e158da77ff509372c2034e5707706e5bf.1478111467.git.glauber@scylladb.com> --- api/column_family.cc | 22 ++++++------ api/column_family.hh | 4 +-- api/storage_service.cc | 16 ++++----- database.cc | 56 +++++++++++++++--------------- database.hh | 12 +++---- db/commitlog/commitlog_replayer.cc | 14 ++++---- db/schema_tables.cc | 16 ++++----- db/system_keyspace.cc | 4 +-- repair/repair.cc | 8 ++--- service/migration_manager.cc | 2 +- service/storage_service.cc | 50 +++++++++++++------------- streaming/stream_session.cc | 24 ++++++------- streaming/stream_session.hh | 2 +- streaming/stream_transfer_task.cc | 4 +-- tests/cql_test_env.cc | 10 +++--- 15 files changed, 122 insertions(+), 122 deletions(-) diff --git a/api/column_family.cc b/api/column_family.cc index e8754ea143..12e10056e3 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -58,7 +58,7 @@ future<> foreach_column_family(http_context& ctx, const sstring& name, function< auto uuid = get_uuid(name, ctx.db.local()); return ctx.db.invoke_on_all([f, uuid](database& db) { - f(db.find_column_family(uuid)); + f(*(db.find_column_family(uuid))); }); } @@ -91,8 +91,8 @@ static future get_cf_stats_sum(http_context& ctx, const // so to get an estimation of sum, we multiply the mean // with count. The information is gather in nano second, // but reported in micro - column_family& cf = db.find_column_family(uuid); - return ((cf.get_stats().*f).hist.count/1000.0) * (cf.get_stats().*f).hist.mean; + auto cf = db.find_column_family(uuid); + return ((cf->get_stats().*f).hist.count/1000.0) * (cf->get_stats().*f).hist.mean; }, 0.0, std::plus()).then([](double res) { return make_ready_future((int64_t)res); }); @@ -110,7 +110,7 @@ static future get_cf_histogram(http_context& ctx, const utils::timed_rate_moving_average_and_histogram column_family::stats::*f) { utils::UUID uuid = get_uuid(name, ctx.db.local()); return ctx.db.map_reduce0([f, uuid](const database& p) { - return (p.find_column_family(uuid).get_stats().*f).hist;}, + return (p.find_column_family(uuid)->get_stats().*f).hist;}, utils::ihistogram(), std::plus()) .then([](const utils::ihistogram& val) { @@ -137,7 +137,7 @@ static future get_cf_rate_and_histogram(http_context& c utils::timed_rate_moving_average_and_histogram column_family::stats::*f) { utils::UUID uuid = get_uuid(name, ctx.db.local()); return ctx.db.map_reduce0([f, uuid](const database& p) { - return (p.find_column_family(uuid).get_stats().*f).rate();}, + return (p.find_column_family(uuid)->get_stats().*f).rate();}, utils::rate_moving_average_and_histogram(), std::plus()) .then([](const utils::rate_moving_average_and_histogram& val) { @@ -219,8 +219,8 @@ static future sum_sstable(http_context& ctx, const sstr auto uuid = get_uuid(name, ctx.db.local()); return ctx.db.map_reduce0([uuid, total](database& db) { std::unordered_map m; - auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() : - db.find_column_family(uuid).get_sstables(); + auto sstables = (total) ? db.find_column_family(uuid)->get_sstables_including_compacted_undeleted() : + db.find_column_family(uuid)->get_sstables(); for (auto t : *sstables) { m[t->get_filename()] = t->bytes_on_disk(); } @@ -723,7 +723,7 @@ void set_column_family(http_context& ctx, routes& r) { cf::get_true_snapshots_size.set(r, [&ctx] (std::unique_ptr req) { auto uuid = get_uuid(req->param["name"], ctx.db.local()); - return ctx.db.local().find_column_family(uuid).get_snapshot_details().then([]( + return ctx.db.local().find_column_family(uuid)->get_snapshot_details().then([]( const std::unordered_map& sd) { int64_t res = 0; for (auto i : sd) { @@ -861,8 +861,8 @@ void set_column_family(http_context& ctx, routes& r) { auto uuid = get_uuid(req->param["name"], ctx.db.local()); return ctx.db.map_reduce(sum_ratio(), [uuid](database& db) { - column_family& cf = db.find_column_family(uuid); - return make_ready_future(get_compression_ratio(cf)); + auto cf = db.find_column_family(uuid); + return make_ready_future(get_compression_ratio(*cf)); }).then([] (const double& result) { return make_ready_future(result); }); @@ -892,7 +892,7 @@ void set_column_family(http_context& ctx, routes& r) { }); cf::get_compaction_strategy_class.set(r, [&ctx](const_req req) { - return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local())).get_compaction_strategy().name(); + return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local()))->get_compaction_strategy().name(); }); cf::set_compression_parameters.set(r, [&ctx](std::unique_ptr req) { diff --git a/api/column_family.hh b/api/column_family.hh index 00d173e94a..803d12e1db 100644 --- a/api/column_family.hh +++ b/api/column_family.hh @@ -38,7 +38,7 @@ future map_reduce_cf_raw(http_context& ctx, const sstring& name, I init, Mapper mapper, Reducer reducer) { auto uuid = get_uuid(name, ctx.db.local()); return ctx.db.map_reduce0([mapper, uuid](database& db) { - return mapper(db.find_column_family(uuid)); + return mapper(*(db.find_column_family(uuid))); }, init, reducer); } @@ -56,7 +56,7 @@ future map_reduce_cf_raw(http_context& ctx, const sstring& name, I init, Mapper mapper, Reducer reducer, Result result) { auto uuid = get_uuid(name, ctx.db.local()); return ctx.db.map_reduce0([mapper, uuid](database& db) { - return mapper(db.find_column_family(uuid)); + return mapper(*(db.find_column_family(uuid))); }, init, reducer); } diff --git a/api/storage_service.cc b/api/storage_service.cc index 37b80a7cff..0cd92a8475 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -263,11 +263,11 @@ void set_storage_service(http_context& ctx, routes& r) { column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); } return ctx.db.invoke_on_all([keyspace, column_families] (database& db) { - std::vector column_families_vec; + std::vector> column_families_vec; for (auto cf : column_families) { - column_families_vec.push_back(&db.find_column_family(keyspace, cf)); + column_families_vec.push_back(db.find_column_family(keyspace, cf)); } - return parallel_for_each(column_families_vec, [] (column_family* cf) { + return parallel_for_each(column_families_vec, [] (lw_shared_ptr cf) { return cf->compact_all_sstables(); }); }).then([]{ @@ -282,13 +282,13 @@ void set_storage_service(http_context& ctx, routes& r) { column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); } return ctx.db.invoke_on_all([keyspace, column_families] (database& db) { - std::vector column_families_vec; + std::vector> column_families_vec; auto& cm = db.get_compaction_manager(); for (auto cf : column_families) { - column_families_vec.push_back(&db.find_column_family(keyspace, cf)); + column_families_vec.push_back(db.find_column_family(keyspace, cf)); } - return parallel_for_each(column_families_vec, [&cm] (column_family* cf) { - return cm.perform_cleanup(cf); + return parallel_for_each(column_families_vec, [&cm] (lw_shared_ptr cf) { + return cm.perform_cleanup(&*cf); }); }).then([]{ return make_ready_future(0); @@ -322,7 +322,7 @@ void set_storage_service(http_context& ctx, routes& r) { } return ctx.db.invoke_on_all([keyspace, column_families] (database& db) { return parallel_for_each(column_families, [&db, keyspace](const sstring& cf) mutable { - return db.find_column_family(keyspace, cf).flush(); + return db.find_column_family(keyspace, cf)->flush(); }); }).then([]{ return make_ready_future(json_void()); diff --git a/database.cc b/database.cc index 9dd008c463..41460dcee9 100644 --- a/database.cc +++ b/database.cc @@ -1924,8 +1924,8 @@ database::init_system_keyspace() { auto& ks = find_keyspace(db::system_keyspace::NAME); return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) { auto cfm = pair.second; - auto& cf = this->find_column_family(cfm); - cf.mark_ready_for_writes(); + auto cf = this->find_column_family(cfm); + cf->mark_ready_for_writes(); return make_ready_future<>(); }); }); @@ -2089,7 +2089,7 @@ std::vector> database::get_non_system_column_famili })); } -column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) { +lw_shared_ptr database::find_column_family(const sstring& ks_name, const sstring& cf_name) { try { return find_column_family(find_uuid(ks_name, cf_name)); } catch (...) { @@ -2097,7 +2097,7 @@ column_family& database::find_column_family(const sstring& ks_name, const sstrin } } -const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const { +const lw_shared_ptr database::find_column_family(const sstring& ks_name, const sstring& cf_name) const { try { return find_column_family(find_uuid(ks_name, cf_name)); } catch (...) { @@ -2105,17 +2105,17 @@ const column_family& database::find_column_family(const sstring& ks_name, const } } -column_family& database::find_column_family(const utils::UUID& uuid) { +lw_shared_ptr database::find_column_family(const utils::UUID& uuid) { try { - return *_column_families.at(uuid); + return _column_families.at(uuid); } catch (...) { std::throw_with_nested(no_such_column_family(uuid)); } } -const column_family& database::find_column_family(const utils::UUID& uuid) const { +const lw_shared_ptr database::find_column_family(const utils::UUID& uuid) const { try { - return *_column_families.at(uuid); + return _column_families.at(uuid); } catch (...) { std::throw_with_nested(no_such_column_family(uuid)); } @@ -2209,11 +2209,11 @@ no_such_column_family::no_such_column_family(const sstring& ks_name, const sstri { } -column_family& database::find_column_family(const schema_ptr& schema) { +lw_shared_ptr database::find_column_family(const schema_ptr& schema) { return find_column_family(schema->id()); } -const column_family& database::find_column_family(const schema_ptr& schema) const { +const lw_shared_ptr database::find_column_family(const schema_ptr& schema) const { return find_column_family(schema->id()); } @@ -2233,7 +2233,7 @@ schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) } schema_ptr database::find_schema(const utils::UUID& uuid) const { - return find_column_family(uuid).schema(); + return find_column_family(uuid)->schema(); } bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const { @@ -2382,8 +2382,8 @@ column_family::as_mutation_source(tracing::trace_state_ptr trace_state) const { future> database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const std::vector& ranges, tracing::trace_state_ptr trace_state) { - column_family& cf = find_column_family(cmd.cf_id); - return cf.query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) { + auto cf = find_column_family(cmd.cf_id); + return cf->query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) { ++s->total_reads; return std::move(res); }); @@ -2391,8 +2391,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ future database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range, tracing::trace_state_ptr trace_state) { - column_family& cf = find_column_family(cmd.cf_id); - return mutation_query(std::move(s), cf.as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit, + auto cf = find_column_family(cmd.cf_id); + return mutation_query(std::move(s), cf->as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit, cmd.timestamp).then([this, s = _stats] (auto&& res) { ++s->total_reads; return std::move(res); @@ -2543,8 +2543,8 @@ void dirty_memory_manager::maybe_do_active_flush() { // However, since we'll very soon have a mechanism in place to account for the memory // that was already written in one form or another, that disadvantage is mitigated. memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region()); - auto& biggest_cf = _db->find_column_family(biggest_memtable.schema()); - memtable_list& mtlist = get_memtable_list(biggest_cf); + auto biggest_cf = _db->find_column_family(biggest_memtable.schema()); + memtable_list& mtlist = get_memtable_list(*biggest_cf); // Please note that this will eventually take the semaphore and prevent two concurrent flushes. // We don't need any other extra protection. mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate); @@ -2565,8 +2565,8 @@ void dirty_memory_manager::start_reclaiming() { future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { try { - auto& cf = find_column_family(m.column_family_id()); - cf.apply(m, m_schema, rp); + auto cf = find_column_family(m.column_family_id()); + cf->apply(m, m_schema, rp); } catch (no_such_column_family&) { dblog.error("Attempting to mutate non-existent table {}", m.column_family_id()); } @@ -2578,14 +2578,14 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) { // is a little in flux and commitlog is created only when db is // initied from datadir. auto uuid = m.column_family_id(); - auto& cf = find_column_family(uuid); + auto cf = find_column_family(uuid); if (!s->is_synced()) { throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - if (cf.commitlog() != nullptr) { + if (cf->commitlog() != nullptr) { commitlog_entry_writer cew(s, m); - return cf.commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) { + return cf->commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) { return this->apply_in_memory(m, s, rp).handle_exception([this, s, &m] (auto ep) { try { std::rethrow_exception(ep); @@ -2620,8 +2620,8 @@ future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, c } return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] { auto uuid = m.column_family_id(); - auto& cf = find_column_family(uuid); - cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented); + auto cf = find_column_family(uuid); + cf->apply_streaming_mutation(s, plan_id, std::move(m), fragmented); }); } @@ -2743,8 +2743,8 @@ future<> database::flush_all_memtables() { future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf) { auto& ks = find_keyspace(ksname); - auto& cf = find_column_family(ksname, cfname); - return truncate(ks, cf, std::move(tsf)); + auto cf = find_column_family(ksname, cfname); + return truncate(ks, *cf, std::move(tsf)); } future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf) @@ -2810,8 +2810,8 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam return parallel_for_each(keyspaces, [this, tag] (auto& ks) { return parallel_for_each(ks.get().metadata()->cf_meta_data(), [this, tag] (auto& pair) { - auto& cf = this->find_column_family(pair.second); - return cf.clear_snapshot(tag); + auto cf = this->find_column_family(pair.second); + return cf->clear_snapshot(tag); }).then_wrapped([] (future<> f) { dblog.debug("Cleared out snapshot directories"); }); diff --git a/database.hh b/database.hh index 0e728098ee..a547648a51 100644 --- a/database.hh +++ b/database.hh @@ -1061,12 +1061,12 @@ public: void drop_keyspace(const sstring& name); const auto& keyspaces() const { return _keyspaces; } std::vector get_non_system_keyspaces() const; - column_family& find_column_family(const sstring& ks, const sstring& name); - const column_family& find_column_family(const sstring& ks, const sstring& name) const; - column_family& find_column_family(const utils::UUID&); - const column_family& find_column_family(const utils::UUID&) const; - column_family& find_column_family(const schema_ptr&); - const column_family& find_column_family(const schema_ptr&) const; + lw_shared_ptr find_column_family(const sstring& ks, const sstring& name); + const lw_shared_ptr find_column_family(const sstring& ks, const sstring& name) const; + lw_shared_ptr find_column_family(const utils::UUID&); + const lw_shared_ptr find_column_family(const utils::UUID&) const; + lw_shared_ptr find_column_family(const schema_ptr&); + const lw_shared_ptr find_column_family(const schema_ptr&) const; bool column_family_exists(const utils::UUID& uuid) const; schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const; schema_ptr find_schema(const utils::UUID&) const; diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index 3751a3c04c..ea4864013e 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -251,25 +251,25 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer // TODO: might need better verification that the deserialized mutation // is schema compatible. My guess is that just applying the mutation // will not do this. - auto& cf = db.find_column_family(fm.column_family_id()); + auto cf = db.find_column_family(fm.column_family_id()); if (logger.is_enabled(logging::log_level::debug)) { logger.debug("replaying at {} v={} {}:{} at {}", fm.column_family_id(), fm.schema_version(), - cf.schema()->ks_name(), cf.schema()->cf_name(), rp); + cf->schema()->ks_name(), cf->schema()->cf_name(), rp); } // Removed forwarding "new" RP. Instead give none/empty. // This is what origin does, and it should be fine. // The end result should be that once sstables are flushed out // their "replay_position" attribute will be empty, which is // lower than anything the new session will produce. - if (cf.schema()->version() != fm.schema_version()) { + if (cf->schema()->version() != fm.schema_version()) { const column_mapping& cm = cm_it->second; - mutation m(fm.decorated_key(*cf.schema()), cf.schema()); - converting_mutation_partition_applier v(cm, *cf.schema(), m.partition()); + mutation m(fm.decorated_key(*cf->schema()), cf->schema()); + converting_mutation_partition_applier v(cm, *cf->schema(), m.partition()); fm.partition().accept(cm, v); - cf.apply(std::move(m)); + cf->apply(std::move(m)); } else { - cf.apply(fm, cf.schema()); + cf->apply(fm, cf->schema()); } s->applied_mutations++; return make_ready_future<>(); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 1b814a38b5..296ed21bdd 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -615,8 +615,8 @@ future<> do_merge_schema(distributed& proxy, std::vector if (do_flush) { proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) { return parallel_for_each(cfs.begin(), cfs.end(), [&db] (auto& id) { - auto& cf = db.find_column_family(id); - return cf.flush(); + auto cf = db.find_column_family(id); + return cf->flush(); }); }).get(); } @@ -699,17 +699,17 @@ future> merge_keyspaces(distributed& p } static future<> update_column_family(database& db, schema_ptr new_schema) { - column_family& cfm = db.find_column_family(new_schema->id()); + auto cfm = db.find_column_family(new_schema->id()); keyspace& ks = db.find_keyspace(new_schema->ks_name()); - bool columns_changed = !cfm.schema()->equal_columns(*new_schema); + bool columns_changed = !cfm->schema()->equal_columns(*new_schema); auto s = local_schema_registry().learn(new_schema); s->registry_entry()->mark_synced(); - cfm.set_schema(std::move(s)); + cfm->set_schema(std::move(s)); ks.metadata()->add_or_update_column_family(new_schema); - return service::get_local_migration_manager().notify_update_column_family(cfm.schema(), columns_changed); + return service::get_local_migration_manager().notify_update_column_family(cfm->schema(), columns_changed); } // see the comments for merge_keyspaces() @@ -751,8 +751,8 @@ static void merge_tables(distributed& proxy, auto& ks = db.find_keyspace(s->ks_name()); auto cfg = ks.make_column_family_config(*s, db.get_config()); db.add_column_family(s, cfg); - auto& cf = db.find_column_family(s); - cf.mark_ready_for_writes(); + auto cf = db.find_column_family(s); + cf->mark_ready_for_writes(); ks.make_directory_for_column_family(s->cf_name(), s->id()).get(); service::get_local_migration_manager().notify_create_column_family(s).get(); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 5cdf4a6053..9f1cc34911 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -919,8 +919,8 @@ future<> force_blocking_flush(sstring cfname) { assert(qctx); return qctx->_db.invoke_on_all([cfname = std::move(cfname)](database& db) { // if (!Boolean.getBoolean("cassandra.unsafesystem")) - column_family& cf = db.find_column_family(NAME, cfname); - return cf.flush(); + auto cf = db.find_column_family(NAME, cfname); + return cf->flush(); }); } diff --git a/repair/repair.cc b/repair/repair.cc index 67c477db9c..4e15b1b7a3 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -359,9 +359,9 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) { static future checksum_range_shard(database &db, const sstring& keyspace_name, const sstring& cf_name, const ::nonwrapping_range& range, repair_checksum hash_version) { - auto& cf = db.find_column_family(keyspace_name, cf_name); - return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) { - auto reader = cf.make_streaming_reader(cf.schema(), partition_range); + auto cf = db.find_column_family(keyspace_name, cf_name); + return do_with(dht::to_partition_range(range), [cf, hash_version] (const auto& partition_range) { + auto reader = cf->make_streaming_reader(cf->schema(), partition_range); return do_with(std::move(reader), partition_checksum(), [hash_version] (auto& reader, auto& checksum) { return repeat([&reader, &checksum, hash_version] () { @@ -485,7 +485,7 @@ static future<> repair_cf_range(seastar::sharded& db, // FIXME: column_family should have a method to estimate the number of // partitions (and of course it should use cardinality estimation bitmaps, // not trivial sum). We shouldn't have this ugly code here... - auto sstables = db.local().find_column_family(keyspace, cf).get_sstables(); + auto sstables = db.local().find_column_family(keyspace, cf)->get_sstables(); uint64_t estimated_partitions = 0; for (auto sst : *sstables) { estimated_partitions += sst->get_estimated_key_count(); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 65b8563ad1..ccdd0dec4a 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -453,7 +453,7 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f #endif try { auto& db = get_local_storage_proxy().get_db().local(); - auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id? + auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name())->schema(); // FIXME: Should we lookup by id? #if 0 oldCfm.validateCompatility(cfm); #endif diff --git a/service/storage_service.cc b/service/storage_service.cc index 31274eb025..d7ff428c45 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1684,8 +1684,8 @@ future<> storage_service::do_stop_stream_manager() { future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) { auto& ks = db.find_keyspace(ks_name); return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) { - auto& cf = db.find_column_family(pair.second); - return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) { + auto cf = db.find_column_family(pair.second); + return cf->snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) { if (exists) { throw std::runtime_error(sprint("Keyspace %s: snapshot %s already exists.", ks_name, name)); } @@ -1715,8 +1715,8 @@ future<> storage_service::take_snapshot(sstring tag, std::vector keyspa return parallel_for_each(keyspace_names, [&db, tag = std::move(tag)] (auto& ks_name) { auto& ks = db.find_keyspace(ks_name); return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag)] (auto& pair) { - auto& cf = db.find_column_family(pair.second); - return cf.snapshot(tag); + auto cf = db.find_column_family(pair.second); + return cf->snapshot(tag); }); }); }); @@ -1747,8 +1747,8 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c }).then([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] { return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] { return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.snapshot(tag); + auto cf = db.find_column_family(ks_name, cf_name); + return cf->snapshot(tag); }); }); }); @@ -1813,8 +1813,8 @@ storage_service::get_snapshot_details() { std::vector details; for (auto&& snap_map: pair.second) { - auto& cf = _db.local().find_column_family(snap_map.first); - details.push_back({ snap_map.second.live, snap_map.second.total, cf.schema()->cf_name(), cf.schema()->ks_name() }); + auto cf = _db.local().find_column_family(snap_map.first); + details.push_back({ snap_map.second.live, snap_map.second.total, cf->schema()->cf_name(), cf->schema()->ks_name() }); } result.emplace(pair.first, std::move(details)); } @@ -2652,8 +2652,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { // The statement above is valid at least from the Scylla side of things: it is still totally possible // that someones just copies the table over existing ones. There isn't much we can do about it. return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.disable_sstable_write(); + auto cf = db.find_column_family(ks_name, cf_name); + return cf->disable_sstable_write(); }).then([this, cf_name, ks_name] (int64_t max_seen_sstable) { // Then, we will reshuffle the tables to make sure that the generation numbers don't go too high. // We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping @@ -2674,17 +2674,17 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { // We provide to reshuffle_sstables() the generation of all existing sstables, such that it will // easily know which sstables are new. return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); + auto cf = db.find_column_family(ks_name, cf_name); std::set generations; - for (auto& p : *(cf.get_sstables())) { + for (auto& p : *(cf->get_sstables())) { generations.insert(p->generation()); } return make_ready_future>(std::move(generations)); }).then([this, max_seen_sstable, ks_name, cf_name] (std::set all_generations) { auto shard = std::hash()(cf_name) % smp::count; return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1); + auto cf = db.find_column_family(ks_name, cf_name); + return cf->reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1); }); }); }).then_wrapped([this, ks_name, cf_name] (future> f) { @@ -2708,8 +2708,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { logger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen); return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - auto disabled = std::chrono::duration_cast(cf.enable_sstable_write(new_gen)).count(); + auto cf = db.find_column_family(ks_name, cf_name); + auto disabled = std::chrono::duration_cast(cf->enable_sstable_write(new_gen)).count(); logger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, engine().cpu_id(), disabled); return make_ready_future<>(); }).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] { @@ -2721,8 +2721,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { }).then([this, ks_name, cf_name] (std::vector new_tables) { auto shard = std::hash()(cf_name) % smp::count; return _db.invoke_on(shard, [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.flush_upload_dir(); + auto cf = db.find_column_family(ks_name, cf_name); + return cf->flush_upload_dir(); }).then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector new_tables_from_upload) mutable { if (new_tables.empty() && new_tables_from_upload.empty()) { logger.info("No new SSTables were found for {}.{}", ks_name, cf_name); @@ -2733,8 +2733,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { }); }).then([this, ks_name, cf_name] (std::vector new_tables) { return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] { + auto cf = db.find_column_family(ks_name, cf_name); + return cf->load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] { logger.info("Done loading new SSTables for {}.{}", ks_name, cf_name); }); }); @@ -3212,9 +3212,9 @@ calculate_splits(std::vector tokens, uint32_t split_count, column_fa std::vector, uint64_t>> storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, range range, uint32_t keys_per_split) { using range_type = nonwrapping_range; - auto& cf = _db.local().find_column_family(ks_name, cf_name); - auto schema = cf.schema(); - auto sstables = cf.get_sstables(); + auto cf = _db.local().find_column_family(ks_name, cf_name); + auto schema = cf->schema(); + auto sstables = cf->get_sstables(); uint64_t total_row_count_estimate = 0; std::vector tokens; std::vector unwrapped; @@ -3230,7 +3230,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang std::vector range_tokens; for (auto &&sst : *sstables) { total_row_count_estimate += sst->estimated_keys_for_range(r); - auto keys = sst->get_key_samples(*cf.schema(), r); + auto keys = sst->get_key_samples(*cf->schema(), r); std::transform(keys.begin(), keys.end(), std::back_inserter(range_tokens), [](auto&& k) { return std::move(k.token()); }); } std::sort(range_tokens.begin(), range_tokens.end()); @@ -3243,7 +3243,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang uint64_t max_split_count = tokens.size() / min_samples_per_split + 1; uint32_t split_count = std::max(uint32_t(1), static_cast(std::min(max_split_count, total_row_count_estimate / keys_per_split))); - return calculate_splits(std::move(tokens), split_count, cf); + return calculate_splits(std::move(tokens), split_count, *cf); }; } // namespace service diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 991ea760be..25b981d12d 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -158,12 +158,12 @@ void stream_session::init_messaging_service_handler() { } std::vector query_ranges; try { - auto& cf = db.find_column_family(cf_id); + auto cf = db.find_column_family(cf_id); query_ranges.reserve(ranges.size()); for (auto& range : ranges) { query_ranges.push_back(dht::to_partition_range(range)); } - return cf.flush_streaming_mutations(plan_id, std::move(query_ranges)); + return cf->flush_streaming_mutations(plan_id, std::move(query_ranges)); } catch (no_such_column_family) { sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", plan_id, from, cf_id); @@ -386,26 +386,26 @@ void stream_session::start_streaming_files() { } } -std::vector stream_session::get_column_family_stores(const sstring& keyspace, const std::vector& column_families) { +std::vector> stream_session::get_column_family_stores(const sstring& keyspace, const std::vector& column_families) { // if columnfamilies are not specified, we add all cf under the keyspace - std::vector stores; + std::vector> stores; auto& db = get_local_db(); if (column_families.empty()) { for (auto& x : db.get_column_families()) { - column_family& cf = *(x.second); - auto cf_name = cf.schema()->cf_name(); - auto ks_name = cf.schema()->ks_name(); + auto cf = x.second; + auto cf_name = cf->schema()->cf_name(); + auto ks_name = cf->schema()->ks_name(); if (ks_name == keyspace) { sslog.debug("Find ks={} cf={}", ks_name, cf_name); - stores.push_back(&cf); + stores.push_back(cf); } } } else { // TODO: We can move this to database class and use shared_ptr instead for (auto& cf_name : column_families) { try { - auto& x = db.find_column_family(keyspace, cf_name); - stores.push_back(&x); + auto x = db.find_column_family(keyspace, cf_name); + stores.push_back(x); } catch (no_such_column_family) { sslog.warn("stream_session: {}.{} does not exist: {}\n", keyspace, cf_name, std::current_exception()); continue; @@ -434,8 +434,8 @@ future<> stream_session::receiving_failed(UUID cf_id) { return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) { try { - auto& cf = db.find_column_family(cf_id); - return cf.fail_streaming_mutations(plan_id); + auto cf = db.find_column_family(cf_id); + return cf->fail_streaming_mutations(plan_id); } catch (no_such_column_family) { return make_ready_future<>(); } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index fc253d58ac..21be3ad00d 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -255,7 +255,7 @@ public: */ void add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families); - std::vector get_column_family_stores(const sstring& keyspace, const std::vector& column_families); + std::vector> get_column_family_stores(const sstring& keyspace, const std::vector& column_families); void close_session(stream_session_state final_state); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index bdbfc6eb2a..bb3c550ab4 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -108,8 +108,8 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) { } future<> send_mutations(auto si) { - auto& cf = si->db.find_column_family(si->cf_id); - return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) { + auto cf = si->db.find_column_family(si->cf_id); + return do_with(cf->make_streaming_reader(cf->schema(), si->pr), [si] (auto& reader) { return repeat([si, &reader] () { return reader().then([si] (auto smopt) { if (smopt && si->db.column_family_exists(si->cf_id)) { diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 34c3f9fa51..67f4e9ffcd 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -176,8 +176,8 @@ public: const sstring& column_name, data_value expected) override { auto& db = _db->local(); - auto& cf = db.find_column_family(ks_name, table_name); - auto schema = cf.schema(); + auto cf = db.find_column_family(ks_name, table_name); + auto schema = cf->schema(); auto pkey = partition_key::from_deeply_exploded(*schema, pk); auto ckey = clustering_key::from_deeply_exploded(*schema, ck); auto exp = expected.type()->decompose(expected); @@ -189,9 +189,9 @@ public: column_name = std::move(column_name), exp = std::move(exp), table_name = std::move(table_name)] (database& db) mutable { - auto& cf = db.find_column_family(ks_name, table_name); - auto schema = cf.schema(); - return cf.find_partition_slow(schema, pkey) + auto cf = db.find_column_family(ks_name, table_name); + auto schema = cf->schema(); + return cf->find_partition_slow(schema, pkey) .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); auto row = p->find_row(ckey);