Merge 'Guard tables in compaction tasks' from Benny Halevy
Currently, if a compaction function enters the table or compaction_group async_gate, we can't stop it on the table/compaction_group stop path as they co_await their respective async_gate.close(). This series introduces a table_ptr smart pointer to guards the table object by entering its async_gate, and it also defers awaiting the gate.close future till after stopping ongoing compaction so that closing the gate will prevent starting new compactions while ongoing compaction can be stopped and finally awaiting the close() future will wait for them to unwind and exit the gate after being stopped. Fixes #16305 Closes scylladb/scylladb#16351 * github.com:scylladb/scylladb: compaction: run_on_table: skip compaction also on gate_closed_exception compaction: run_on_table: hold table table: add table_holder and hold method table: stop: allow compactions to be stopped while closing async_gate
This commit is contained in:
@@ -193,9 +193,13 @@ future<> run_on_table(sstring op, replica::database& db, std::string keyspace, t
|
||||
std::exception_ptr ex;
|
||||
tasks::tmlogger.debug("Starting {} on {}.{}", op, keyspace, ti.name);
|
||||
try {
|
||||
co_await func(db.find_column_family(ti.id));
|
||||
auto& t = db.find_column_family(ti.id);
|
||||
auto holder = t.hold();
|
||||
co_await func(t);
|
||||
} catch (const replica::no_such_column_family& e) {
|
||||
tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what());
|
||||
} catch (const gate_closed_exception& e) {
|
||||
tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what());
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
tasks::tmlogger.error("Failed {} of {}.{}: {}", op, keyspace, ti.name, ex);
|
||||
|
||||
@@ -361,6 +361,31 @@ struct table_stats {
|
||||
|
||||
using storage_options = data_dictionary::storage_options;
|
||||
|
||||
// Smart table pointer that guards the table object
|
||||
// while it's being accessed asynchronously
|
||||
class table_holder {
|
||||
gate::holder _holder;
|
||||
lw_shared_ptr<table> _table_ptr;
|
||||
public:
|
||||
explicit table_holder(table&);
|
||||
|
||||
const table& operator*() const noexcept {
|
||||
return *_table_ptr;
|
||||
}
|
||||
|
||||
table& operator*() noexcept {
|
||||
return *_table_ptr;
|
||||
}
|
||||
|
||||
const table* operator->() const noexcept {
|
||||
return _table_ptr.operator->();
|
||||
}
|
||||
|
||||
table* operator->() noexcept {
|
||||
return _table_ptr.operator->();
|
||||
}
|
||||
};
|
||||
|
||||
class table : public enable_lw_shared_from_this<table>
|
||||
, public weakly_referencable<table> {
|
||||
public:
|
||||
@@ -783,6 +808,11 @@ public:
|
||||
|
||||
table(column_family&&) = delete; // 'this' is being captured during construction
|
||||
~table();
|
||||
|
||||
table_holder hold() {
|
||||
return table_holder(*this);
|
||||
}
|
||||
|
||||
const schema_ptr& schema() const { return _schema; }
|
||||
void set_schema(schema_ptr);
|
||||
db::commitlog* commitlog() const;
|
||||
|
||||
@@ -67,6 +67,11 @@ static seastar::metrics::label keyspace_label("ks");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
table_holder::table_holder(table& t)
|
||||
: _holder(t.async_gate())
|
||||
, _table_ptr(t.shared_from_this())
|
||||
{ }
|
||||
|
||||
void table::update_sstables_known_generation(sstables::generation_type generation) {
|
||||
auto gen = generation ? generation.as_int() : 0;
|
||||
if (_sstable_generation_generator) {
|
||||
@@ -1073,10 +1078,13 @@ table::stop() {
|
||||
if (_async_gate.is_closed()) {
|
||||
co_return;
|
||||
}
|
||||
co_await _async_gate.close();
|
||||
// Allow `compaction_group::stop` to stop ongoing compactions
|
||||
// while they may still hold the table _async_gate
|
||||
auto gate_closed_fut = _async_gate.close();
|
||||
co_await await_pending_ops();
|
||||
co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::stop));
|
||||
co_await _sstable_deletion_gate.close();
|
||||
co_await std::move(gate_closed_fut);
|
||||
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
||||
for (const compaction_group_ptr& cg : compaction_groups()) {
|
||||
cg->clear_sstables();
|
||||
|
||||
Reference in New Issue
Block a user