Merge 'Overhaul truncate and snapshot' from Benny Halevy

This series is aimed at fixing #11132.

To get there, the series untangles the functions that currently depend on the the cross-shard coordination in table::snapshot,
namely database::truncate and consequently database::drop_column_family.

database::get_table_on_all_shards is added here as a helper to get a foreign shared ptr of the the table shard from all shards,
and it is later used by multiple functions to truncate and then take a snapshot of the sharded table.

database::truncate_table_on_all_shards is defined to orchestrate the truncate process end-to-end, flushing or clearing all table shards before taking a snapshot if needed, using the newly defined table::snapshot_on_all_shards, and by that leaving only the discard_sstables job to the per-shard database::truncate function.

The latter, snapshot_on_all_shards, orchestrates the snapshot process on all shards - getting rid of the per-shard table::snapshot function (after refactoring take_snapshot and finalize_snapshot out of it), and the associated dreaded data structures: snapshot_manager and pending_snapshots.

Fixes #11132.

Closes #11133

* github.com:scylladb/scylladb:
  table: reindent write_schema_as_cql
  table: coroutinize write_schema_as_cql
  table: seal_snapshot: maybe_yield when iterating over the table names
  table: reindent seal_snapshot
  table: coroutinize seal_snapshot
  table: delete unused snapshot_manager and pending_snapshots
  table: delete unused snapshot function
  table: snapshot_on_all_shards: orchestrate snapshot process
  table: snapshot: move pending_snapshots.erase from seal_snapshot
  table: finalize_snapshot: take the file sets as a param
  table: make seal_snapshot a static member
  table: finalize_snapshot: reindent
  table: refactor finalize_snapshot out of snapshot
  table: snapshot: keep per-shard file sets in snapshot_manager
  table: take_snapshot: return foreign unique ptr
  table: take_snapshot: maybe yield in per-sstable loop
  table: take_snapshot: simplify tables construction code
  table: take_snapshot: reindent
  table: take_snapshot: simplify error handling
  table: refactor take_snapshot out of snapshot
  utils: get rid of joinpoint
  database: get rid of timestamp_func
  database: truncate: snapshot table in all-shards layer
  database: truncate: flush table and views in all-shards layer
  database: truncate: stop and disable compaction in all-shards layer
  database: truncate: move call to set_low_replay_position_mark to all-shards layer
  database: truncate: enter per-shard table async_gate in all-shards layer
  database: truncate: move check for schema_tables keyspace to all-shards layer.
  database: snapshot_table_on_all_shards: reindent
  table: add snapshot_on_all_shards
  database: add snapshot_table_on_all_shards
  database: rename {flush,snapshot}_on_all and make static
  database: drop_table_on_all_shards: truncate and stop table in upper layer
  database: drop_table_on_all_shards: get all table shards before drop_column_family on each
  database: drop_column_family: define table& cf
  database: drop_column_family: reuse uuid for evict_all_for_table
  database: drop_column_family: move log message up a layer
  database: truncate: get rid of the unused ks param
  database: add truncate_table_on_all_shards
  database: drop_table_on_all_shards: do not accept a truncated_at timestamp_func
  database: truncate: get optional snapshot_name from caller
  database: truncate: fix assert about replay_position low_mark
  database_test: apply_mutation on the correct db shard
This commit is contained in:
Avi Kivity
2022-08-07 19:15:42 +03:00
11 changed files with 415 additions and 431 deletions

View File

@@ -691,11 +691,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::force_keyspace_flush.set(r, [&ctx](std::unique_ptr<request> req) -> future<json::json_return_type> {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = parse_tables(keyspace, ctx, req->query_parameters, "cf");
auto &db = ctx.db.local();
auto& db = ctx.db;
if (column_families.empty()) {
co_await db.flush_on_all(keyspace);
co_await replica::database::flush_keyspace_on_all_shards(db, keyspace);
} else {
co_await db.flush_on_all(keyspace, std::move(column_families));
co_await replica::database::flush_tables_on_all_shards(db, keyspace, std::move(column_families));
}
co_return json_void();
});

View File

