Save and restore queriers in mutation_query() and data_query()

Use the querier_cache (represented by the passed-in
querier_cache_context) object to lookup saved queriers at the start of
the page and save them at the end of it if it is likely that there will
be more page requests.
This commit is contained in:
Botond Dénes
2018-01-18 12:16:31 +02:00
parent cab38c9f81
commit ff808d9ce6
4 changed files with 92 additions and 43 deletions

View File

@@ -2999,22 +2999,27 @@ struct query_state {
};
future<lw_shared_ptr<query::result>>
column_family::query(schema_ptr s, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter,
uint64_t max_size, db::timeout_clock::time_point timeout) {
column_family::query(schema_ptr s,
const query::read_command& cmd,
query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
uint64_t max_size,
db::timeout_clock::time_point timeout,
querier_cache_context cache_ctx) {
utils::latency_counter lc;
_stats.reads.set_latency(lc);
auto f = opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable {
return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges,
trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable {
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout] {
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
auto&& range = *qs.current_partition_range++;
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state,
timeout);
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, timeout, cache_ctx);
}).then([qs_ptr = std::move(qs_ptr), &qs] {
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));
@@ -3058,10 +3063,17 @@ future<lw_shared_ptr<query::result>, cache_temperature>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
return _data_query_stage(&cf, std::move(s), seastar::cref(cmd), opts, seastar::cref(ranges),
std::move(trace_state), seastar::ref(get_result_memory_limiter()),
max_result_size,
timeout).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _data_query_stage(&cf,
std::move(s),
seastar::cref(cmd),
opts,
seastar::cref(ranges),
std::move(trace_state),
seastar::ref(get_result_memory_limiter()),
max_result_size,
timeout,
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
if (f.failed()) {
++s->total_reads_failed;
return make_exception_future<lw_shared_ptr<query::result>, cache_temperature>(f.get_exception());
@@ -3078,9 +3090,18 @@ future<reconcilable_result, cache_temperature>
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
cmd.timestamp, std::move(accounter), std::move(trace_state),
timeout).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return mutation_query(std::move(s),
cf.as_mutation_source(),
range,
cmd.slice,
cmd.row_limit,
cmd.partition_limit,
cmd.timestamp,
std::move(accounter),
std::move(trace_state),
timeout,
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
if (f.failed()) {
++s->total_reads_failed;
return make_exception_future<reconcilable_result, cache_temperature>(f.get_exception());

View File

@@ -84,6 +84,7 @@
#include "dirty_memory_manager.hh"
#include "reader_concurrency_semaphore.hh"
#include "db/timeout_clock.hh"
#include "querier.hh"
class cell_locker;
class cell_locker_stats;
@@ -660,7 +661,8 @@ public:
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
uint64_t max_result_size,
db::timeout_clock::time_point timeout = db::no_timeout);
db::timeout_clock::time_point timeout = db::no_timeout,
querier_cache_context cache_ctx = { });
void start();
future<> stop();
@@ -1117,8 +1119,17 @@ private:
semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads()};
concrete_execution_stage<future<lw_shared_ptr<query::result>>, column_family*, schema_ptr, const query::read_command&, query::result_options,
const dht::partition_range_vector&, tracing::trace_state_ptr, query::result_memory_limiter&, uint64_t, db::timeout_clock::time_point> _data_query_stage;
concrete_execution_stage<future<lw_shared_ptr<query::result>>,
column_family*,
schema_ptr,
const query::read_command&,
query::result_options,
const dht::partition_range_vector&,
tracing::trace_state_ptr,
query::result_memory_limiter&,
uint64_t,
db::timeout_clock::time_point,
querier_cache_context> _data_query_stage;
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
@@ -1131,6 +1142,9 @@ private:
bool _enable_incremental_backups = false;
compaction_controller _compaction_controller;
querier_cache _querier_cache;
future<> init_commitlog();
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);

View File

