diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 2654eb2ad4..84f4f18f15 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -193,9 +193,13 @@ future<> run_on_table(sstring op, replica::database& db, std::string keyspace, t std::exception_ptr ex; tasks::tmlogger.debug("Starting {} on {}.{}", op, keyspace, ti.name); try { - co_await func(db.find_column_family(ti.id)); + auto& t = db.find_column_family(ti.id); + auto holder = t.hold(); + co_await func(t); } catch (const replica::no_such_column_family& e) { tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what()); + } catch (const gate_closed_exception& e) { + tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what()); } catch (...) { ex = std::current_exception(); tasks::tmlogger.error("Failed {} of {}.{}: {}", op, keyspace, ti.name, ex); diff --git a/replica/database.hh b/replica/database.hh index bff06cbd54..b0e3e12938 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -361,6 +361,31 @@ struct table_stats { using storage_options = data_dictionary::storage_options; +// Smart table pointer that guards the table object +// while it's being accessed asynchronously +class table_holder { + gate::holder _holder; + lw_shared_ptr _table_ptr; +public: + explicit table_holder(table&); + + const table& operator*() const noexcept { + return *_table_ptr; + } + + table& operator*() noexcept { + return *_table_ptr; + } + + const table* operator->() const noexcept { + return _table_ptr.operator->(); + } + + table* operator->() noexcept { + return _table_ptr.operator->(); + } +}; + class table : public enable_lw_shared_from_this
, public weakly_referencable
{ public: @@ -783,6 +808,11 @@ public: table(column_family&&) = delete; // 'this' is being captured during construction ~table(); + + table_holder hold() { + return table_holder(*this); + } + const schema_ptr& schema() const { return _schema; } void set_schema(schema_ptr); db::commitlog* commitlog() const; diff --git a/replica/table.cc b/replica/table.cc index 2488a507b7..0b86cdc8ff 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -67,6 +67,11 @@ static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; +table_holder::table_holder(table& t) + : _holder(t.async_gate()) + , _table_ptr(t.shared_from_this()) +{ } + void table::update_sstables_known_generation(sstables::generation_type generation) { auto gen = generation ? generation.as_int() : 0; if (_sstable_generation_generator) { @@ -1073,10 +1078,13 @@ table::stop() { if (_async_gate.is_closed()) { co_return; } - co_await _async_gate.close(); + // Allow `compaction_group::stop` to stop ongoing compactions + // while they may still hold the table _async_gate + auto gate_closed_fut = _async_gate.close(); co_await await_pending_ops(); co_await parallel_foreach_compaction_group(std::mem_fn(&compaction_group::stop)); co_await _sstable_deletion_gate.close(); + co_await std::move(gate_closed_fut); co_await get_row_cache().invalidate(row_cache::external_updater([this] { for (const compaction_group_ptr& cg : compaction_groups()) { cg->clear_sstables();