@@ -36,7 +36,6 @@
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/util.hh"
#include "utils/joinpoint.hh"
#include "types/user.hh"
static seastar::logger mlogger("legacy_schema_migrator");
@@ -534,10 +533,9 @@ public:
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto ts = db_clock::now();
auto with_snapshot = !_keyspaces.empty();
return parallel_for_each(legacy_schema_tables, [this, ts, with_snapshot](const sstring& cfname) {
return replica::database::drop_table_on_all_shards(_db, db::system_keyspace::NAME, cfname, [ts] { return make_ready_future<db_clock::time_point>(ts); }, with_snapshot);
return parallel_for_each(legacy_schema_tables, [this, with_snapshot](const sstring& cfname) {
return replica::database::drop_table_on_all_shards(_db, db::system_keyspace::NAME, cfname, with_snapshot);
});
}
@@ -568,8 +566,8 @@ public:
}
future<> flush_schemas() {
auto& db = _qp.db().real_database();
return db.flush_on_all(db::schema_tables::NAME, db::schema_tables::all_table_names(schema_features::full()));
auto& db = _qp.db().real_database().container();
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::NAME, db::schema_tables::all_table_names(schema_features::full()));
}
future<> migrate() {

View File

@@ -54,7 +54,6 @@
#include <boost/range/join.hpp>
#include "compaction/compaction_strategy.hh"
#include "utils/joinpoint.hh"
#include "view_info.hh"
#include "cql_type_parser.hh"
#include "db/timeout_clock.hh"
@@ -1114,9 +1113,9 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
if (do_flush) {
auto& db = proxy.local().local_db();
auto& db = proxy.local().get_db();
co_await coroutine::parallel_for_each(column_families, [&db] (const utils::UUID& id) -> future<> {
return db.flush_on_all(id);
return replica::database::flush_table_on_all_shards(db, id);
});
}
}
@@ -1213,9 +1212,6 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
struct schema_diff {
struct dropped_schema {
global_schema_ptr schema;
utils::joinpoint<db_clock::time_point> jp{[] {
return make_ready_future<db_clock::time_point>(db_clock::now());
}};
};
struct altered_schema {
@@ -1326,13 +1322,14 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
// to a mv not finding its schema when snapshoting since the main table
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
auto& db = proxy.local().get_db();
co_await coroutine::parallel_for_each(views_diff.dropped, [&db] (schema_diff::dropped_schema& dt) {
auto ts = db_clock::now();
co_await coroutine::parallel_for_each(views_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) {
auto& s = *dt.schema.get();
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
});
co_await coroutine::parallel_for_each(tables_diff.dropped, [&db] (schema_diff::dropped_schema& dt) -> future<> {
co_await coroutine::parallel_for_each(tables_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> {
auto& s = *dt.schema.get();
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
});
co_await proxy.local().get_db().invoke_on_all([&] (replica::database& db) -> future<> {

View File

@@ -72,7 +72,7 @@ future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspa
return check_snapshot_not_exist(ks_name, tag);
});
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return _db.local().snapshot_on_all(ks_name, tag, bool(sf));
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
});
}
@@ -101,7 +101,7 @@ future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vect
throw std::invalid_argument("Do not take a snapshot of a materialized view or a secondary index by itself. Run snapshot on the base table instead.");
}
}
co_await _db.local().snapshot_on_all(ks_name, std::move(tables), std::move(tag), bool(sf));
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
}
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, sstring cf_name, sstring tag, skip_flush sf, allow_view_snapshots av) {

View File

@@ -1003,31 +1003,49 @@ void database::remove(const table& cf) noexcept {
}
}
future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func tsf, bool snapshot) {
auto& ks = find_keyspace(ks_name);
auto uuid = find_uuid(ks_name, cf_name);
lw_shared_ptr<table> cf;
try {
cf = _column_families.at(uuid);
drop_repair_history_map_for_table(uuid);
} catch (std::out_of_range&) {
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
}
dblog.debug("Dropping {}.{}", ks_name, cf_name);
remove(*cf);
cf->clear_views();
co_await cf->await_pending_ops();
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
auto f = co_await coroutine::as_future(truncate(ks, *cf, std::move(tsf), snapshot));
co_await cf->stop();
f.get(); // re-throw exception from truncate() if any
future<> database::detach_column_family(table& cf) {
auto uuid = cf.schema()->id();
drop_repair_history_map_for_table(uuid);
remove(cf);
cf.clear_views();
co_await cf.await_pending_ops();
co_await _querier_cache.evict_all_for_table(uuid);
}
future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name, timestamp_func tsf, bool with_snapshot) {
auto table_dir = fs::path(sharded_db.local().find_column_family(ks_name, cf_name).dir());
co_await sharded_db.invoke_on_all([&] (database& db) {
return db.drop_column_family(ks_name, cf_name, tsf, with_snapshot);
future<std::vector<foreign_ptr<lw_shared_ptr<table>>>> database::get_table_on_all_shards(sharded<database>& sharded_db, utils::UUID uuid) {
std::vector<foreign_ptr<lw_shared_ptr<table>>> table_shards;
table_shards.resize(smp::count);
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
table_shards[shard] = co_await smp::submit_to(shard, [&] {
try {
return make_foreign(sharded_db.local()._column_families.at(uuid));
} catch (std::out_of_range&) {
on_internal_error(dblog, fmt::format("Table UUID={} not found", uuid));
}
});
});
co_return table_shards;
}
future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name, bool with_snapshot) {
auto auto_snapshot = sharded_db.local().get_config().auto_snapshot();
dblog.info("Dropping {}.{} {}snapshot", ks_name, cf_name, with_snapshot && auto_snapshot ? "with auto-" : "without ");
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
auto table_dir = fs::path(table_shards[this_shard_id()]->dir());
std::optional<sstring> snapshot_name_opt;
if (with_snapshot) {
snapshot_name_opt = format("pre-drop-{}", db_clock::now().time_since_epoch().count());
}
co_await sharded_db.invoke_on_all([&] (database& db) {
return db.detach_column_family(*table_shards[this_shard_id()]);
});
auto f = co_await coroutine::as_future(truncate_table_on_all_shards(sharded_db, table_shards, db_clock::time_point::max(), with_snapshot, std::move(snapshot_name_opt)));
co_await smp::invoke_on_all([&] {
return table_shards[this_shard_id()]->stop();
});
f.get(); // re-throw exception from truncate() if any
co_await sstables::remove_table_directory_if_has_no_snapshots(table_dir);
}
@@ -2274,112 +2292,158 @@ future<> database::flush(const sstring& ksname, const sstring& cfname) {
return cf.flush();
}
future<> database::flush_on_all(utils::UUID id) {
return container().invoke_on_all([id] (replica::database& db) {
future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, utils::UUID id) {
return sharded_db.invoke_on_all([id] (replica::database& db) {
return db.find_column_family(id).flush();
});
}
future<> database::flush_on_all(std::string_view ks_name, std::string_view table_name) {
return flush_on_all(find_uuid(ks_name, table_name));
future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name) {
return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
}
future<> database::flush_on_all(std::string_view ks_name, std::vector<sstring> table_names) {
return parallel_for_each(table_names, [this, ks_name] (const auto& table_name) {
return flush_on_all(ks_name, table_name);
future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names) {
return parallel_for_each(table_names, [&, ks_name] (const auto& table_name) {
return flush_table_on_all_shards(sharded_db, ks_name, table_name);
});
}
future<> database::flush_on_all(std::string_view ks_name) {
return parallel_for_each(find_keyspace(ks_name).metadata()->cf_meta_data(), [this] (auto& pair) {
return flush_on_all(pair.second->id());
future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
return flush_table_on_all_shards(sharded_db, pair.second->id());
});
}
future<> database::snapshot_on_all(std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
co_await coroutine::parallel_for_each(table_names, [this, ks_name, tag = std::move(tag), skip_flush] (const auto& table_name) -> future<> {
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, bool skip_flush) {
if (!skip_flush) {
co_await flush_table_on_all_shards(sharded_db, ks_name, table_name);
}
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
}
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
return snapshot_table_on_all_shards(sharded_db, ks_name, std::move(table_name), tag, skip_flush);
});
}
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
auto uuid = pair.second->id();
if (!skip_flush) {
co_await flush_on_all(ks_name, table_name);
co_await flush_table_on_all_shards(sharded_db, uuid);
}
co_await container().invoke_on_all([ks_name, &table_name, tag, skip_flush] (replica::database& db) {
auto& t = db.find_column_family(ks_name, table_name);
return t.snapshot(db, tag);
});
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
});
}
future<> database::snapshot_on_all(std::string_view ks_name, sstring tag, bool skip_flush) {
auto& ks = find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [this, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
if (!skip_flush) {
co_await flush_on_all(pair.second->id());
}
co_await container().invoke_on_all([id = pair.second, tag, skip_flush] (replica::database& db) {
auto& t = db.find_column_family(id);
return t.snapshot(db, tag);
});
});
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_return co_await truncate_table_on_all_shards(sharded_db, table_shards, truncated_at_opt, with_snapshot, std::move(snapshot_name_opt));
}
future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf) {
auto& ks = find_keyspace(ksname);
auto& cf = find_column_family(ksname, cfname);
return truncate(ks, cf, std::move(tsf));
}
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
dblog.debug("Truncating {}.{}: with_snapshot={} auto_snapshot={}", cf.schema()->ks_name(), cf.schema()->cf_name(), with_snapshot, get_config().auto_snapshot());
auto holder = cf.async_gate().hold();
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, const std::vector<foreign_ptr<lw_shared_ptr<table>>>& table_shards, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
auto& cf = *table_shards[this_shard_id()];
auto s = cf.schema();
// Schema tables changed commitlog domain at some point and this node will refuse to boot with
// truncation record present for schema tables to protect against misinterpreting of replay positions.
// Also, the replay_position returned by discard_sstables() may refer to old commit log domain.
if (cf.schema()->ks_name() == db::schema_tables::NAME) {
throw std::runtime_error(format("Truncating of {}.{} is not allowed.", cf.schema()->ks_name(), cf.schema()->cf_name()));
if (s->ks_name() == db::schema_tables::NAME) {
throw std::runtime_error(format("Truncating of {}.{} is not allowed.", s->ks_name(), s->cf_name()));
}
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
auto low_mark = cf.set_low_replay_position_mark();
auto auto_snapshot = sharded_db.local().get_config().auto_snapshot();
dblog.info("Truncating {}.{} {}snapshot", s->ks_name(), s->cf_name(), with_snapshot && auto_snapshot ? "with auto-" : "without ");
std::vector<foreign_ptr<std::unique_ptr<table_truncate_state>>> table_states;
table_states.resize(smp::count);
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
table_states[shard] = co_await smp::submit_to(shard, [&] () -> future<foreign_ptr<std::unique_ptr<table_truncate_state>>> {
auto& cf = *table_shards[this_shard_id()];
auto st = std::make_unique<table_truncate_state>();
st->holder = cf.async_gate().hold();
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
st->low_mark_at = db_clock::now();
st->low_mark = cf.set_low_replay_position_mark();
st->cres.reserve(1 + cf.views().size());
auto& db = sharded_db.local();
auto& cm = db.get_compaction_manager();
st->cres.emplace_back(co_await cm.stop_and_disable_compaction(cf.as_table_state()));
co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> {
auto& vcf = db.find_column_family(v);
st->cres.emplace_back(co_await cm.stop_and_disable_compaction(vcf.as_table_state()));
});
co_return make_foreign(std::move(st));
});
});
const auto should_snapshot = with_snapshot && auto_snapshot;
const auto should_flush = should_snapshot && cf.can_flush();
dblog.trace("{} {}.{} and views on all shards", should_flush ? "Flushing" : "Clearing", s->ks_name(), s->cf_name());
std::function<future<>(replica::table&)> flush_or_clear = should_flush ?
[] (replica::table& cf) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
return cf.flush();
} :
[] (replica::table& cf) {
return cf.clear();
};
co_await sharded_db.invoke_on_all([&] (replica::database& db) -> future<> {
unsigned shard = this_shard_id();
auto& cf = *table_shards[shard];
auto& st = *table_states[shard];
co_await flush_or_clear(cf);
co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> {
auto& vcf = db.find_column_family(v);
co_await flush_or_clear(vcf);
});
st.did_flush = should_flush;
});
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
if (should_snapshot) {
auto name = snapshot_name_opt.value_or(
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
}
co_await sharded_db.invoke_on_all([&] (database& db) {
auto shard = this_shard_id();
auto& cf = *table_shards[shard];
auto& st = *table_states[shard];
return db.truncate(cf, st, truncated_at);
});
}
future<> database::truncate(column_family& cf, const table_truncate_state& st, db_clock::time_point truncated_at) {
dblog.trace("Truncating {}.{} on shard", cf.schema()->ks_name(), cf.schema()->cf_name());
const auto uuid = cf.schema()->id();
std::vector<compaction_manager::compaction_reenabler> cres;
cres.reserve(1 + cf.views().size());
cres.emplace_back(co_await _compaction_manager.stop_and_disable_compaction(cf.as_table_state()));
co_await coroutine::parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
cres.emplace_back(co_await _compaction_manager.stop_and_disable_compaction(vcf.as_table_state()));
});
bool did_flush = false;
if (should_flush && cf.can_flush()) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
co_await cf.flush();
did_flush = true;
} else {
co_await cf.clear();
}
dblog.debug("Discarding sstable data for truncated CF + indexes");
// TODO: notify truncation
db_clock::time_point truncated_at = co_await tsf();
if (auto_snapshot) {
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
co_await cf.snapshot(*this, name);
}
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
// TODO: indexes.
// Note: since discard_sstables was changed to only count tables owned by this shard,
@@ -2388,15 +2452,15 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
// the low_mark assertion does not hold, because we maybe/probably never got around to
// creating the sstables that would create them.
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
rp = std::max(low_mark, rp);
co_await coroutine::parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
// If truncated_at is earlier than the time low_mark was taken
// then the replay_position returned by discard_sstables may be
// smaller than low_mark.
assert(!st.did_flush || rp == db::replay_position() || (truncated_at <= st.low_mark_at ? rp <= st.low_mark : st.low_mark <= rp));
if (rp == db::replay_position()) {
rp = st.low_mark;
}
co_await coroutine::parallel_for_each(cf.views(), [this, truncated_at] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
if (should_flush) {
co_await vcf.flush();
} else {
co_await vcf.clear();
}
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
});