@@ -1904,22 +1904,26 @@ future<> data_query(
gc_clock::time_point query_time,
query::result::builder& builder,
tracing::trace_state_ptr trace_ptr,
db::timeout_clock::time_point timeout)
db::timeout_clock::time_point timeout,
querier_cache_context cache_ctx)
{
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
return make_ready_future<>();
}
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
auto q = cache_ctx.lookup(emit_only_live_rows::yes, *s, range, slice, trace_ptr, [&, trace_ptr] {
return querier(source, s, range, slice, service::get_local_sstable_query_read_priority(),
std::move(trace_ptr), emit_only_live_rows::yes);
});
auto qrb = query_result_builder(*s, builder);
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, query_result_builder>>(
*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no),
[cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable {
return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout);
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (querier& q) mutable {
auto qrb = query_result_builder(*s, builder);
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout).then(
[=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable {
if (q.are_limits_reached() || builder.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_ptr));
}
});
});
}
@@ -1928,7 +1932,7 @@ class reconcilable_result_builder {
const query::partition_slice& _slice;
std::vector<partition> _result;
uint32_t _live_rows;
uint32_t _live_rows{};
bool _has_ck_selector{};
bool _static_row_is_alive{};
@@ -2012,22 +2016,28 @@ static do_mutation_query(schema_ptr s,
gc_clock::time_point query_time,
query::result_memory_accounter&& accounter,
tracing::trace_state_ptr trace_ptr,
db::timeout_clock::time_point timeout)
db::timeout_clock::time_point timeout,
querier_cache_context cache_ctx)
{
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
return make_ready_future<reconcilable_result>(reconcilable_result());
}
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
auto q = cache_ctx.lookup(emit_only_live_rows::no, *s, range, slice, trace_ptr, [&, trace_ptr] {
return querier(source, s, range, slice, service::get_local_sstable_query_read_priority(),
std::move(trace_ptr), emit_only_live_rows::no);
});
auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter));
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>>(
*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
return do_with(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no),
[cfq = std::move(cfq), is_reversed, timeout] (flat_mutation_reader& reader) mutable {
return reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions(is_reversed), timeout);
return do_with(std::move(q),
[=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (querier& q) mutable {
auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter));
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout).then(
[=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable {
if (q.are_limits_reached() || r.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_ptr));
}
return r;
});
});
}
@@ -2043,10 +2053,11 @@ mutation_query(schema_ptr s,
gc_clock::time_point query_time,
query::result_memory_accounter&& accounter,
tracing::trace_state_ptr trace_ptr,
db::timeout_clock::time_point timeout)
db::timeout_clock::time_point timeout,
querier_cache_context cache_ctx)
{
return mutation_query_stage(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice),
row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout);
row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
}
deletable_row::deletable_row(clustering_row&& cr)

View File

@@ -26,6 +26,7 @@
#include "mutation_reader.hh"
#include "frozen_mutation.hh"
#include "db/timeout_clock.hh"
#include "querier.hh"
class reconcilable_result;
class frozen_reconcilable_result;
@@ -129,7 +130,8 @@ future<reconcilable_result> mutation_query(
gc_clock::time_point query_time,
query::result_memory_accounter&& accounter = { },
tracing::trace_state_ptr trace_ptr = nullptr,
db::timeout_clock::time_point timeout = db::no_timeout);
db::timeout_clock::time_point timeout = db::no_timeout,
querier_cache_context cache_ctx = { });
future<> data_query(
schema_ptr s,
@@ -141,7 +143,8 @@ future<> data_query(
gc_clock::time_point query_time,
query::result::builder& builder,
tracing::trace_state_ptr trace_ptr = nullptr,
db::timeout_clock::time_point timeout = db::no_timeout);
db::timeout_clock::time_point timeout = db::no_timeout,
querier_cache_context cache_ctx = { });
// Performs a query for counter updates.
future<mutation_opt> counter_write_query(schema_ptr, const mutation_source&,