querier: convert querier_cache and {data,mutation}_querier to v2

The shard_mutation_querier is left using a v1 reader in its API as the
multishard query code is not ready yet. When saving this reader it is
upgraded to v2 and on lookup it is downgraded to v1. This should cancel
out thanks to upgrade/downgrade unwrapping.
This commit is contained in:
Botond Dénes
2022-01-06 15:43:02 +02:00
parent 15d8ea983e
commit 85c42a5d76
2 changed files with 45 additions and 12 deletions

View File

@@ -216,13 +216,13 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl)
}
struct querier_utils {
static flat_mutation_reader get_reader(querier_base& q) noexcept {
return std::move(std::get<flat_mutation_reader>(q._reader));
static flat_mutation_reader_v2 get_reader(querier_base& q) noexcept {
return std::move(std::get<flat_mutation_reader_v2>(q._reader));
}
static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept {
return std::move(std::get<reader_concurrency_semaphore::inactive_read_handle>(q._reader));
}
static void set_reader(querier_base& q, flat_mutation_reader r) noexcept {
static void set_reader(querier_base& q, flat_mutation_reader_v2 r) noexcept {
q._reader = std::move(r);
}
static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) noexcept {
@@ -255,7 +255,7 @@ void querier_cache::insert_querier(
auto& sem = q.permit().semaphore();
auto irh = sem.register_inactive_read(upgrade_to_v2(querier_utils::get_reader(q)));
auto irh = sem.register_inactive_read(querier_utils::get_reader(q));
if (!irh) {
++stats.resource_based_evictions;
return;
@@ -340,7 +340,7 @@ std::optional<Querier> querier_cache::lookup_querier(
throw std::runtime_error("lookup_querier(): found querier that is evicted");
}
reader_opt->set_timeout(timeout);
querier_utils::set_reader(q, downgrade_to_v1(std::move(*reader_opt)));
querier_utils::set_reader(q, std::move(*reader_opt));
--stats.population;
const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice);
@@ -393,7 +393,7 @@ std::optional<shard_mutation_querier> querier_cache::lookup_shard_mutation_queri
future<> querier_base::close() noexcept {
struct variant_closer {
querier_base& q;
future<> operator()(flat_mutation_reader& reader) {
future<> operator()(flat_mutation_reader_v2& reader) {
return reader.close();
}
future<> operator()(reader_concurrency_semaphore::inactive_read_handle& irh) {

View File

@@ -101,6 +101,39 @@ auto consume_page(flat_mutation_reader& reader,
});
}
/// Consume a page worth of data from the reader.
///
/// Uses `compaction_state` for compacting the fragments and `consumer` for
/// building the results.
/// Returns a future containing a tuple with the last consumed clustering key,
/// or std::nullopt if the last row wasn't a clustering row, and whatever the
/// consumer's `consume_end_of_stream()` method returns.
template <emit_only_live_rows OnlyLive, typename Consumer>
requires CompactedFragmentsConsumer<Consumer>
auto consume_page(flat_mutation_reader_v2& reader,
lw_shared_ptr<compact_for_query_state<OnlyLive>> compaction_state,
const query::partition_slice& slice,
Consumer&& consumer,
uint64_t row_limit,
uint32_t partition_limit,
gc_clock::time_point query_time) {
return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] (
mutation_fragment_v2* next_fragment) mutable {
const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer);
auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
auto reader_consumer = compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>(
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable {
static_assert(sizeof...(results) <= 1);
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
});
});
}
struct position_view {
const dht::decorated_key* partition_key;
const clustering_key_prefix* clustering_key;
@@ -114,12 +147,12 @@ protected:
reader_permit _permit;
lw_shared_ptr<const dht::partition_range> _range;
std::unique_ptr<const query::partition_slice> _slice;
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
std::variant<flat_mutation_reader_v2, reader_concurrency_semaphore::inactive_read_handle> _reader;
dht::partition_ranges_view _query_ranges;
public:
querier_base(reader_permit permit, lw_shared_ptr<const dht::partition_range> range,
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader_v2 reader, dht::partition_ranges_view query_ranges)
: _schema(reader.schema())
, _permit(std::move(permit))
, _range(std::move(range))
@@ -134,7 +167,7 @@ public:
, _permit(std::move(permit))
, _range(make_lw_shared<const dht::partition_range>(std::move(range)))
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _reader(ms.make_reader_v2(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _query_ranges(*_range)
{ }
@@ -217,7 +250,7 @@ public:
uint32_t partition_limit,
gc_clock::time_point query_time,
tracing::trace_state_ptr trace_ptr = {}) {
return ::query::consume_page(std::get<flat_mutation_reader>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
return ::query::consume_page(std::get<flat_mutation_reader_v2>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) {
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
const auto& cstats = _compaction_state->stats();
@@ -274,7 +307,7 @@ private:
reader_permit permit,
dht::decorated_key nominal_pkey,
std::optional<clustering_key_prefix> nominal_ckey)
: querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges)
: querier_base(permit, std::move(reader_range), std::move(reader_slice), upgrade_to_v2(std::move(reader)), *query_ranges)
, _query_ranges(std::move(query_ranges))
, _nominal_pkey(std::move(nominal_pkey))
, _nominal_ckey(std::move(nominal_ckey)) {
@@ -307,7 +340,7 @@ public:
}
flat_mutation_reader reader() && {
return std::move(std::get<flat_mutation_reader>(_reader));
return downgrade_to_v1(std::move(std::get<flat_mutation_reader_v2>(_reader)));
}
};