View File

@@ -70,6 +70,7 @@
#include "db/per_partition_rate_limit_info.hh"
#include "db/operation_type.hh"
#include "utils/serialized_action.hh"
#include "compaction/compaction_manager.hh"
class cell_locker;
class cell_locker_stats;
@@ -99,8 +100,6 @@ class compaction_data;
}
class compaction_manager;
namespace compaction {
class table_state;
}
@@ -851,10 +850,16 @@ public:
db::replay_position set_low_replay_position_mark();
private:
future<> snapshot(database& db, sstring name);
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
future<snapshot_file_set> take_snapshot(database& db, sstring jsondir);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(database& db, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
friend class database;
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const std::vector<foreign_ptr<lw_shared_ptr<table>>>& table_shards, sstring name);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
/*!
@@ -1624,31 +1629,42 @@ public:
future<> flush_all_memtables();
future<> flush(const sstring& ks, const sstring& cf);
// flush a table identified by the given id on all shards.
future<> flush_on_all(utils::UUID id);
static future<> flush_table_on_all_shards(sharded<database>& sharded_db, utils::UUID id);
// flush a single table in a keyspace on all shards.
future<> flush_on_all(std::string_view ks_name, std::string_view table_name);
static future<> flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name);
// flush a list of tables in a keyspace on all shards.
future<> flush_on_all(std::string_view ks_name, std::vector<sstring> table_names);
static future<> flush_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names);
// flush all tables in a keyspace on all shards.
future<> flush_on_all(std::string_view ks_name);
static future<> flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
future<> snapshot_on_all(std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
future<> snapshot_on_all(std::string_view ks_name, sstring tag, bool skip_flush);
// See #937. Truncation now requires a callback to get a time stamp
// that must be guaranteed to be the same for all shards.
typedef std::function<future<db_clock::time_point>()> timestamp_func;
/** Truncates the given column family */
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, bool skip_flush);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
public:
bool update_column_family(schema_ptr s);
private:
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
future<> detach_column_family(table& cf);
static future<std::vector<foreign_ptr<lw_shared_ptr<table>>>> get_table_on_all_shards(sharded<database>& db, utils::UUID uuid);
struct table_truncate_state {
gate::holder holder;
db_clock::time_point low_mark_at;
db::replay_position low_mark;
std::vector<compaction_manager::compaction_reenabler> cres;
bool did_flush;
};
static future<> truncate_table_on_all_shards(sharded<database>& db, const std::vector<foreign_ptr<lw_shared_ptr<table>>>&, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt);
future<> truncate(column_family& cf, const table_truncate_state&, db_clock::time_point truncated_at);
public:
/** Truncates the given column family */
// If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear.
static future<> truncate_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt = {}, bool with_snapshot = true, std::optional<sstring> snapshot_name_opt = {});
// drops the table on all shards and removes the table directory if there are no snapshots
static future<> drop_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, timestamp_func, bool with_snapshot = true);
static future<> drop_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, bool with_snapshot = true);
const dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_manager.region_group();

