query: coroutinize to_data_query_result

Reduce stalls by maybe yielding in-between partitions,
and by awaiting unfreeze_gently where possible.

Refs #10038

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-05-03 20:33:31 +03:00
parent e12454f175
commit c9612855c7
4 changed files with 18 additions and 12 deletions

View File

@@ -6,6 +6,9 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <boost/range/adaptor/reversed.hpp>
#include "mutation_partition.hh"
#include "clustering_interval_set.hh"
@@ -2064,7 +2067,7 @@ reconcilable_result reconcilable_result_builder::consume_end_of_stream() {
std::move(_memory_accounter).done());
}
query::result
future<query::result>
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions,
query::result_options opts) {
// This result was already built with a limit, don't apply another one.
@@ -2081,10 +2084,12 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
if (res.stop == stop_iteration::yes) {
break;
}
co_await coroutine::maybe_yield();
}
} else {
for (const partition& p : r.partitions()) {
const auto res = p.mut().unfreeze(s).consume(consumer, reverse);
auto m = co_await p.mut().unfreeze_gently(s);
const auto res = std::move(m).consume(consumer, reverse);
if (res.stop == stop_iteration::yes) {
break;
}
@@ -2093,7 +2098,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
if (r.is_short_read()) {
builder.mark_as_short_read();
}
return builder.build();
co_return builder.build();
}
query::result

View File

@@ -159,7 +159,7 @@ public:
reconcilable_result consume_end_of_stream();
};
query::result to_data_query_result(
future<query::result> to_data_query_result(
const reconcilable_result&,
schema_ptr,
const query::partition_slice&,

View File

@@ -3709,7 +3709,7 @@ protected:
|| data_resolver->live_partition_count() >= original_partition_limit())
&& !data_resolver->any_partition_short_read()) {
auto result = ::make_foreign(::make_lw_shared<query::result>(
to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit)));
co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit)));
// wait for write to complete before returning result to prevent multiple concurrent read requests to
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
@@ -5382,7 +5382,7 @@ storage_proxy::query_nonsingular_data_locally(schema_ptr s, lw_shared_ptr<query:
ret = co_await query_data_on_all_shards(_db, std::move(s), *local_cmd, ranges, opts, std::move(trace_state), timeout);
} else {
auto res = co_await query_mutations_on_all_shards(_db, s, *local_cmd, ranges, std::move(trace_state), timeout);
ret = rpc::tuple(make_foreign(make_lw_shared<query::result>(to_data_query_result(std::move(*std::get<0>(res)), std::move(s), local_cmd->slice,
ret = rpc::tuple(make_foreign(make_lw_shared<query::result>(co_await to_data_query_result(std::move(*std::get<0>(res)), std::move(s), local_cmd->slice,
local_cmd->get_row_limit(), local_cmd->partition_limit, opts))), std::get<1>(res));
}
co_return ret;

View File

@@ -74,8 +74,9 @@ static query::result_memory_accounter make_accounter() {
return query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size };
}
// Called from a seastar thread
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32).get0());
}
static reconcilable_result mutation_query(schema_ptr s, reader_permit permit, const mutation_source& source, const dht::partition_range& range,
@@ -460,28 +461,28 @@ SEASTAR_TEST_CASE(test_result_row_count) {
auto src = make_source({m1});
auto r = to_data_query_result(mutation_query(s, semaphore.make_permit(), make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now),
s, slice, inf32, inf32);
s, slice, inf32, inf32).get0();
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, semaphore.make_permit(), make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now),
s, slice, inf32, inf32);
s, slice, inf32, inf32).get0();
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
r = to_data_query_result(mutation_query(s, semaphore.make_permit(), make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now),
s, slice, inf32, inf32);
s, slice, inf32, inf32).get0();
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
r = to_data_query_result(mutation_query(s, semaphore.make_permit(), make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now),
s, slice, inf32, inf32);
s, slice, inf32, inf32).get0();
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
mutation m2(s, partition_key::from_single_value(*s, "key2"));
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, semaphore.make_permit(), make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now),
s, slice, inf32, inf32);
s, slice, inf32, inf32).get0();
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
});
}