querier: move common stuff into querier_base
The querier cache expects all querier objects it stores to have certain methods. To avoid accessing these via `std::visit()` (the querier object is stored in an `std::variant`), we move all the stuff that is common to all querier types into a base class. The querier cache now accesses the members via a reference to this common base. Additionally the variant is eliminated completely and the cache entry stores an `std::unique_ptr<querier_base>` instead. Tests: unit(dev) Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <20200603152544.83704-1-bdenes@scylladb.com>
This commit is contained in:
25
querier.cc
25
querier.cc
@@ -160,7 +160,7 @@ static bool ranges_match(const schema& s, dht::partition_ranges_view original_ra
|
||||
|
||||
template <typename Querier>
|
||||
static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht::partition_range& range, const query::partition_slice& slice) {
|
||||
if (s.version() != q.schema()->version()) {
|
||||
if (s.version() != q.schema().version()) {
|
||||
return can_use::no_schema_version_mismatch;
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ void querier_cache::scan_cache_entries() {
|
||||
while (it != end && it->is_expired(now)) {
|
||||
++_stats.time_based_evictions;
|
||||
--_stats.population;
|
||||
it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = _entries.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -206,7 +206,7 @@ static querier_cache::entries::iterator find_querier(querier_cache::entries& ent
|
||||
}
|
||||
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::entry& e) {
|
||||
return ranges_match(e.schema(), e.ranges(), ranges);
|
||||
return ranges_match(e.value().schema(), e.value().ranges(), ranges);
|
||||
});
|
||||
|
||||
if (it == queriers.second) {
|
||||
@@ -264,7 +264,8 @@ static void insert_querier(
|
||||
|
||||
tracing::trace(trace_state, "Caching querier with key {}", key);
|
||||
|
||||
auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0));
|
||||
auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(
|
||||
[] (const querier_cache::entry& e) { return e.value().memory_usage(); }), size_t(0));
|
||||
|
||||
// We add the memory-usage of the to-be added querier to the memory-usage
|
||||
// of all the cached queriers. We now need to makes sure this number is
|
||||
@@ -276,8 +277,8 @@ static void insert_querier(
|
||||
if (memory_usage >= max_queriers_memory_usage) {
|
||||
auto it = entries.begin();
|
||||
while (it != entries.end() && memory_usage >= max_queriers_memory_usage) {
|
||||
memory_usage -= it->memory_usage();
|
||||
it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
memory_usage -= it->value().memory_usage();
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = entries.erase(it);
|
||||
--stats.population;
|
||||
++stats.memory_based_evictions;
|
||||
@@ -328,7 +329,11 @@ static std::optional<Querier> lookup_querier(
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto q = std::move(*it).template value<Querier>();
|
||||
auto* q_ptr = dynamic_cast<Querier*>(&it->value());
|
||||
if (!q_ptr) {
|
||||
throw std::runtime_error("lookup_querier(): found querier is not of the expected type");
|
||||
}
|
||||
auto q = std::move(*q_ptr);
|
||||
q.permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
entries.erase(it);
|
||||
--stats.population;
|
||||
@@ -381,7 +386,7 @@ bool querier_cache::evict_one() {
|
||||
|
||||
++_stats.resource_based_evictions;
|
||||
--_stats.population;
|
||||
auto& sem = _entries.front().permit().semaphore();
|
||||
auto& sem = _entries.front().value().permit().semaphore();
|
||||
sem.unregister_inactive_read(std::move(_entries.front()).get_inactive_handle());
|
||||
_entries.pop_front();
|
||||
|
||||
@@ -392,9 +397,9 @@ void querier_cache::evict_all_for_table(const utils::UUID& schema_id) {
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end) {
|
||||
if (it->schema().id() == schema_id) {
|
||||
if (it->value().schema().id() == schema_id) {
|
||||
--_stats.population;
|
||||
it->permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = _entries.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
|
||||
191
querier.hh
191
querier.hh
@@ -116,6 +116,64 @@ struct position_view {
|
||||
const clustering_key_prefix* clustering_key;
|
||||
};
|
||||
|
||||
class querier_base {
|
||||
protected:
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::unique_ptr<const dht::partition_range> _range;
|
||||
std::unique_ptr<const query::partition_slice> _slice;
|
||||
flat_mutation_reader _reader;
|
||||
dht::partition_ranges_view _query_ranges;
|
||||
|
||||
public:
|
||||
querier_base(reader_permit permit, std::unique_ptr<const dht::partition_range> range,
|
||||
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
|
||||
: _schema(reader.schema())
|
||||
, _permit(std::move(permit))
|
||||
, _range(std::move(range))
|
||||
, _slice(std::move(slice))
|
||||
, _reader(std::move(reader))
|
||||
, _query_ranges(query_ranges)
|
||||
{ }
|
||||
|
||||
querier_base(schema_ptr schema, reader_permit permit, dht::partition_range range,
|
||||
query::partition_slice slice, const mutation_source& ms, const io_priority_class& pc, tracing::trace_state_ptr trace_ptr)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _range(std::make_unique<const dht::partition_range>(std::move(range)))
|
||||
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
|
||||
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
|
||||
, _query_ranges(*_range)
|
||||
{ }
|
||||
|
||||
querier_base(querier_base&&) = default;
|
||||
querier_base& operator=(querier_base&&) = default;
|
||||
|
||||
virtual ~querier_base() = default;
|
||||
|
||||
const ::schema& schema() const {
|
||||
return *_schema;
|
||||
}
|
||||
|
||||
reader_permit& permit() {
|
||||
return _permit;
|
||||
}
|
||||
|
||||
bool is_reversed() const {
|
||||
return _slice->options.contains(query::partition_slice::option::reversed);
|
||||
}
|
||||
|
||||
virtual position_view current_position() const = 0;
|
||||
|
||||
dht::partition_ranges_view ranges() const {
|
||||
return _query_ranges;
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return _reader.buffer_size();
|
||||
}
|
||||
};
|
||||
|
||||
/// One-stop object for serving queries.
|
||||
///
|
||||
/// Encapsulates all state and logic for serving all pages for a given range
|
||||
@@ -138,12 +196,7 @@ struct position_view {
|
||||
/// page. It should be dropped instead and a new one should be created
|
||||
/// instead.
|
||||
template <emit_only_live_rows OnlyLive>
|
||||
class querier {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::unique_ptr<const dht::partition_range> _range;
|
||||
std::unique_ptr<const query::partition_slice> _slice;
|
||||
flat_mutation_reader _reader;
|
||||
class querier : public querier_base {
|
||||
lw_shared_ptr<compact_for_query_state<OnlyLive>> _compaction_state;
|
||||
std::optional<clustering_key_prefix> _last_ckey;
|
||||
|
||||
@@ -155,19 +208,10 @@ public:
|
||||
query::partition_slice slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
: _schema(schema)
|
||||
, _permit(permit)
|
||||
, _range(std::make_unique<dht::partition_range>(std::move(range)))
|
||||
, _slice(std::make_unique<query::partition_slice>(std::move(slice)))
|
||||
, _reader(ms.make_reader(schema, std::move(permit), *_range, *_slice, pc, std::move(trace_ptr),
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
|
||||
: querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr))
|
||||
, _compaction_state(make_lw_shared<compact_for_query_state<OnlyLive>>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) {
|
||||
}
|
||||
|
||||
bool is_reversed() const {
|
||||
return _slice->options.contains(query::partition_slice::option::reversed);
|
||||
}
|
||||
|
||||
bool are_limits_reached() const {
|
||||
return _compaction_state->are_limits_reached();
|
||||
}
|
||||
@@ -194,27 +238,11 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return _reader.buffer_size();
|
||||
}
|
||||
|
||||
schema_ptr schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
reader_permit& permit() {
|
||||
return _permit;
|
||||
}
|
||||
|
||||
position_view current_position() const {
|
||||
virtual position_view current_position() const override {
|
||||
const dht::decorated_key* dk = _compaction_state->current_partition();
|
||||
const clustering_key_prefix* clustering_key = _last_ckey ? &*_last_ckey : nullptr;
|
||||
return {dk, clustering_key};
|
||||
}
|
||||
|
||||
dht::partition_ranges_view ranges() const {
|
||||
return *_range;
|
||||
}
|
||||
};
|
||||
|
||||
using data_querier = querier<emit_only_live_rows::yes>;
|
||||
@@ -229,15 +257,27 @@ using mutation_querier = querier<emit_only_live_rows::no>;
|
||||
/// created with (similar to other queriers).
|
||||
/// For position validation purposes (at lookup) the reader's position is
|
||||
/// considered to be the same as that of the query.
|
||||
class shard_mutation_querier {
|
||||
dht::partition_range_vector _query_ranges;
|
||||
std::unique_ptr<const dht::partition_range> _reader_range;
|
||||
std::unique_ptr<const query::partition_slice> _reader_slice;
|
||||
flat_mutation_reader _reader;
|
||||
reader_permit _permit;
|
||||
class shard_mutation_querier : public querier_base {
|
||||
std::unique_ptr<const dht::partition_range_vector> _query_ranges;
|
||||
dht::decorated_key _nominal_pkey;
|
||||
std::optional<clustering_key_prefix> _nominal_ckey;
|
||||
|
||||
private:
|
||||
shard_mutation_querier(
|
||||
std::unique_ptr<const dht::partition_range_vector> query_ranges,
|
||||
std::unique_ptr<const dht::partition_range> reader_range,
|
||||
std::unique_ptr<const query::partition_slice> reader_slice,
|
||||
flat_mutation_reader reader,
|
||||
reader_permit permit,
|
||||
dht::decorated_key nominal_pkey,
|
||||
std::optional<clustering_key_prefix> nominal_ckey)
|
||||
: querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges)
|
||||
, _query_ranges(std::move(query_ranges))
|
||||
, _nominal_pkey(std::move(nominal_pkey))
|
||||
, _nominal_ckey(std::move(nominal_ckey)) {
|
||||
}
|
||||
|
||||
|
||||
public:
|
||||
shard_mutation_querier(
|
||||
const dht::partition_range_vector query_ranges,
|
||||
@@ -247,45 +287,20 @@ public:
|
||||
reader_permit permit,
|
||||
dht::decorated_key nominal_pkey,
|
||||
std::optional<clustering_key_prefix> nominal_ckey)
|
||||
: _query_ranges(std::move(query_ranges))
|
||||
, _reader_range(std::move(reader_range))
|
||||
, _reader_slice(std::move(reader_slice))
|
||||
, _reader(std::move(reader))
|
||||
, _permit(std::move(permit))
|
||||
, _nominal_pkey(std::move(nominal_pkey))
|
||||
, _nominal_ckey(std::move(nominal_ckey)) {
|
||||
: shard_mutation_querier(std::make_unique<const dht::partition_range_vector>(std::move(query_ranges)), std::move(reader_range),
|
||||
std::move(reader_slice), std::move(reader), std::move(permit), std::move(nominal_pkey), std::move(nominal_ckey)) {
|
||||
}
|
||||
|
||||
bool is_reversed() const {
|
||||
return _reader_slice->options.contains(query::partition_slice::option::reversed);
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return _reader.buffer_size();
|
||||
}
|
||||
|
||||
schema_ptr schema() const {
|
||||
return _reader.schema();
|
||||
}
|
||||
|
||||
reader_permit& permit() {
|
||||
return _permit;
|
||||
}
|
||||
|
||||
position_view current_position() const {
|
||||
virtual position_view current_position() const override {
|
||||
return {&_nominal_pkey, _nominal_ckey ? &*_nominal_ckey : nullptr};
|
||||
}
|
||||
|
||||
dht::partition_ranges_view ranges() const {
|
||||
return _query_ranges;
|
||||
}
|
||||
|
||||
std::unique_ptr<const dht::partition_range> reader_range() && {
|
||||
return std::move(_reader_range);
|
||||
return std::move(_range);
|
||||
}
|
||||
|
||||
std::unique_ptr<const query::partition_slice> reader_slice() && {
|
||||
return std::move(_reader_slice);
|
||||
return std::move(_slice);
|
||||
}
|
||||
|
||||
flat_mutation_reader reader() && {
|
||||
@@ -349,7 +364,7 @@ public:
|
||||
std::list<entry>::iterator _pos;
|
||||
const utils::UUID _key;
|
||||
const lowres_clock::time_point _expires;
|
||||
std::variant<data_querier, mutation_querier, shard_mutation_querier> _value;
|
||||
std::unique_ptr<querier_base> _value;
|
||||
reader_concurrency_semaphore::inactive_read_handle _handle;
|
||||
|
||||
public:
|
||||
@@ -357,7 +372,7 @@ public:
|
||||
entry(utils::UUID key, Querier q, lowres_clock::time_point expires)
|
||||
: _key(key)
|
||||
, _expires(expires)
|
||||
, _value(std::move(q)) {
|
||||
, _value(std::make_unique<Querier>(std::move(q))) {
|
||||
}
|
||||
|
||||
std::list<entry>::iterator pos() const {
|
||||
@@ -380,42 +395,16 @@ public:
|
||||
return _key;
|
||||
}
|
||||
|
||||
const ::schema& schema() const {
|
||||
return *std::visit([] (auto& q) {
|
||||
return q.schema();
|
||||
}, _value);
|
||||
}
|
||||
|
||||
reader_permit& permit() {
|
||||
return std::visit([] (auto& q) -> reader_permit& {
|
||||
return q.permit();
|
||||
}, _value);
|
||||
}
|
||||
|
||||
dht::partition_ranges_view ranges() const {
|
||||
return std::visit([] (auto& q) {
|
||||
return q.ranges();
|
||||
}, _value);
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return _expires <= now;
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return std::visit([] (auto& q) {
|
||||
return q.memory_usage();
|
||||
}, _value);
|
||||
const querier_base& value() const {
|
||||
return *_value;
|
||||
}
|
||||
|
||||
template <typename Querier>
|
||||
const Querier& value() const & {
|
||||
return std::get<Querier>(_value);
|
||||
}
|
||||
|
||||
template <typename Querier>
|
||||
Querier value() && {
|
||||
return std::get<Querier>(std::move(_value));
|
||||
querier_base& value() {
|
||||
return *_value;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user