View File

@@ -1326,47 +1326,19 @@ logalloc::occupancy_stats table::occupancy() const {
}
// Snapshots: snapshotting the files themselves is easy: if more than one CF
// happens to link an SSTable twice, all but one will fail, and we will end up
// with one copy.
//
// The problem for us, is that the snapshot procedure is supposed to leave a
// manifest file inside its directory. So if we just call snapshot() from
// multiple shards, only the last one will succeed, writing its own SSTables to
// the manifest leaving all other shards' SSTables unaccounted for.
//
// Moreover, for things like drop table, the operation should only proceed when the
// snapshot is complete. That includes the manifest file being correctly written,
// and for this reason we need to wait for all shards to finish their snapshotting
// before we can move on.
//
// To know which files we must account for in the manifest, we will keep an
// SSTable set. Theoretically, we could just rescan the snapshot directory and
// see what's in there. But we would need to wait for all shards to finish
// before we can do that anyway. That is the hard part, and once that is done
// keeping the files set is not really a big deal.
//
// This code assumes that all shards will be snapshotting at the same time. So
// far this is a safe assumption, but if we ever want to take snapshots from a
// group of shards only, this code will have to be updated to account for that.
struct snapshot_manager {
std::unordered_set<sstring> files;
named_semaphore requests = {0, named_semaphore_exception_factory{"snapshot manager requests"}};
named_semaphore manifest_write = {0, named_semaphore_exception_factory{"snapshot manager manifest write"}};
snapshot_manager() {}
};
static thread_local std::unordered_map<sstring, lw_shared_ptr<snapshot_manager>> pending_snapshots;
static future<>
seal_snapshot(sstring jsondir) {
future<>
table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets) {
std::ostringstream ss;
int n = 0;
ss << "{" << std::endl << "\t\"files\" : [ ";
for (auto&& rf: pending_snapshots.at(jsondir)->files) {
for (const auto& fsp : file_sets) {
for (const auto& rf : *fsp) {
if (n++ > 0) {
ss << ", ";
}
ss << "\"" << rf << "\"";
co_await coroutine::maybe_yield();
}
}
ss << " ]" << std::endl << "}" << std::endl;
@@ -1375,130 +1347,118 @@ seal_snapshot(sstring jsondir) {
tlogger.debug("Storing manifest {}", jsonfile);
return io_check([jsondir] { return recursive_touch_directory(jsondir); }).then([jsonfile, json = std::move(json)] {
return open_checked_file_dma(general_disk_error_handler, jsonfile, open_flags::wo | open_flags::create | open_flags::truncate).then([json](file f) {
return make_file_output_stream(std::move(f)).then([json](output_stream<char>&& out) {
return do_with(std::move(out), [json] (output_stream<char>& out) {
return out.write(json.c_str(), json.size()).then([&out] {
return out.flush();
}).then([&out] {
return out.close();
});
});
});
});
}).then([jsondir] {
return io_check(sync_directory, std::move(jsondir));
}).finally([jsondir] {
pending_snapshots.erase(jsondir);
return make_ready_future<>();
});
co_await io_check([jsondir] { return recursive_touch_directory(jsondir); });
auto f = co_await open_checked_file_dma(general_disk_error_handler, jsonfile, open_flags::wo | open_flags::create | open_flags::truncate);
auto out = co_await make_file_output_stream(std::move(f));
std::exception_ptr ex;
try {
co_await out.write(json.c_str(), json.size());
co_await out.flush();
} catch (...) {
ex = std::current_exception();
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_await io_check(sync_directory, std::move(jsondir));
}
future<> table::write_schema_as_cql(database& db, sstring dir) const {
std::ostringstream ss;
try {
this->schema()->describe(db, ss);
} catch (...) {
return make_exception_future<>(std::current_exception());
}
this->schema()->describe(db, ss);
auto schema_description = ss.str();
auto schema_file_name = dir + "/schema.cql";
return open_checked_file_dma(general_disk_error_handler, schema_file_name, open_flags::wo | open_flags::create | open_flags::truncate).then([schema_description = std::move(schema_description)](file f) {
return make_file_output_stream(std::move(f)).then([schema_description = std::move(schema_description)] (output_stream<char>&& out) mutable {
return do_with(std::move(out), [schema_description = std::move(schema_description)] (output_stream<char>& out) {
return out.write(schema_description.c_str(), schema_description.size()).then([&out] {
return out.flush();
}).then([&out] {
return out.close();
});
});
});
});
auto f = co_await open_checked_file_dma(general_disk_error_handler, schema_file_name, open_flags::wo | open_flags::create | open_flags::truncate);
auto out = co_await make_file_output_stream(std::move(f));
std::exception_ptr ex;
try {
co_await out.write(schema_description.c_str(), schema_description.size());
co_await out.flush();
} catch (...) {
ex = std::current_exception();
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
future<> table::snapshot(database& db, sstring name) {
auto jsondir = _config.datadir + "/snapshots/" + name;
tlogger.debug("snapshot {}", jsondir);
// Runs the orchestration code on an arbitrary shard to balance the load.
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const std::vector<foreign_ptr<lw_shared_ptr<table>>>& table_shards, sstring name) {
auto jsondir = table_shards[this_shard_id()]->_config.datadir + "/snapshots/" + name;
auto orchestrator = std::hash<sstring>()(jsondir) % smp::count;
co_await smp::submit_to(orchestrator, [&] () -> future<> {
auto& t = *table_shards[this_shard_id()];
auto s = t.schema();
tlogger.debug("Taking snapshot of {}.{}: directory={}", s->ks_name(), s->cf_name(), jsondir);
std::vector<table::snapshot_file_set> file_sets;
file_sets.reserve(smp::count);
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards[this_shard_id()]->take_snapshot(sharded_db.local(), jsondir);
}));
});
co_await t.finalize_snapshot(sharded_db.local(), std::move(jsondir), std::move(file_sets));
});
}
future<table::snapshot_file_set> table::take_snapshot(database& db, sstring jsondir) {
tlogger.trace("take_snapshot {}", jsondir);
auto sstable_deletion_guard = co_await get_units(_sstable_deletion_sem, 1);
std::exception_ptr ex;
std::vector<sstables::shared_sstable> tables;
try {
tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await max_concurrent_for_each(tables, db.get_config().initial_sstable_loading_concurrency(), [&db, &jsondir] (sstables::shared_sstable sstable) {
return with_semaphore(db.get_sharded_sst_dir_semaphore().local(), 1, [&jsondir, sstable] {
return io_check([sstable, &dir = jsondir] {
return sstable->create_links(dir);
});
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await max_concurrent_for_each(tables, db.get_config().initial_sstable_loading_concurrency(), [&db, &jsondir] (sstables::shared_sstable sstable) {
return with_semaphore(db.get_sharded_sst_dir_semaphore().local(), 1, [&jsondir, sstable] {
return io_check([sstable, &dir = jsondir] {
return sstable->create_links(dir);
});
});
co_await io_check(sync_directory, jsondir);
} catch (...) {
ex = std::current_exception();
}
auto shard = std::hash<sstring>()(jsondir) % smp::count;
std::unordered_set<sstring> table_names;
try {
for (auto& sst : tables) {
auto f = sst->get_filename();
auto rf = f.substr(sst->get_dir().size() + 1);
table_names.insert(std::move(rf));
}
} catch (...) {
ex = std::current_exception();
}
co_await smp::submit_to(shard, [requester = this_shard_id(), &jsondir, this, &db,
tables = std::move(table_names), datadir = _config.datadir, ex = std::move(ex)] () mutable -> future<> {
if (!pending_snapshots.contains(jsondir)) {
try {
pending_snapshots.emplace(jsondir, make_lw_shared<snapshot_manager>());
} catch (...) {
// abort since the process will hang if we can't coordinate
// snapshot across shards, similar to failing to allocation a continuation.
tlogger.error("Failed allocating snapshot_manager: {}. Aborting.", std::current_exception());
abort();
}
}
auto snapshot = pending_snapshots.at(jsondir);
try {
for (auto&& sst: tables) {
snapshot->files.insert(std::move(sst));
}
} catch (...) {
ex = std::current_exception();
}
tlogger.debug("snapshot {}: signal requests", jsondir);
snapshot->requests.signal(1);
if (requester == this_shard_id()) {
tlogger.debug("snapshot {}: waiting for all shards", jsondir);
co_await snapshot->requests.wait(smp::count);
// this_shard_id() here == requester == this_shard_id() before submit_to() above,
// so the db reference is still local
tlogger.debug("snapshot {}: writing schema.cql", jsondir);
co_await write_schema_as_cql(db, jsondir).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
ex = std::move(ptr);
});
tlogger.debug("snapshot {}: seal_snapshot", jsondir);
co_await seal_snapshot(jsondir).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed to seal snapshot in {}: {}.", jsondir, ptr);
ex = std::move(ptr);
});
snapshot->manifest_write.signal(smp::count);
}
tlogger.debug("snapshot {}: waiting for manifest on behalf of shard {}", jsondir, requester);
co_await snapshot->manifest_write.wait(1);
tlogger.debug("snapshot {}: done: error={}", jsondir, ex);
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
co_await io_check(sync_directory, jsondir);
auto table_names = std::make_unique<std::unordered_set<sstring>>();
table_names->reserve(tables.size());
for (auto& sst : tables) {
auto f = sst->get_filename();
auto rf = f.substr(sst->get_dir().size() + 1);
table_names->insert(std::move(rf));
co_await coroutine::maybe_yield();
}
co_return make_foreign(std::move(table_names));
}
future<> table::finalize_snapshot(database& db, sstring jsondir, std::vector<snapshot_file_set> file_sets) {
std::exception_ptr ex;
tlogger.debug("snapshot {}: writing schema.cql", jsondir);
co_await write_schema_as_cql(db, jsondir).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
ex = std::move(ptr);
});
tlogger.debug("snapshot {}: seal_snapshot", jsondir);
co_await seal_snapshot(jsondir, std::move(file_sets)).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed to seal snapshot in {}: {}.", jsondir, ptr);
ex = std::move(ptr);
});
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
future<bool> table::snapshot_exists(sstring tag) {

View File

@@ -55,7 +55,6 @@
#include "utils/latency.hh"
#include "schema.hh"
#include "schema_registry.hh"
#include "utils/joinpoint.hh"
#include <seastar/util/lazy.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/execution_stage.hh>
@@ -690,11 +689,7 @@ private:
}
future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) {
return do_with(utils::make_joinpoint([] { return db_clock::now();}), [this, ksname, cfname] (auto& tsf) {
return _sp.container().invoke_on_all(_sp._write_smp_service_group, [ksname, cfname, &tsf] (storage_proxy& sp) {
return sp._db.local().truncate(ksname, cfname, [&tsf] { return tsf.value(); });
});
});
return replica::database::truncate_table_on_all_shards(_sp._db, ksname, cfname);
}
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>

