querier_cache: check semaphore mismatch during querier lookup
Previously, semaphore mismatch was checked only in multi-partition queries, and if it happened, an internal error was thrown. This commit pushes the check down to `querier_cache`, so each `lookup_*_querier` method checks for the mismatch. What's more, if a semaphore mismatch occurs, it checks whether both semaphores belong to the user. If so, it logs a warning and drops the cached reader instead of throwing an error. The mismatch can happen if the user's scheduling group changed during a query. We don't want to throw an error in that case, but rather drop and reset the cached reader.
This commit is contained in:
@@ -578,7 +578,7 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe
|
||||
auto& semaphore = this->semaphore();
|
||||
auto shard = this_shard_id();
|
||||
|
||||
auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, gts.get(), timeout);
|
||||
auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, semaphore, gts.get(), timeout);
|
||||
|
||||
if (!querier_opt) {
|
||||
_readers[shard] = reader_meta(reader_state::inexistent);
|
||||
@@ -586,20 +586,6 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe
|
||||
}
|
||||
|
||||
auto& q = *querier_opt;
|
||||
|
||||
if (&q.permit().semaphore() != &semaphore) {
|
||||
co_await q.close();
|
||||
on_internal_error(mmq_log, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: "
|
||||
"looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})",
|
||||
q.permit().semaphore().name(),
|
||||
reinterpret_cast<uintptr_t>(&q.permit().semaphore()),
|
||||
semaphore.name(),
|
||||
reinterpret_cast<uintptr_t>(&semaphore)));
|
||||
}
|
||||
|
||||
// At this point the readers is passed to the semaphore and we are
|
||||
// safe w.r.t. exceptions. The code between this point and obtaining
|
||||
// the querier must take care of closing it if an error happens.
|
||||
auto handle = semaphore.register_inactive_read(std::move(q).reader());
|
||||
_readers[shard] = reader_meta(
|
||||
reader_state::successful_lookup,
|
||||
|
||||
55
querier.cc
55
querier.cc
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "querier.hh"
|
||||
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "log.hh"
|
||||
|
||||
@@ -24,7 +25,9 @@ enum class can_use {
|
||||
yes,
|
||||
no_schema_version_mismatch,
|
||||
no_ring_pos_mismatch,
|
||||
no_clustering_pos_mismatch
|
||||
no_clustering_pos_mismatch,
|
||||
no_scheduling_group_mismatch,
|
||||
no_fatal_semaphore_mismatch
|
||||
};
|
||||
|
||||
static sstring cannot_use_reason(can_use cu)
|
||||
@@ -39,6 +42,10 @@ static sstring cannot_use_reason(can_use cu)
|
||||
return "ring pos mismatch";
|
||||
case can_use::no_clustering_pos_mismatch:
|
||||
return "clustering pos mismatch";
|
||||
case can_use::no_scheduling_group_mismatch:
|
||||
return "scheduling group mismatch";
|
||||
case can_use::no_fatal_semaphore_mismatch:
|
||||
return "fatal semaphore mismatch";
|
||||
}
|
||||
return "unknown reason";
|
||||
}
|
||||
@@ -152,11 +159,19 @@ static bool ranges_match(const schema& s, dht::partition_ranges_view original_ra
|
||||
}
|
||||
|
||||
template <typename Querier>
|
||||
static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice) {
|
||||
static can_use can_be_used_for_page(querier_cache::is_user_semaphore_func& is_user_semaphore, Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice, reader_concurrency_semaphore& current_sem) {
|
||||
if (s.version() != q.schema().version()) {
|
||||
return can_use::no_schema_version_mismatch;
|
||||
}
|
||||
|
||||
auto& querier_sem = q.permit().semaphore();
|
||||
if (&querier_sem != &current_sem) {
|
||||
if (is_user_semaphore(querier_sem) && is_user_semaphore(current_sem)) {
|
||||
return can_use::no_scheduling_group_mismatch;
|
||||
}
|
||||
return can_use::no_fatal_semaphore_mismatch;
|
||||
}
|
||||
|
||||
const auto pos_opt = q.current_position();
|
||||
if (!pos_opt) {
|
||||
// There was nothing read so far so we assume we are ok.
|
||||
@@ -198,7 +213,7 @@ static std::unique_ptr<querier_base> find_querier(querier_cache::index& index, q
|
||||
return ptr;
|
||||
}
|
||||
|
||||
querier_cache::querier_cache(std::function<bool(const reader_concurrency_semaphore&)> is_user_semaphore_func, std::chrono::seconds entry_ttl)
|
||||
querier_cache::querier_cache(is_user_semaphore_func is_user_semaphore_func, std::chrono::seconds entry_ttl)
|
||||
: _entry_ttl(entry_ttl), _is_user_semaphore_func(is_user_semaphore_func) {
|
||||
}
|
||||
|
||||
@@ -307,6 +322,7 @@ std::optional<Querier> querier_cache::lookup_querier(
|
||||
const schema& s,
|
||||
dht::partition_ranges_view ranges,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
auto base_ptr = find_querier(index, key, ranges, trace_state);
|
||||
@@ -330,7 +346,7 @@ std::optional<Querier> querier_cache::lookup_querier(
|
||||
querier_utils::set_reader(q, std::move(*reader_opt));
|
||||
--stats.population;
|
||||
|
||||
const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice);
|
||||
const auto can_be_used = can_be_used_for_page(_is_user_semaphore_func, q, s, ranges.front(), slice, current_sem);
|
||||
if (can_be_used == can_use::yes) {
|
||||
tracing::trace(trace_state, "Reusing querier");
|
||||
return std::optional<Querier>(std::move(q));
|
||||
@@ -339,6 +355,11 @@ std::optional<Querier> querier_cache::lookup_querier(
|
||||
tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(can_be_used));
|
||||
++stats.drops;
|
||||
|
||||
// Save semaphore name and address for later to use it in
|
||||
// error/warning message
|
||||
auto q_semaphore_name = q.permit().semaphore().name();
|
||||
auto q_semaphore_address = reinterpret_cast<uintptr_t>(&q.permit().semaphore());
|
||||
|
||||
// Close and drop the querier in the background.
|
||||
// It is safe to do so, since _closing_gate is closed and
|
||||
// waited on in querier_cache::stop()
|
||||
@@ -346,6 +367,23 @@ std::optional<Querier> querier_cache::lookup_querier(
|
||||
return q.close().finally([q = std::move(q)] {});
|
||||
});
|
||||
|
||||
if (can_be_used == can_use::no_scheduling_group_mismatch) {
|
||||
qlogger.warn("user semaphores mismatch detected. dropping looked-up reader: "
|
||||
"looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})",
|
||||
q_semaphore_name,
|
||||
q_semaphore_address,
|
||||
current_sem.name(),
|
||||
reinterpret_cast<uintptr_t>(&current_sem));
|
||||
}
|
||||
else if (can_be_used == can_use::no_fatal_semaphore_mismatch) {
|
||||
on_internal_error(qlogger, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: "
|
||||
"looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})",
|
||||
q_semaphore_name,
|
||||
q_semaphore_address,
|
||||
current_sem.name(),
|
||||
reinterpret_cast<uintptr_t>(&current_sem)));
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -353,27 +391,30 @@ std::optional<querier> querier_cache::lookup_data_querier(query_id key,
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return lookup_querier<querier>(_data_querier_index, key, s, range, slice, std::move(trace_state), timeout);
|
||||
return lookup_querier<querier>(_data_querier_index, key, s, range, slice, current_sem, std::move(trace_state), timeout);
|
||||
}
|
||||
|
||||
std::optional<querier> querier_cache::lookup_mutation_querier(query_id key,
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return lookup_querier<querier>(_mutation_querier_index, key, s, range, slice, std::move(trace_state), timeout);
|
||||
return lookup_querier<querier>(_mutation_querier_index, key, s, range, slice, current_sem, std::move(trace_state), timeout);
|
||||
}
|
||||
|
||||
std::optional<shard_mutation_querier> querier_cache::lookup_shard_mutation_querier(query_id key,
|
||||
const schema& s,
|
||||
const dht::partition_range_vector& ranges,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return lookup_querier<shard_mutation_querier>(_shard_mutation_querier_index, key, s, ranges, slice,
|
||||
return lookup_querier<shard_mutation_querier>(_shard_mutation_querier_index, key, s, ranges, slice, current_sem,
|
||||
std::move(trace_state), timeout);
|
||||
}
|
||||
|
||||
|
||||
@@ -311,6 +311,7 @@ public:
|
||||
};
|
||||
|
||||
using index = std::unordered_multimap<query_id, std::unique_ptr<querier_base>>;
|
||||
using is_user_semaphore_func = std::function<bool(const reader_concurrency_semaphore&)>;
|
||||
|
||||
private:
|
||||
index _data_querier_index;
|
||||
@@ -319,7 +320,7 @@ private:
|
||||
std::chrono::seconds _entry_ttl;
|
||||
stats _stats;
|
||||
gate _closing_gate;
|
||||
[[maybe_unused]] std::function<bool(const reader_concurrency_semaphore&)> _is_user_semaphore_func;
|
||||
is_user_semaphore_func _is_user_semaphore_func;
|
||||
|
||||
private:
|
||||
template <typename Querier>
|
||||
@@ -338,11 +339,12 @@ private:
|
||||
const schema& s,
|
||||
dht::partition_ranges_view ranges,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
querier_cache(std::function<bool(const reader_concurrency_semaphore&)> is_user_semaphore_func, std::chrono::seconds entry_ttl = default_entry_ttl);
|
||||
querier_cache(is_user_semaphore_func is_user_semaphore_func, std::chrono::seconds entry_ttl = default_entry_ttl);
|
||||
|
||||
querier_cache(const querier_cache&) = delete;
|
||||
querier_cache& operator=(const querier_cache&) = delete;
|
||||
@@ -374,6 +376,7 @@ public:
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
@@ -384,6 +387,7 @@ public:
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
@@ -394,6 +398,7 @@ public:
|
||||
const schema& s,
|
||||
const dht::partition_range_vector& ranges,
|
||||
const query::partition_slice& slice,
|
||||
reader_concurrency_semaphore& current_sem,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
|
||||
@@ -1657,7 +1657,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
std::exception_ptr ex;
|
||||
|
||||
if (cmd.query_uuid && !cmd.is_first_page) {
|
||||
querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, trace_state, timeout);
|
||||
querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, semaphore, trace_state, timeout);
|
||||
}
|
||||
|
||||
auto read_func = [&, this] (reader_permit permit) {
|
||||
@@ -1724,7 +1724,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
std::exception_ptr ex;
|
||||
|
||||
if (cmd.query_uuid && !cmd.is_first_page) {
|
||||
querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, trace_state, timeout);
|
||||
querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, semaphore, trace_state, timeout);
|
||||
}
|
||||
|
||||
auto read_func = [&] (reader_permit permit) {
|
||||
|
||||
@@ -306,7 +306,7 @@ public:
|
||||
const dht::partition_range& lookup_range,
|
||||
const query::partition_slice& lookup_slice) {
|
||||
|
||||
auto querier_opt = _cache.lookup_data_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, nullptr, db::no_timeout);
|
||||
auto querier_opt = _cache.lookup_data_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout);
|
||||
if (querier_opt) {
|
||||
querier_opt->close().get();
|
||||
}
|
||||
@@ -319,7 +319,7 @@ public:
|
||||
const dht::partition_range& lookup_range,
|
||||
const query::partition_slice& lookup_slice) {
|
||||
|
||||
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, nullptr, db::no_timeout);
|
||||
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout);
|
||||
if (querier_opt) {
|
||||
querier_opt->close().get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user