diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 87665e19db..a47c2d6e24 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -578,7 +578,7 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe auto& semaphore = this->semaphore(); auto shard = this_shard_id(); - auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, gts.get(), timeout); + auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, semaphore, gts.get(), timeout); if (!querier_opt) { _readers[shard] = reader_meta(reader_state::inexistent); @@ -586,20 +586,6 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe } auto& q = *querier_opt; - - if (&q.permit().semaphore() != &semaphore) { - co_await q.close(); - on_internal_error(mmq_log, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: " - "looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})", - q.permit().semaphore().name(), - reinterpret_cast(&q.permit().semaphore()), - semaphore.name(), - reinterpret_cast(&semaphore))); - } - - // At this point the readers is passed to the semaphore and we are - // safe w.r.t. exceptions. The code between this point and obtaining - // the querier must take care of closing it if an error happens. auto handle = semaphore.register_inactive_read(std::move(q).reader()); _readers[shard] = reader_meta( reader_state::successful_lookup, diff --git a/querier.cc b/querier.cc index b8a0add5d7..6f7ad9e9b8 100644 --- a/querier.cc +++ b/querier.cc @@ -10,6 +10,7 @@ #include "querier.hh" +#include "reader_concurrency_semaphore.hh" #include "schema/schema.hh" #include "log.hh" @@ -24,7 +25,9 @@ enum class can_use { yes, no_schema_version_mismatch, no_ring_pos_mismatch, - no_clustering_pos_mismatch + no_clustering_pos_mismatch, + no_scheduling_group_mismatch, + no_fatal_semaphore_mismatch }; static sstring cannot_use_reason(can_use cu) @@ -39,6 +42,10 @@ static sstring cannot_use_reason(can_use cu) return "ring pos mismatch"; case can_use::no_clustering_pos_mismatch: return "clustering pos mismatch"; + case can_use::no_scheduling_group_mismatch: + return "scheduling group mismatch"; + case can_use::no_fatal_semaphore_mismatch: + return "fatal semaphore mismatch"; } return "unknown reason"; } @@ -152,11 +159,19 @@ static bool ranges_match(const schema& s, dht::partition_ranges_view original_ra } template -static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice) { +static can_use can_be_used_for_page(querier_cache::is_user_semaphore_func& is_user_semaphore, Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice, reader_concurrency_semaphore& current_sem) { if (s.version() != q.schema().version()) { return can_use::no_schema_version_mismatch; } + auto& querier_sem = q.permit().semaphore(); + if (&querier_sem != ¤t_sem) { + if (is_user_semaphore(querier_sem) && is_user_semaphore(current_sem)) { + return can_use::no_scheduling_group_mismatch; + } + return can_use::no_fatal_semaphore_mismatch; + } + const auto pos_opt = q.current_position(); if (!pos_opt) { // There was nothing read so far so we assume we are ok. @@ -198,7 +213,7 @@ static std::unique_ptr find_querier(querier_cache::index& index, q return ptr; } -querier_cache::querier_cache(std::function is_user_semaphore_func, std::chrono::seconds entry_ttl) +querier_cache::querier_cache(is_user_semaphore_func is_user_semaphore_func, std::chrono::seconds entry_ttl) : _entry_ttl(entry_ttl), _is_user_semaphore_func(is_user_semaphore_func) { } @@ -307,6 +322,7 @@ std::optional querier_cache::lookup_querier( const schema& s, dht::partition_ranges_view ranges, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { auto base_ptr = find_querier(index, key, ranges, trace_state); @@ -330,7 +346,7 @@ std::optional querier_cache::lookup_querier( querier_utils::set_reader(q, std::move(*reader_opt)); --stats.population; - const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice); + const auto can_be_used = can_be_used_for_page(_is_user_semaphore_func, q, s, ranges.front(), slice, current_sem); if (can_be_used == can_use::yes) { tracing::trace(trace_state, "Reusing querier"); return std::optional(std::move(q)); @@ -339,6 +355,11 @@ std::optional querier_cache::lookup_querier( tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(can_be_used)); ++stats.drops; + // Save semaphore name and address for later to use it in + // error/warning message + auto q_semaphore_name = q.permit().semaphore().name(); + auto q_semaphore_address = reinterpret_cast(&q.permit().semaphore()); + // Close and drop the querier in the background. // It is safe to do so, since _closing_gate is closed and // waited on in querier_cache::stop() @@ -346,6 +367,23 @@ std::optional querier_cache::lookup_querier( return q.close().finally([q = std::move(q)] {}); }); + if (can_be_used == can_use::no_scheduling_group_mismatch) { + qlogger.warn("user semaphores mismatch detected. dropping looked-up reader: " + "looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})", + q_semaphore_name, + q_semaphore_address, + current_sem.name(), + reinterpret_cast(¤t_sem)); + } + else if (can_be_used == can_use::no_fatal_semaphore_mismatch) { + on_internal_error(qlogger, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: " + "looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})", + q_semaphore_name, + q_semaphore_address, + current_sem.name(), + reinterpret_cast(¤t_sem))); + } + return std::nullopt; } @@ -353,27 +391,30 @@ std::optional querier_cache::lookup_data_querier(query_id key, const schema& s, const dht::partition_range& range, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { - return lookup_querier(_data_querier_index, key, s, range, slice, std::move(trace_state), timeout); + return lookup_querier(_data_querier_index, key, s, range, slice, current_sem, std::move(trace_state), timeout); } std::optional querier_cache::lookup_mutation_querier(query_id key, const schema& s, const dht::partition_range& range, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { - return lookup_querier(_mutation_querier_index, key, s, range, slice, std::move(trace_state), timeout); + return lookup_querier(_mutation_querier_index, key, s, range, slice, current_sem, std::move(trace_state), timeout); } std::optional querier_cache::lookup_shard_mutation_querier(query_id key, const schema& s, const dht::partition_range_vector& ranges, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { - return lookup_querier(_shard_mutation_querier_index, key, s, ranges, slice, + return lookup_querier(_shard_mutation_querier_index, key, s, ranges, slice, current_sem, std::move(trace_state), timeout); } diff --git a/querier.hh b/querier.hh index 79dc997d88..8fba709ccd 100644 --- a/querier.hh +++ b/querier.hh @@ -311,6 +311,7 @@ public: }; using index = std::unordered_multimap>; + using is_user_semaphore_func = std::function; private: index _data_querier_index; @@ -319,7 +320,7 @@ private: std::chrono::seconds _entry_ttl; stats _stats; gate _closing_gate; - [[maybe_unused]] std::function _is_user_semaphore_func; + is_user_semaphore_func _is_user_semaphore_func; private: template @@ -338,11 +339,12 @@ private: const schema& s, dht::partition_ranges_view ranges, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); public: - querier_cache(std::function is_user_semaphore_func, std::chrono::seconds entry_ttl = default_entry_ttl); + querier_cache(is_user_semaphore_func is_user_semaphore_func, std::chrono::seconds entry_ttl = default_entry_ttl); querier_cache(const querier_cache&) = delete; querier_cache& operator=(const querier_cache&) = delete; @@ -374,6 +376,7 @@ public: const schema& s, const dht::partition_range& range, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); @@ -384,6 +387,7 @@ public: const schema& s, const dht::partition_range& range, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); @@ -394,6 +398,7 @@ public: const schema& s, const dht::partition_range_vector& ranges, const query::partition_slice& slice, + reader_concurrency_semaphore& current_sem, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout); diff --git a/replica/database.cc b/replica/database.cc index 263b32f7f9..4b6282c6a0 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1657,7 +1657,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti std::exception_ptr ex; if (cmd.query_uuid && !cmd.is_first_page) { - querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, trace_state, timeout); + querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, semaphore, trace_state, timeout); } auto read_func = [&, this] (reader_permit permit) { @@ -1724,7 +1724,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh std::exception_ptr ex; if (cmd.query_uuid && !cmd.is_first_page) { - querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, trace_state, timeout); + querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, semaphore, trace_state, timeout); } auto read_func = [&] (reader_permit permit) { diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index c532dd4c6a..0975144faa 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -306,7 +306,7 @@ public: const dht::partition_range& lookup_range, const query::partition_slice& lookup_slice) { - auto querier_opt = _cache.lookup_data_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, nullptr, db::no_timeout); + auto querier_opt = _cache.lookup_data_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout); if (querier_opt) { querier_opt->close().get(); } @@ -319,7 +319,7 @@ public: const dht::partition_range& lookup_range, const query::partition_slice& lookup_slice) { - auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, nullptr, db::no_timeout); + auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout); if (querier_opt) { querier_opt->close().get(); }