From f045cec5863f26046e38ce6cdefac3343d328948 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 19 Jun 2020 16:20:20 +0300 Subject: [PATCH] snap: Get rid of storage_service reference in schema.cc Now when the snapshot stopping is correctly handled, we may pull the database reference all the way down to the schema::describe(). One tricky place is in table::napshot() -- the local db reference is pulled through an smp::submit_to call, but thanks to the shard checks in the place where it is needed the db is still "local" Signed-off-by: Pavel Emelyanov --- database.cc | 2 +- database.hh | 4 ++-- db/snapshot-ctl.cc | 4 ++-- schema.cc | 15 +++++++-------- schema.hh | 4 +++- table.cc | 22 ++++++++++++---------- test/boost/cql_query_test.cc | 6 +++--- test/boost/database_test.cc | 2 +- 8 files changed, 31 insertions(+), 28 deletions(-) diff --git a/database.cc b/database.cc index 1c2c34082d..be1d5c9e5c 100644 --- a/database.cc +++ b/database.cc @@ -1830,7 +1830,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun future<> f = make_ready_future<>(); if (auto_snapshot) { auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()); - f = cf.snapshot(name); + f = cf.snapshot(*this, name); } return f.then([this, &cf, truncated_at, low_mark, should_flush] { return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush](db::replay_position rp) { diff --git a/database.hh b/database.hh index 946084c1ec..0f0a50f629 100644 --- a/database.hh +++ b/database.hh @@ -810,7 +810,7 @@ public: db::replay_position set_low_replay_position_mark(); - future<> snapshot(sstring name); + future<> snapshot(database& db, sstring name); future> get_snapshot_details(); /*! @@ -829,7 +829,7 @@ public: * CREATE INDEX command. * The same is true for local index and MATERIALIZED VIEW. */ - future<> write_schema_as_cql(sstring dir) const; + future<> write_schema_as_cql(database& db, sstring dir) const; const bool incremental_backups_enabled() const { return _config.enable_incremental_backups; diff --git a/db/snapshot-ctl.cc b/db/snapshot-ctl.cc index 60d6446f33..dfc19b8f74 100644 --- a/db/snapshot-ctl.cc +++ b/db/snapshot-ctl.cc @@ -91,7 +91,7 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector keyspace_ auto& ks = db.find_keyspace(ks_name); return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag)] (auto& pair) { auto& cf = db.find_column_family(pair.second); - return cf.snapshot(tag); + return cf.snapshot(db, tag); }); }); }); @@ -119,7 +119,7 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector< } return _db.invoke_on_all([ks_name, table_name, tag] (database &db) { auto& cf = db.find_column_family(ks_name, table_name); - return cf.snapshot(tag); + return cf.snapshot(db, tag); }); }); }); diff --git a/schema.cc b/schema.cc index 4c9ee0325b..b89642df2d 100644 --- a/schema.cc +++ b/schema.cc @@ -36,7 +36,6 @@ #include "view_info.hh" #include "partition_slice_builder.hh" #include "database.hh" -#include "service/storage_service.hh" #include "dht/i_partitioner.hh" #include "dht/token-sharding.hh" #include "cdc/cdc_extension.hh" @@ -709,22 +708,22 @@ std::ostream& column_definition_as_cql_key(std::ostream& os, const column_defini return os; } -static bool is_global_index(const utils::UUID& id, const schema& s) { - return service::get_local_storage_service().db().local().find_column_family(id).get_index_manager().is_global_index(s); +static bool is_global_index(database& db, const utils::UUID& id, const schema& s) { + return db.find_column_family(id).get_index_manager().is_global_index(s); } -static bool is_index(const utils::UUID& id, const schema& s) { - return service::get_local_storage_service().db().local().find_column_family(id).get_index_manager().is_index(s); +static bool is_index(database& db, const utils::UUID& id, const schema& s) { + return db.find_column_family(id).get_index_manager().is_index(s); } -std::ostream& schema::describe(std::ostream& os) const { +std::ostream& schema::describe(database& db, std::ostream& os) const { os << "CREATE "; int n = 0; if (is_view()) { - if (is_index(view_info()->base_id(), *this)) { - auto is_local = !is_global_index(view_info()->base_id(), *this); + if (is_index(db, view_info()->base_id(), *this)) { + auto is_local = !is_global_index(db, view_info()->base_id(), *this); os << "INDEX " << cql3::util::maybe_quote(secondary_index::index_name_from_table_name(cf_name())) << " ON " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(view_info()->base_name()) << "("; diff --git a/schema.hh b/schema.hh index f134caab32..1bd8496363 100644 --- a/schema.hh +++ b/schema.hh @@ -51,6 +51,8 @@ class sharder; } +class database; + using column_count_type = uint32_t; // Column ID, unique within column_kind @@ -945,7 +947,7 @@ public: * Index or Local Index). * */ - std::ostream& describe(std::ostream& os) const; + std::ostream& describe(database& db, std::ostream& os) const; friend bool operator==(const schema&, const schema&); const column_mapping& get_column_mapping() const; friend class schema_registry_entry; diff --git a/table.cc b/table.cc index fdd95a818d..a7ea89d0ee 100644 --- a/table.cc +++ b/table.cc @@ -1523,10 +1523,10 @@ seal_snapshot(sstring jsondir) { }); } -future<> table::write_schema_as_cql(sstring dir) const { +future<> table::write_schema_as_cql(database& db, sstring dir) const { std::ostringstream ss; try { - this->schema()->describe(ss); + this->schema()->describe(db, ss); } catch (...) { return make_exception_future<>(std::current_exception()); } @@ -1546,11 +1546,11 @@ future<> table::write_schema_as_cql(sstring dir) const { } -future<> table::snapshot(sstring name) { - return flush().then([this, name = std::move(name)]() { - return with_semaphore(_sstable_deletion_sem, 1, [this, name = std::move(name)]() { +future<> table::snapshot(database& db, sstring name) { + return flush().then([this, &db, name = std::move(name)]() { + return with_semaphore(_sstable_deletion_sem, 1, [this, &db, name = std::move(name)]() { auto tables = boost::copy_range>(*_sstables->all()); - return do_with(std::move(tables), [this, name](std::vector & tables) { + return do_with(std::move(tables), [this, &db, name](std::vector & tables) { auto jsondir = _config.datadir + "/snapshots/" + name; return io_check([jsondir] { return recursive_touch_directory(jsondir); }).then([this, name, jsondir, &tables] { return parallel_for_each(tables, [name](sstables::shared_sstable sstable) { @@ -1572,7 +1572,7 @@ future<> table::snapshot(sstring name) { }); }).then([jsondir, &tables] { return io_check(sync_directory, std::move(jsondir)); - }).finally([this, &tables, jsondir] { + }).finally([this, &tables, &db, jsondir] { auto shard = std::hash()(jsondir) % smp::count; std::unordered_set table_names; for (auto& sst : tables) { @@ -1580,7 +1580,7 @@ future<> table::snapshot(sstring name) { auto rf = f.substr(sst->get_dir().size() + 1); table_names.insert(std::move(rf)); } - return smp::submit_to(shard, [requester = this_shard_id(), jsondir = std::move(jsondir), this, + return smp::submit_to(shard, [requester = this_shard_id(), jsondir = std::move(jsondir), this, &db, tables = std::move(table_names), datadir = _config.datadir] { if (pending_snapshots.count(jsondir) == 0) { @@ -1595,8 +1595,10 @@ future<> table::snapshot(sstring name) { auto my_work = make_ready_future<>(); if (requester == this_shard_id()) { my_work = snapshot->requests.wait(smp::count).then([jsondir = std::move(jsondir), - snapshot, this] { - return write_schema_as_cql(jsondir).handle_exception([jsondir](std::exception_ptr ptr) { + &db, snapshot, this] { + // this_shard_id() here == requester == this_shard_id() before submit_to() above, + // so the db reference is still local + return write_schema_as_cql(db, jsondir).handle_exception([jsondir](std::exception_ptr ptr) { tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr); return make_ready_future<>(); }).finally([jsondir = std::move(jsondir), snapshot] () mutable { diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 59647753a3..0aab36b734 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4050,7 +4050,7 @@ SEASTAR_TEST_CASE(test_describe_simple_schema) { auto schema = e.local_db().find_schema("ks", ct.first); std::ostringstream ss; - schema->describe(ss); + schema->describe(e.local_db(), ss); BOOST_CHECK_EQUAL(normalize_white_space(ss.str()), normalize_white_space(ct.second)); } }); @@ -4118,12 +4118,12 @@ SEASTAR_TEST_CASE(test_describe_view_schema) { auto schema = e.local_db().find_schema("KS", ct.first); std::ostringstream ss; - schema->describe(ss); + schema->describe(e.local_db(), ss); BOOST_CHECK_EQUAL(normalize_white_space(ss.str()), normalize_white_space(ct.second)); auto base_schema = e.local_db().find_schema("KS", "cF"); std::ostringstream base_ss; - base_schema->describe(base_ss); + base_schema->describe(e.local_db(), base_ss); BOOST_CHECK_EQUAL(normalize_white_space(base_ss.str()), normalize_white_space(base_table)); } }); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index caf1c749a7..155d51ec0c 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -361,7 +361,7 @@ future<> do_with_some_data(std::function (cql_test_env& env)> func) { future<> take_snapshot(cql_test_env& e) { return e.db().invoke_on_all([] (database& db) { auto& cf = db.find_column_family("ks", "cf"); - return cf.snapshot("test"); + return cf.snapshot(db, "test"); }); }