snap: Get rid of storage_service reference in schema.cc
Now when the snapshot stopping is correctly handled, we may pull the database reference all the way down to the schema::describe(). One tricky place is in table::napshot() -- the local db reference is pulled through an smp::submit_to call, but thanks to the shard checks in the place where it is needed the db is still "local" Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -1830,7 +1830,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
future<> f = make_ready_future<>();
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
f = cf.snapshot(name);
|
||||
f = cf.snapshot(*this, name);
|
||||
}
|
||||
return f.then([this, &cf, truncated_at, low_mark, should_flush] {
|
||||
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush](db::replay_position rp) {
|
||||
|
||||
@@ -810,7 +810,7 @@ public:
|
||||
|
||||
db::replay_position set_low_replay_position_mark();
|
||||
|
||||
future<> snapshot(sstring name);
|
||||
future<> snapshot(database& db, sstring name);
|
||||
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
||||
|
||||
/*!
|
||||
@@ -829,7 +829,7 @@ public:
|
||||
* CREATE INDEX command.
|
||||
* The same is true for local index and MATERIALIZED VIEW.
|
||||
*/
|
||||
future<> write_schema_as_cql(sstring dir) const;
|
||||
future<> write_schema_as_cql(database& db, sstring dir) const;
|
||||
|
||||
const bool incremental_backups_enabled() const {
|
||||
return _config.enable_incremental_backups;
|
||||
|
||||
@@ -91,7 +91,7 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
|
||||
auto& ks = db.find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag)] (auto& pair) {
|
||||
auto& cf = db.find_column_family(pair.second);
|
||||
return cf.snapshot(tag);
|
||||
return cf.snapshot(db, tag);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -119,7 +119,7 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
}
|
||||
return _db.invoke_on_all([ks_name, table_name, tag] (database &db) {
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
return cf.snapshot(tag);
|
||||
return cf.snapshot(db, tag);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
15
schema.cc
15
schema.cc
@@ -36,7 +36,6 @@
|
||||
#include "view_info.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "database.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
@@ -709,22 +708,22 @@ std::ostream& column_definition_as_cql_key(std::ostream& os, const column_defini
|
||||
return os;
|
||||
}
|
||||
|
||||
static bool is_global_index(const utils::UUID& id, const schema& s) {
|
||||
return service::get_local_storage_service().db().local().find_column_family(id).get_index_manager().is_global_index(s);
|
||||
static bool is_global_index(database& db, const utils::UUID& id, const schema& s) {
|
||||
return db.find_column_family(id).get_index_manager().is_global_index(s);
|
||||
}
|
||||
|
||||
static bool is_index(const utils::UUID& id, const schema& s) {
|
||||
return service::get_local_storage_service().db().local().find_column_family(id).get_index_manager().is_index(s);
|
||||
static bool is_index(database& db, const utils::UUID& id, const schema& s) {
|
||||
return db.find_column_family(id).get_index_manager().is_index(s);
|
||||
}
|
||||
|
||||
|
||||
std::ostream& schema::describe(std::ostream& os) const {
|
||||
std::ostream& schema::describe(database& db, std::ostream& os) const {
|
||||
os << "CREATE ";
|
||||
int n = 0;
|
||||
|
||||
if (is_view()) {
|
||||
if (is_index(view_info()->base_id(), *this)) {
|
||||
auto is_local = !is_global_index(view_info()->base_id(), *this);
|
||||
if (is_index(db, view_info()->base_id(), *this)) {
|
||||
auto is_local = !is_global_index(db, view_info()->base_id(), *this);
|
||||
|
||||
os << "INDEX " << cql3::util::maybe_quote(secondary_index::index_name_from_table_name(cf_name())) << " ON "
|
||||
<< cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(view_info()->base_name()) << "(";
|
||||
|
||||
@@ -51,6 +51,8 @@ class sharder;
|
||||
|
||||
}
|
||||
|
||||
class database;
|
||||
|
||||
using column_count_type = uint32_t;
|
||||
|
||||
// Column ID, unique within column_kind
|
||||
@@ -945,7 +947,7 @@ public:
|
||||
* Index or Local Index).
|
||||
*
|
||||
*/
|
||||
std::ostream& describe(std::ostream& os) const;
|
||||
std::ostream& describe(database& db, std::ostream& os) const;
|
||||
friend bool operator==(const schema&, const schema&);
|
||||
const column_mapping& get_column_mapping() const;
|
||||
friend class schema_registry_entry;
|
||||
|
||||
22
table.cc
22
table.cc
@@ -1523,10 +1523,10 @@ seal_snapshot(sstring jsondir) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> table::write_schema_as_cql(sstring dir) const {
|
||||
future<> table::write_schema_as_cql(database& db, sstring dir) const {
|
||||
std::ostringstream ss;
|
||||
try {
|
||||
this->schema()->describe(ss);
|
||||
this->schema()->describe(db, ss);
|
||||
} catch (...) {
|
||||
return make_exception_future<>(std::current_exception());
|
||||
}
|
||||
@@ -1546,11 +1546,11 @@ future<> table::write_schema_as_cql(sstring dir) const {
|
||||
|
||||
}
|
||||
|
||||
future<> table::snapshot(sstring name) {
|
||||
return flush().then([this, name = std::move(name)]() {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [this, name = std::move(name)]() {
|
||||
future<> table::snapshot(database& db, sstring name) {
|
||||
return flush().then([this, &db, name = std::move(name)]() {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [this, &db, name = std::move(name)]() {
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
|
||||
return do_with(std::move(tables), [this, name](std::vector<sstables::shared_sstable> & tables) {
|
||||
return do_with(std::move(tables), [this, &db, name](std::vector<sstables::shared_sstable> & tables) {
|
||||
auto jsondir = _config.datadir + "/snapshots/" + name;
|
||||
return io_check([jsondir] { return recursive_touch_directory(jsondir); }).then([this, name, jsondir, &tables] {
|
||||
return parallel_for_each(tables, [name](sstables::shared_sstable sstable) {
|
||||
@@ -1572,7 +1572,7 @@ future<> table::snapshot(sstring name) {
|
||||
});
|
||||
}).then([jsondir, &tables] {
|
||||
return io_check(sync_directory, std::move(jsondir));
|
||||
}).finally([this, &tables, jsondir] {
|
||||
}).finally([this, &tables, &db, jsondir] {
|
||||
auto shard = std::hash<sstring>()(jsondir) % smp::count;
|
||||
std::unordered_set<sstring> table_names;
|
||||
for (auto& sst : tables) {
|
||||
@@ -1580,7 +1580,7 @@ future<> table::snapshot(sstring name) {
|
||||
auto rf = f.substr(sst->get_dir().size() + 1);
|
||||
table_names.insert(std::move(rf));
|
||||
}
|
||||
return smp::submit_to(shard, [requester = this_shard_id(), jsondir = std::move(jsondir), this,
|
||||
return smp::submit_to(shard, [requester = this_shard_id(), jsondir = std::move(jsondir), this, &db,
|
||||
tables = std::move(table_names), datadir = _config.datadir] {
|
||||
|
||||
if (pending_snapshots.count(jsondir) == 0) {
|
||||
@@ -1595,8 +1595,10 @@ future<> table::snapshot(sstring name) {
|
||||
auto my_work = make_ready_future<>();
|
||||
if (requester == this_shard_id()) {
|
||||
my_work = snapshot->requests.wait(smp::count).then([jsondir = std::move(jsondir),
|
||||
snapshot, this] {
|
||||
return write_schema_as_cql(jsondir).handle_exception([jsondir](std::exception_ptr ptr) {
|
||||
&db, snapshot, this] {
|
||||
// this_shard_id() here == requester == this_shard_id() before submit_to() above,
|
||||
// so the db reference is still local
|
||||
return write_schema_as_cql(db, jsondir).handle_exception([jsondir](std::exception_ptr ptr) {
|
||||
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
|
||||
return make_ready_future<>();
|
||||
}).finally([jsondir = std::move(jsondir), snapshot] () mutable {
|
||||
|
||||
@@ -4050,7 +4050,7 @@ SEASTAR_TEST_CASE(test_describe_simple_schema) {
|
||||
auto schema = e.local_db().find_schema("ks", ct.first);
|
||||
std::ostringstream ss;
|
||||
|
||||
schema->describe(ss);
|
||||
schema->describe(e.local_db(), ss);
|
||||
BOOST_CHECK_EQUAL(normalize_white_space(ss.str()), normalize_white_space(ct.second));
|
||||
}
|
||||
});
|
||||
@@ -4118,12 +4118,12 @@ SEASTAR_TEST_CASE(test_describe_view_schema) {
|
||||
auto schema = e.local_db().find_schema("KS", ct.first);
|
||||
std::ostringstream ss;
|
||||
|
||||
schema->describe(ss);
|
||||
schema->describe(e.local_db(), ss);
|
||||
BOOST_CHECK_EQUAL(normalize_white_space(ss.str()), normalize_white_space(ct.second));
|
||||
|
||||
auto base_schema = e.local_db().find_schema("KS", "cF");
|
||||
std::ostringstream base_ss;
|
||||
base_schema->describe(base_ss);
|
||||
base_schema->describe(e.local_db(), base_ss);
|
||||
BOOST_CHECK_EQUAL(normalize_white_space(base_ss.str()), normalize_white_space(base_table));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -361,7 +361,7 @@ future<> do_with_some_data(std::function<future<> (cql_test_env& env)> func) {
|
||||
future<> take_snapshot(cql_test_env& e) {
|
||||
return e.db().invoke_on_all([] (database& db) {
|
||||
auto& cf = db.find_column_family("ks", "cf");
|
||||
return cf.snapshot("test");
|
||||
return cf.snapshot(db, "test");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user