diff --git a/database.cc b/database.cc index e8126449e1..95712abae8 100644 --- a/database.cc +++ b/database.cc @@ -1639,9 +1639,30 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i return nullptr; } +future<> database::close_tables(table_kind kind_to_close) { + return parallel_for_each(_column_families, [this, kind_to_close](auto& val_pair) { + table_kind k = is_system_table(*val_pair.second->schema()) ? table_kind::system : table_kind::user; + if (k == kind_to_close) { + return val_pair.second->stop(); + } else { + return make_ready_future<>(); + } + }); +} + future<> stop_database(sharded& sdb) { return sdb.invoke_on_all([](database& db) { return db.get_compaction_manager().stop(); + }).then([&sdb] { + // Closing a table can cause us to find a large partition. Since we want to record that, we have to close + // system.large_partitions after the regular tables. + return sdb.invoke_on_all([](database& db) { + return db.close_tables(database::table_kind::user); + }); + }).then([&sdb] { + return sdb.invoke_on_all([](database& db) { + return db.close_tables(database::table_kind::system); + }); }).then([&sdb] { return sdb.invoke_on_all([](database& db) { db.stop_large_data_handler(); @@ -1659,10 +1680,6 @@ database::stop() { // try to ensure that CL has done disk flushing future<> maybe_shutdown_commitlog = _commitlog != nullptr ? _commitlog->shutdown() : make_ready_future<>(); return maybe_shutdown_commitlog.then([this] { - return parallel_for_each(_column_families, [this] (auto& val_pair) { - return val_pair.second->stop(); - }); - }).then([this] { return _view_update_concurrency_sem.wait(max_memory_pending_view_updates()); }).then([this] { if (_commitlog != nullptr) { diff --git a/database.hh b/database.hh index ae55c6fd02..79e5699c71 100644 --- a/database.hh +++ b/database.hh @@ -1171,6 +1171,12 @@ struct database_config { // use shard_of() for data class database { +public: + enum class table_kind { + system, + user, + }; + private: ::cf_stats _cf_stats; static const size_t max_count_concurrent_reads{100}; @@ -1357,6 +1363,8 @@ public: std::optional index_name_root) const; schema_ptr find_indexed_table(const sstring& ks_name, const sstring& index_name) const; future<> stop(); + future<> close_tables(table_kind kind_to_close); + void stop_large_data_handler(); unsigned shard_of(const dht::token& t); unsigned shard_of(const mutation& m); diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index 0e6e6d236d..be2f152600 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -28,7 +28,8 @@ namespace db { future<> large_data_handler::maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const { - if (!_stopped && partition_size > _partition_threshold_bytes) { + assert(!_stopped); + if (partition_size > _partition_threshold_bytes) { ++_stats.partitions_bigger_than_threshold; const schema& s = *sst.get_schema(); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 1d86d9a4e5..cd992347e5 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -68,9 +68,7 @@ public: future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const { - if (_stopped) { - return make_ready_future<>(); - } + assert(!_stopped); future<> large_partitions = make_ready_future<>(); if (__builtin_expect(data_size > _partition_threshold_bytes, false)) { large_partitions = delete_large_partitions_entry(s, filename);