Merge 'replica/database: truncate: temporarily disable compaction on table and views before flush' from Benny Halevy

Flushing the base table triggers view building
and corresponding compactions on the view tables.

Temporarily disable compaction on both the base
table and all its view before flush and snapshot
since those flushed sstables are about to be truncated
anyway right after the snapshot is taken.

This should make truncate go faster.

In the process, this series also embeds `database::truncate_views`
into `truncate` and coroutinizes both

Refs #6309

Test: unit(dev)

Closes #10203

* github.com:scylladb/scylla:
  replica/database: truncate: fixup indentation
  replica/database: truncate: temporarily disable compaction on table and views before flush
  replica/database: truncate: coroutinize per-view logic
  replica/database: open-code truncate_view in truncate
  replica/database: truncate: coroutinize run_with_compaction_disabled lambda
  replica/database: coroutinize truncate
  compaction_manager: add disable_compaction method

(cherry picked from commit aab052c0d5)
This commit is contained in:
Avi Kivity
2022-03-17 17:24:20 +02:00
committed by Botond Dénes
parent 02e8336659
commit f5bf4c81d1
4 changed files with 130 additions and 91 deletions

View File

@@ -353,32 +353,50 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
return task->compaction_done.get_future().then([task] {});
}
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
: _cm(cm)
, _table(t)
, _compaction_state(cm.get_compaction_state(_table))
, _holder(_compaction_state.gate.hold())
{
_compaction_state.compaction_disabled_counter++;
cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
}
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
: _cm(o._cm)
, _table(std::exchange(o._table, nullptr))
, _compaction_state(o._compaction_state)
, _holder(std::move(o._holder))
{}
compaction_manager::compaction_reenabler::~compaction_reenabler() {
// submit compaction request if we're the last holder of the gate which is still opened.
if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
cmlog.debug("Reenabling compaction for {}.{}",
_table->schema()->ks_name(), _table->schema()->cf_name());
try {
_cm.submit(_table);
} catch (...) {
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
}
}
}
future<compaction_manager::compaction_reenabler>
compaction_manager::stop_and_disable_compaction(replica::table* t) {
compaction_reenabler cre(*this, t);
co_await stop_ongoing_compactions("user-triggered operation", t);
co_return cre;
}
future<>
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
auto& c_state = _compaction_state[t];
auto holder = c_state.gate.hold();
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
c_state.compaction_disabled_counter++;
std::exception_ptr err;
try {
co_await stop_ongoing_compactions("user-triggered operation", t);
co_await func();
} catch (...) {
err = std::current_exception();
}
#ifdef DEBUG
assert(_compaction_state.contains(t));
#endif
// submit compaction request if we're the last holder of the gate which is still opened.
if (--c_state.compaction_disabled_counter == 0 && !c_state.gate.is_closed()) {
submit(t);
}
if (err) {
std::rethrow_exception(err);
}
co_return;
co_await func();
}
void compaction_manager::task::setup_new_compaction() {

View File

@@ -269,6 +269,31 @@ public:
// parameter job is a function that will carry the operation
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
class compaction_reenabler {
compaction_manager& _cm;
replica::table* _table;
compaction_state& _compaction_state;
gate::holder _holder;
public:
compaction_reenabler(compaction_manager&, replica::table*);
compaction_reenabler(compaction_reenabler&&) noexcept;
~compaction_reenabler();
replica::table* compacting_table() const noexcept {
return _table;
}
const compaction_state& compaction_state() const noexcept {
return _compaction_state;
}
};
// Disable compaction temporarily for a table t.
// Caller should call the compaction_reenabler::reenable
future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);

View File

@@ -2062,80 +2062,77 @@ future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf)
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
dblog.debug("Truncating {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
return with_gate(cf.async_gate(), [this, &ks, &cf, tsf = std::move(tsf), with_snapshot] () mutable -> future<> {
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;
auto holder = cf.async_gate().hold();
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
auto low_mark = cf.set_low_replay_position_mark();
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;
const auto uuid = cf.schema()->id();
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
auto low_mark = cf.set_low_replay_position_mark();
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
future<> f = make_ready_future<>();
bool did_flush = false;
if (should_flush && cf.can_flush()) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
did_flush = true;
} else {
f = cf.clear();
}
return f.then([this, &cf, auto_snapshot, tsf = std::move(tsf), low_mark, should_flush, did_flush] {
dblog.debug("Discarding sstable data for truncated CF + indexes");
// TODO: notify truncation
const auto uuid = cf.schema()->id();
return tsf().then([this, &cf, auto_snapshot, low_mark, should_flush, did_flush](db_clock::time_point truncated_at) {
future<> f = make_ready_future<>();
if (auto_snapshot) {
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
f = cf.snapshot(*this, name);
}
return f.then([this, &cf, truncated_at, low_mark, should_flush, did_flush] {
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush, did_flush](db::replay_position rp) {
// TODO: indexes.
// Note: since discard_sstables was changed to only count tables owned by this shard,
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
// the low_mark assertion does not hold, because we maybe/probably never got around to
// creating the sstables that would create them.
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
rp = std::max(low_mark, rp);
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
// save_truncation_record() may actually fail after we cached the truncation time
// but this is not be worse that if failing without caching: at least the correct time
// will be available until next reboot and a client will have to retry truncation anyway.
cf.cache_truncation_record(truncated_at);
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
});
});
});
});
});
}).then([this, uuid] {
drop_repair_history_map_for_table(uuid);
});
});
}
std::vector<compaction_manager::compaction_reenabler> cres;
cres.reserve(1 + cf.views().size());
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
co_await parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
});
});
});
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
});
bool did_flush = false;
if (should_flush && cf.can_flush()) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
co_await cf.flush();
did_flush = true;
} else {
co_await cf.clear();
}
dblog.debug("Discarding sstable data for truncated CF + indexes");
// TODO: notify truncation
db_clock::time_point truncated_at = co_await tsf();
if (auto_snapshot) {
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
co_await cf.snapshot(*this, name);
}
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
// TODO: indexes.
// Note: since discard_sstables was changed to only count tables owned by this shard,
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
// the low_mark assertion does not hold, because we maybe/probably never got around to
// creating the sstables that would create them.
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
rp = std::max(low_mark, rp);
co_await parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
if (should_flush) {
co_await vcf.flush();
} else {
co_await vcf.clear();
}
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
});
// save_truncation_record() may actually fail after we cached the truncation time
// but this is not be worse that if failing without caching: at least the correct time
// will be available until next reboot and a client will have to retry truncation anyway.
cf.cache_truncation_record(truncated_at);
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
drop_repair_history_map_for_table(uuid);
}
const sstring& database::get_snitch_name() const {

View File

@@ -1568,7 +1568,6 @@ public:
/** Truncates the given column family */
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
future<> truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush);
bool update_column_family(schema_ptr s);
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);