View File

@@ -54,41 +54,69 @@ public:
}
};
static future<> apply_mutation(sharded<replica::database>& sharded_db, utils::UUID uuid, const mutation& m, bool do_flush = false,
db::commitlog::force_sync fs = db::commitlog::force_sync::no, db::timeout_clock::time_point timeout = db::no_timeout) {
auto shard = m.shard_of();
return sharded_db.invoke_on(shard, [uuid, fm = freeze(m), do_flush, fs, timeout] (replica::database& db) {
auto& t = db.find_column_family(uuid);
return db.apply(t.schema(), fm, tracing::trace_state_ptr(), fs, timeout).then([do_flush, &t] {
return do_flush ? t.flush() : make_ready_future<>();
});
});
}
SEASTAR_TEST_CASE(test_safety_after_truncate) {
auto cfg = make_shared<db::config>();
cfg->auto_snapshot.set(false);
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
dht::partition_range_vector pranges;
sstring ks_name = "ks";
sstring cf_name = "cf";
auto s = db.find_schema(ks_name, cf_name);
auto uuid = s->id();
std::vector<size_t> keys_per_shard;
std::vector<dht::partition_range_vector> pranges_per_shard;
keys_per_shard.resize(smp::count);
pranges_per_shard.resize(smp::count);
for (uint32_t i = 1; i <= 1000; ++i) {
auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
auto shard = m.shard_of();
keys_per_shard[shard]++;
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
apply_mutation(e.db(), uuid, m).get();
}
auto assert_query_result = [&] (size_t expected_size) {
auto assert_query_result = [&] (const std::vector<size_t>& expected_sizes) {
auto max_size = std::numeric_limits<size_t>::max();
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(1000));
auto&& [result, cache_tempature] = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto shard = this_shard_id();
auto s = db.find_schema(uuid);
auto&& [result, cache_tempature] = co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout);
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_sizes[shard]);
}).get();
};
assert_query_result(1000);
assert_query_result(keys_per_shard);
db.truncate("ks", "cf", [] { return make_ready_future<db_clock::time_point>(db_clock::now()); }).get();
replica::database::truncate_table_on_all_shards(e.db(), "ks", "cf").get();
assert_query_result(0);
for (auto it = keys_per_shard.begin(); it < keys_per_shard.end(); ++it) {
*it = 0;
}
assert_query_result(keys_per_shard);
auto cl = db.commitlog();
auto rp = db::commitlog_replayer::create_replayer(e.db()).get0();
auto paths = cl->list_existing_segments().get0();
rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX).get();
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto cl = db.commitlog();
auto rp = co_await db::commitlog_replayer::create_replayer(e.db());
auto paths = co_await cl->list_existing_segments();
co_await rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX);
}).get();
assert_query_result(0);
assert_query_result(keys_per_shard);
return make_ready_future<>();
}, cfg);
}
@@ -114,12 +142,7 @@ SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
auto shard = m.shard_of();
return e.db().invoke_on(shard, [&, fm = freeze(m)] (replica::database& db) -> future<> {
auto& t = db.find_column_family(uuid);
return db.apply(t.schema(), fm, tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).then([&t] {
return t.flush();
});
}).finally([&] {
return apply_mutation(e.db(), uuid, m, true /* do_flush */).finally([&] {
++count;
});
});
@@ -128,12 +151,8 @@ SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
uint32_t num_keys = 1000;
auto f0 = insert_data(0, num_keys);
auto f1 = do_until([&] { return count >= num_keys; }, [&] () -> future<> {
return e.db().invoke_on_all([&, ts = db_clock::now()] (replica::database& db) {
auto& ks = db.find_keyspace(ks_name);
auto& cf = db.find_column_family(uuid);
return db.truncate(ks, cf, [ts] { return make_ready_future<db_clock::time_point>(ts); }, false /* with_snapshot */);
}).then([] {
auto f1 = do_until([&] { return count >= num_keys; }, [&, ts = db_clock::now()] {
return replica::database::truncate_table_on_all_shards(e.db(), "ks", "cf", ts, false /* with_snapshot */).then([] {
return yield();
});
});
@@ -148,40 +167,63 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
dht::partition_range_vector pranges;
for (uint32_t i = 1; i <= 3; ++i) {
auto uuid = s->id();
std::vector<size_t> keys_per_shard;
std::vector<dht::partition_range_vector> pranges_per_shard;
keys_per_shard.resize(smp::count);
pranges_per_shard.resize(smp::count);
for (uint32_t i = 1; i <= 3 * smp::count; ++i) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
apply_mutation(e.db(), uuid, m).get();
auto shard = m.shard_of();
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}
for (uint32_t i = 3; i <= 8; ++i) {
for (uint32_t i = 3 * smp::count; i <= 8 * smp::count; ++i) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
apply_mutation(e.db(), uuid, m).get();
auto shard = m.shard_of();
keys_per_shard[shard]++;
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}
auto max_size = std::numeric_limits<size_t>::max();
{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(3));
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto shard = this_shard_id();
auto s = db.find_schema(uuid);
auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
auto expected_size = std::min<size_t>(keys_per_shard[shard], 3);
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
}).get();
}
{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
query::row_limit(query::max_rows), query::partition_limit(5));
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto shard = this_shard_id();
auto s = db.find_schema(uuid);
auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
auto expected_size = std::min<size_t>(keys_per_shard[shard], 5);
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
}).get();
}
{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
query::row_limit(query::max_rows), query::partition_limit(3));
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto shard = this_shard_id();
auto s = db.find_schema(uuid);
auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
auto expected_size = std::min<size_t>(keys_per_shard[shard], 3);
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
}).get();
}
});
});
@@ -203,8 +245,9 @@ static void test_database(void (*run_tests)(populate_fn_ex, bool)) {
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
replica::column_family& cf = e.local_db().find_column_family(s);
auto uuid = cf.schema()->id();
for (auto&& m : partitions) {
e.local_db().apply(cf.schema(), freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
apply_mutation(e.db(), uuid, m).get();
}
cf.flush().get();
cf.get_row_cache().invalidate(row_cache::external_updater([] {})).get();
@@ -436,7 +479,7 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
future<> take_snapshot(sharded<replica::database>& db, bool skip_flush = false, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test") {
try {
co_await db.local().snapshot_on_all(ks_name, {cf_name}, snapshot_name, skip_flush);
co_await replica::database::snapshot_table_on_all_shards(db, ks_name, cf_name, snapshot_name, skip_flush);
} catch (...) {
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
@@ -657,7 +700,7 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
// existing snapshots expected to remain after dropping the table
testlog.debug("Dropping table {}.{}", ks_name, table_name);
replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, [ts = db_clock::now()] { return make_ready_future<db_clock::time_point>(ts); }).get();
replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name).get();
BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(num_snapshots)), true);
// clear all tags
@@ -1172,7 +1215,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
default_priority_class(),
nullptr);
auto f = replica::database::drop_table_on_all_shards(e.db(), "ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
auto f = replica::database::drop_table_on_all_shards(e.db(), "ks", "cf");
// we add a querier to the querier cache while the drop is ongoing
auto& qc = db.get_querier_cache();
@@ -1201,7 +1244,7 @@ static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
// Pass `with_snapshot=true` to drop_table_on_all
// to allow auto_snapshot (based on the configuration above).
// The table directory should therefore exist after the table is dropped if auto_snapshot is disabled in the configuration.
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, [ts = db_clock::now()] { return make_ready_future<db_clock::time_point>(ts); }, true);
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, true);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, auto_snapshot);
co_return;
@@ -1226,7 +1269,7 @@ SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
// Pass `with_snapshot=false` to drop_table_on_all
// to disallow auto_snapshot.
// The table directory should therefore not exist after the table is dropped.
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, [ts = db_clock::now()] { return make_ready_future<db_clock::time_point>(ts); }, false);
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, false);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, false);
co_return;
@@ -1239,13 +1282,13 @@ SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
auto snapshot_tag = format("test-{}", db_clock::now().time_since_epoch().count());
co_await e.local_db().snapshot_on_all(ks_name, snapshot_tag, false);
co_await replica::database::snapshot_table_on_all_shards(e.db(), ks_name, table_name, snapshot_tag, false);
auto cf_dir = e.local_db().find_column_family(ks_name, table_name).dir();
// With explicit snapshot and with_snapshot=false
// dir should still be kept, regardless of the
// with_snapshot parameter and auto_snapshot config.
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, [ts = db_clock::now()] { return make_ready_future<db_clock::time_point>(ts); }, false);
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, false);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, true);
co_return;

