service: storage_proxy: refactor encode_replica_exception_for_rpc

To properly handle abort_requested_exception thrown from
migration_manager::get_schema_for_read in storage_proxy::handle_read (we
do in the next commit) we have to somehow encode and return it. The
encode_replica_exception_for_rpc function is not suitable for that because
it requires the SourceTuple type (of a value returned by do_query()) which
we don't know when calling get_schema_for_read.

We move the part of encode_replica_exception_for_rpc responsible for
handling exceptions to a new function and rewrite it in a way that doesn't
require the SourceTuple type. As this function fits the name
encode_replica_exception_for_rpc better, we name it this way and rename
the previous encode_replica_exception_for_rpc.
This commit is contained in:
Patryk Jędrzejczak
2023-07-12 13:07:18 +02:00
parent a21c4abad7
commit 68bd0424c2

View File

@@ -127,20 +127,26 @@ seastar::metrics::label_instance current_scheduling_group_label() {
}
template<typename ResultTuple, typename SourceTuple>
static future<ResultTuple> encode_replica_exception_for_rpc(gms::feature_service& features, future<SourceTuple>&& f) {
if (!f.failed()) {
return make_ready_future<ResultTuple>(utils::tuple_insert<ResultTuple>(f.get(), replica::exception_variant{}));
}
std::exception_ptr eptr = f.get_exception();
template<typename ResultTuple>
static future<ResultTuple> encode_replica_exception_for_rpc(gms::feature_service& features, std::exception_ptr eptr) {
if (features.typed_errors_in_read_rpc) {
if (auto ex = replica::try_encode_replica_exception(eptr); ex) {
return make_ready_future<ResultTuple>(utils::tuple_insert<ResultTuple>(utils::make_default_rpc_tuple<SourceTuple>(), std::move(ex)));
ResultTuple encoded_ex = utils::make_default_rpc_tuple<ResultTuple>();
std::get<replica::exception_variant>(encoded_ex) = std::move(ex);
return make_ready_future<ResultTuple>(std::move(encoded_ex));
}
}
return make_exception_future<ResultTuple>(std::move(eptr));
}
template<typename ResultTuple, typename SourceTuple>
static future<ResultTuple> add_replica_exception_to_query_result(gms::feature_service& features, future<SourceTuple>&& f) {
if (!f.failed()) {
return make_ready_future<ResultTuple>(utils::tuple_insert<ResultTuple>(f.get(), replica::exception_variant{}));
}
return encode_replica_exception_for_rpc<ResultTuple>(features, f.get_exception());
}
static bool only_me(const inet_address_vector_replica_set& replicas) {
return replicas.size() == 1 && replicas[0] == utils::fb_utilities::get_broadcast_address();
}
@@ -684,23 +690,21 @@ private:
static_assert(verb == static_cast<read_verb>(-1), "Unsupported verb");
}
};
auto to_future = [&](replica::stale_topology_exception e) {
return make_exception_future<typename decltype(do_query())::value_type>(std::move(e));
};
const auto fence = fence_opt.value_or(fencing_token{});
if (auto stale = _sp.apply_fence(fence, src_ip)) {
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), to_future(std::move(*stale)));
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), std::make_exception_ptr(std::move(*stale)));
}
auto f = co_await coroutine::as_future(do_query());
tracing::trace(trace_state_ptr, "{} handling is done, sending a response to /{}", verb, src_ip);
if (auto stale = _sp.apply_fence(fence, src_ip)) {
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), to_future(std::move(*stale)));
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), std::make_exception_ptr(std::move(*stale)));
}
co_return co_await encode_replica_exception_for_rpc<Result>(p->features(), std::move(f));
co_return co_await add_replica_exception_to_query_result<Result>(p->features(), std::move(f));
}
using read_data_result_t = rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, replica::exception_variant>;