db: stop the commit log after the tables during shutdown

This allows for system.large_partitions to be updated if a large
partition is found while writing the last sstables.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
This commit is contained in:
Rafael Ávila de Espíndola
2019-02-28 12:25:42 -08:00
parent a3e1f14134
commit 16ed9a2574
4 changed files with 32 additions and 8 deletions

View File

@@ -1639,9 +1639,30 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i
return nullptr;
}
future<> database::close_tables(table_kind kind_to_close) {
return parallel_for_each(_column_families, [this, kind_to_close](auto& val_pair) {
table_kind k = is_system_table(*val_pair.second->schema()) ? table_kind::system : table_kind::user;
if (k == kind_to_close) {
return val_pair.second->stop();
} else {
return make_ready_future<>();
}
});
}
future<> stop_database(sharded<database>& sdb) {
return sdb.invoke_on_all([](database& db) {
return db.get_compaction_manager().stop();
}).then([&sdb] {
// Closing a table can cause us to find a large partition. Since we want to record that, we have to close
// system.large_partitions after the regular tables.
return sdb.invoke_on_all([](database& db) {
return db.close_tables(database::table_kind::user);
});
}).then([&sdb] {
return sdb.invoke_on_all([](database& db) {
return db.close_tables(database::table_kind::system);
});
}).then([&sdb] {
return sdb.invoke_on_all([](database& db) {
db.stop_large_data_handler();
@@ -1659,10 +1680,6 @@ database::stop() {
// try to ensure that CL has done disk flushing
future<> maybe_shutdown_commitlog = _commitlog != nullptr ? _commitlog->shutdown() : make_ready_future<>();
return maybe_shutdown_commitlog.then([this] {
return parallel_for_each(_column_families, [this] (auto& val_pair) {
return val_pair.second->stop();
});
}).then([this] {
return _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
}).then([this] {
if (_commitlog != nullptr) {

View File

@@ -1171,6 +1171,12 @@ struct database_config {
// use shard_of() for data
class database {
public:
enum class table_kind {
system,
user,
};
private:
::cf_stats _cf_stats;
static const size_t max_count_concurrent_reads{100};
@@ -1357,6 +1363,8 @@ public:
std::optional<sstring> index_name_root) const;
schema_ptr find_indexed_table(const sstring& ks_name, const sstring& index_name) const;
future<> stop();
future<> close_tables(table_kind kind_to_close);
void stop_large_data_handler();
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);

View File

@@ -28,7 +28,8 @@
namespace db {
future<> large_data_handler::maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size) const {
if (!_stopped && partition_size > _partition_threshold_bytes) {
assert(!_stopped);
if (partition_size > _partition_threshold_bytes) {
++_stats.partitions_bigger_than_threshold;
const schema& s = *sst.get_schema();

View File

@@ -68,9 +68,7 @@ public:
future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const;
future<> maybe_delete_large_data_entries(const schema& s, const sstring& filename, uint64_t data_size) const {
if (_stopped) {
return make_ready_future<>();
}
assert(!_stopped);
future<> large_partitions = make_ready_future<>();
if (__builtin_expect(data_size > _partition_threshold_bytes, false)) {
large_partitions = delete_large_partitions_entry(s, filename);