diff --git a/querier.cc b/querier.cc index b19cae8507..453bae265c 100644 --- a/querier.cc +++ b/querier.cc @@ -160,7 +160,7 @@ static bool ranges_match(const schema& s, dht::partition_ranges_view original_ra template static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice) { - if (s.version() != q.schema()->version()) { + if (s.version() != q.schema().version()) { return can_use::no_schema_version_mismatch; } @@ -191,7 +191,7 @@ void querier_cache::scan_cache_entries() { while (it != end && it->is_expired(now)) { ++_stats.time_based_evictions; --_stats.population; - it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); + it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); it = _entries.erase(it); } } @@ -206,7 +206,7 @@ static querier_cache::entries::iterator find_querier(querier_cache::entries& ent } const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::entry& e) { - return ranges_match(e.schema(), e.ranges(), ranges); + return ranges_match(e.value().schema(), e.value().ranges(), ranges); }); if (it == queriers.second) { @@ -264,7 +264,8 @@ static void insert_querier( tracing::trace(trace_state, "Caching querier with key {}", key); - auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0)); + auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed( + [] (const querier_cache::entry& e) { return e.value().memory_usage(); }), size_t(0)); // We add the memory-usage of the to-be added querier to the memory-usage // of all the cached queriers. We now need to makes sure this number is @@ -276,8 +277,8 @@ static void insert_querier( if (memory_usage >= max_queriers_memory_usage) { auto it = entries.begin(); while (it != entries.end() && memory_usage >= max_queriers_memory_usage) { - memory_usage -= it->memory_usage(); - it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); + memory_usage -= it->value().memory_usage(); + it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); it = entries.erase(it); --stats.population; ++stats.memory_based_evictions; @@ -328,7 +329,11 @@ static std::optional lookup_querier( return std::nullopt; } - auto q = std::move(*it).template value(); + auto* q_ptr = dynamic_cast(&it->value()); + if (!q_ptr) { + throw std::runtime_error("lookup_querier(): found querier is not of the expected type"); + } + auto q = std::move(*q_ptr); q.permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); entries.erase(it); --stats.population; @@ -381,7 +386,7 @@ bool querier_cache::evict_one() { ++_stats.resource_based_evictions; --_stats.population; - auto& sem = _entries.front().permit().semaphore(); + auto& sem = _entries.front().value().permit().semaphore(); sem.unregister_inactive_read(std::move(_entries.front()).get_inactive_handle()); _entries.pop_front(); @@ -392,9 +397,9 @@ void querier_cache::evict_all_for_table(const utils::UUID& schema_id) { auto it = _entries.begin(); const auto end = _entries.end(); while (it != end) { - if (it->schema().id() == schema_id) { + if (it->value().schema().id() == schema_id) { --_stats.population; - it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); + it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle()); it = _entries.erase(it); } else { ++it; diff --git a/querier.hh b/querier.hh index 4f81056c10..1e3f5691f5 100644 --- a/querier.hh +++ b/querier.hh @@ -116,6 +116,64 @@ struct position_view { const clustering_key_prefix* clustering_key; }; +class querier_base { +protected: + schema_ptr _schema; + reader_permit _permit; + std::unique_ptr _range; + std::unique_ptr _slice; + flat_mutation_reader _reader; + dht::partition_ranges_view _query_ranges; + +public: + querier_base(reader_permit permit, std::unique_ptr range, + std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) + : _schema(reader.schema()) + , _permit(std::move(permit)) + , _range(std::move(range)) + , _slice(std::move(slice)) + , _reader(std::move(reader)) + , _query_ranges(query_ranges) + { } + + querier_base(schema_ptr schema, reader_permit permit, dht::partition_range range, + query::partition_slice slice, const mutation_source& ms, const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _range(std::make_unique(std::move(range))) + , _slice(std::make_unique(std::move(slice))) + , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _query_ranges(*_range) + { } + + querier_base(querier_base&&) = default; + querier_base& operator=(querier_base&&) = default; + + virtual ~querier_base() = default; + + const ::schema& schema() const { + return *_schema; + } + + reader_permit& permit() { + return _permit; + } + + bool is_reversed() const { + return _slice->options.contains(query::partition_slice::option::reversed); + } + + virtual position_view current_position() const = 0; + + dht::partition_ranges_view ranges() const { + return _query_ranges; + } + + size_t memory_usage() const { + return _reader.buffer_size(); + } +}; + /// One-stop object for serving queries. /// /// Encapsulates all state and logic for serving all pages for a given range @@ -138,12 +196,7 @@ struct position_view { /// page. It should be dropped instead and a new one should be created /// instead. template -class querier { - schema_ptr _schema; - reader_permit _permit; - std::unique_ptr _range; - std::unique_ptr _slice; - flat_mutation_reader _reader; +class querier : public querier_base { lw_shared_ptr> _compaction_state; std::optional _last_ckey; @@ -155,19 +208,10 @@ public: query::partition_slice slice, const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) - : _schema(schema) - , _permit(permit) - , _range(std::make_unique(std::move(range))) - , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(schema, std::move(permit), *_range, *_slice, pc, std::move(trace_ptr), - streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + : querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr)) , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } - bool is_reversed() const { - return _slice->options.contains(query::partition_slice::option::reversed); - } - bool are_limits_reached() const { return _compaction_state->are_limits_reached(); } @@ -194,27 +238,11 @@ public: }); } - size_t memory_usage() const { - return _reader.buffer_size(); - } - - schema_ptr schema() const { - return _schema; - } - - reader_permit& permit() { - return _permit; - } - - position_view current_position() const { + virtual position_view current_position() const override { const dht::decorated_key* dk = _compaction_state->current_partition(); const clustering_key_prefix* clustering_key = _last_ckey ? &*_last_ckey : nullptr; return {dk, clustering_key}; } - - dht::partition_ranges_view ranges() const { - return *_range; - } }; using data_querier = querier; @@ -229,15 +257,27 @@ using mutation_querier = querier; /// created with (similar to other queriers). /// For position validation purposes (at lookup) the reader's position is /// considered to be the same as that of the query. -class shard_mutation_querier { - dht::partition_range_vector _query_ranges; - std::unique_ptr _reader_range; - std::unique_ptr _reader_slice; - flat_mutation_reader _reader; - reader_permit _permit; +class shard_mutation_querier : public querier_base { + std::unique_ptr _query_ranges; dht::decorated_key _nominal_pkey; std::optional _nominal_ckey; +private: + shard_mutation_querier( + std::unique_ptr query_ranges, + std::unique_ptr reader_range, + std::unique_ptr reader_slice, + flat_mutation_reader reader, + reader_permit permit, + dht::decorated_key nominal_pkey, + std::optional nominal_ckey) + : querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges) + , _query_ranges(std::move(query_ranges)) + , _nominal_pkey(std::move(nominal_pkey)) + , _nominal_ckey(std::move(nominal_ckey)) { + } + + public: shard_mutation_querier( const dht::partition_range_vector query_ranges, @@ -247,45 +287,20 @@ public: reader_permit permit, dht::decorated_key nominal_pkey, std::optional nominal_ckey) - : _query_ranges(std::move(query_ranges)) - , _reader_range(std::move(reader_range)) - , _reader_slice(std::move(reader_slice)) - , _reader(std::move(reader)) - , _permit(std::move(permit)) - , _nominal_pkey(std::move(nominal_pkey)) - , _nominal_ckey(std::move(nominal_ckey)) { + : shard_mutation_querier(std::make_unique(std::move(query_ranges)), std::move(reader_range), + std::move(reader_slice), std::move(reader), std::move(permit), std::move(nominal_pkey), std::move(nominal_ckey)) { } - bool is_reversed() const { - return _reader_slice->options.contains(query::partition_slice::option::reversed); - } - - size_t memory_usage() const { - return _reader.buffer_size(); - } - - schema_ptr schema() const { - return _reader.schema(); - } - - reader_permit& permit() { - return _permit; - } - - position_view current_position() const { + virtual position_view current_position() const override { return {&_nominal_pkey, _nominal_ckey ? &*_nominal_ckey : nullptr}; } - dht::partition_ranges_view ranges() const { - return _query_ranges; - } - std::unique_ptr reader_range() && { - return std::move(_reader_range); + return std::move(_range); } std::unique_ptr reader_slice() && { - return std::move(_reader_slice); + return std::move(_slice); } flat_mutation_reader reader() && { @@ -349,7 +364,7 @@ public: std::list::iterator _pos; const utils::UUID _key; const lowres_clock::time_point _expires; - std::variant _value; + std::unique_ptr _value; reader_concurrency_semaphore::inactive_read_handle _handle; public: @@ -357,7 +372,7 @@ public: entry(utils::UUID key, Querier q, lowres_clock::time_point expires) : _key(key) , _expires(expires) - , _value(std::move(q)) { + , _value(std::make_unique(std::move(q))) { } std::list::iterator pos() const { @@ -380,42 +395,16 @@ public: return _key; } - const ::schema& schema() const { - return *std::visit([] (auto& q) { - return q.schema(); - }, _value); - } - - reader_permit& permit() { - return std::visit([] (auto& q) -> reader_permit& { - return q.permit(); - }, _value); - } - - dht::partition_ranges_view ranges() const { - return std::visit([] (auto& q) { - return q.ranges(); - }, _value); - } - bool is_expired(const lowres_clock::time_point& now) const { return _expires <= now; } - size_t memory_usage() const { - return std::visit([] (auto& q) { - return q.memory_usage(); - }, _value); + const querier_base& value() const { + return *_value; } - template - const Querier& value() const & { - return std::get(_value); - } - - template - Querier value() && { - return std::get(std::move(_value)); + querier_base& value() { + return *_value; } };