View File

@@ -89,7 +89,7 @@ SEASTAR_TEST_CASE(simple_sstable_extension) {
// minimal data
return e.execute_cql("insert into ks.cf (id, value) values (1, 100);").discard_result().then([&e] {
// flush all shards
return e.db().local().flush_on_all("ks", "cf").then([] {
return replica::database::flush_table_on_all_shards(e.db(), "ks", "cf").then([] {
BOOST_REQUIRE(counter > 1);
});
});

View File

@@ -1,89 +0,0 @@
/*
* Copyright (C) 2016-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <memory>
#include <optional>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
#include "seastarx.hh"
/**
* Joinpoint:
*
* Helper type for letting operations working on all shards "join" and acquire
* the same value of something, with that value being based on whenever that
* join takes place. (Obvious use case: time stamp after one set of per-shard ops, but
* before final ones).
* The generation of the value is guaranteed to happen on the shards that created the
* join point.
*/
namespace utils {
template<typename T>
class joinpoint {
public:
typedef typename futurize<T>::type type;
typedef std::function<type()> func_type;
joinpoint(func_type f)
: _func(std::move(f))
, _shard(this_shard_id())
, _enter(0)
, _wait(0)
{}
type value() {
return smp::submit_to(_shard, [this, id = this_shard_id()] {
_enter.signal();
if (id == _shard) {
// We should not generate to common value until all shards
// have reached this point. Thus the two semaphores.
return _enter.wait(smp::count).then([this] {
return _func().then([this](T v) {
_value = std::move(v);
_wait.signal(smp::count - 1); // we don't wait
return make_ready_future<T>(*_value);
}).handle_exception([this](auto ep) {
_wait.broken(ep);
return make_exception_future<T>(ep);
});
});
}
return _wait.wait().then([this] {
assert(_value);
return make_ready_future<T>(*_value);
});
});
}
private:
func_type _func;
shard_id _shard;
semaphore _enter;
semaphore _wait;
std::optional<T> _value;
};
/**
* Based on the join-code in cf::snapshot.
* An object that allows us to generate a value for-all-shards
* at some point down the execution in multiple shards.
*
* T type must be copyable, and preferable primitive/trivial
* or at the very least shard-copy safe.
*/
template<typename Func, typename T = std::result_of_t<Func()>>
joinpoint<T> make_joinpoint(Func && f) {
return joinpoint<T>([f = std::forward<Func>(f)] {
return futurize_invoke(f);
});
}
}