querier_cache: implement stop

Close the _closing_gate to wait on background
close of dropped queries, and close all remaining queriers.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-02-14 16:28:12 +02:00
parent 87c62b5f59
commit 7c7569f0ad
4 changed files with 23 additions and 2 deletions

View File

@@ -2016,7 +2016,9 @@ database::stop() {
_read_concurrency_sem.stop(),
_streaming_concurrency_sem.stop(),
_compaction_concurrency_sem.stop(),
_system_read_concurrency_sem.stop()).discard_result();
_system_read_concurrency_sem.stop()).discard_result().finally([this] {
return _querier_cache.stop();
});
});
}

View File

@@ -348,7 +348,8 @@ std::optional<Querier> querier_cache::lookup_querier(
++stats.drops;
// Close and drop the querier in the background.
// TODO: wait on _closing_gate in querier_cache::close()
// It is safe to do so, since _closing_gate is closed and
// waited on in querier_cache::stop()
(void)with_gate(_closing_gate, [this, q = std::move(q)] () mutable {
return q.close().finally([q = std::move(q)] {});
});
@@ -437,6 +438,18 @@ future<> querier_cache::evict_all_for_table(const utils::UUID& schema_id) noexce
co_return;
}
future<> querier_cache::stop() noexcept {
co_await _closing_gate.close();
for (auto* ip : {&_data_querier_index, &_mutation_querier_index, &_shard_mutation_querier_index}) {
auto& idx = *ip;
for (auto it = idx.begin(); it != idx.end(); it = idx.erase(it)) {
co_await it->second->close();
--_stats.population;
}
}
}
querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, query::is_first_page is_first_page)
: _cache(&cache)
, _key(key)

View File

@@ -450,6 +450,11 @@ public:
/// Should be used when dropping a table.
future<> evict_all_for_table(const utils::UUID& schema_id) noexcept;
/// Close all queriers and wait on background work.
///
/// Should be used before destroying the querier_cache.
future<> stop() noexcept;
const stats& get_stats() const {
return _stats;
}

View File

@@ -177,6 +177,7 @@ public:
}
~test_querier_cache() {
_cache.stop().get();
_sem.stop().get();
}