Merge 'fencing: handle counter_mutations' from Gusev Petr
In this PR we add proper fencing handling to the `counter_mutation` verb.

As for regular mutations, we do the check twice in `handle_counter_mutation`: before and after applying the mutations. The latter check is important in case the fence was moved while we were handling the request - some post-fence actions might have already happened by that time, so we can't treat the request as successful. For example, if the topology change coordinator was switching to `write_both_read_new`, streaming might have already started and missed this update.

In `mutate_counters` we can use a single `fencing_token` for all leaders, since all the erms are processed without yields and should share the same underlying `token_metadata`.

We don't pass the fencing token for replication explicitly in `replicate_counter_from_leader`, since `mutate_counter_on_leader_and_replicate` doesn't capture the erm; if the drain on the coordinator timed out, the erm used for replication might be different, and we should use the corresponding (possibly new) topology version for outgoing write replication requests. This delayed replication is similar to any other background activity (e.g. writing hints) - it takes the current erm and the current `token_metadata` version for outgoing requests.

Closes #14564

* github.com:scylladb/scylladb:
  counter_mutation: add fencing
  encode_replica_exception_for_rpc: handle the case when result type is a single exception_variant
  counter_mutation: add replica::exception_variant to signature
This commit is contained in:
@@ -26,7 +26,7 @@
|
||||
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]);
|
||||
verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]);
|
||||
verb [[with_client_info, one_way]] mutation_failed (unsigned shard, uint64_t response_id, size_t num_failed, db::view::update_backlog backlog [[version 3.1.0]], replica::exception_variant exception [[version 5.1.0]]);
|
||||
verb [[with_client_info, with_timeout]] counter_mutation (std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info [[ref]]);
|
||||
verb [[with_client_info, with_timeout]] counter_mutation (std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 5.4.0]]) -> replica::exception_variant [[version 5.4.0]];
|
||||
verb [[with_client_info, with_timeout, one_way]] hint_mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]] [[version 1.3.0]] /* this verb was mistakenly introduced with optional trace_info */, service::fencing_token fence [[version 5.4.0]]);
|
||||
verb [[with_client_info, with_timeout]] read_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
|
||||
verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, service::fencing_token fence [[version 5.4.0]]) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
|
||||
|
||||
@@ -128,19 +128,23 @@ seastar::metrics::label_instance current_scheduling_group_label() {
|
||||
|
||||
}
|
||||
|
||||
template<typename ResultTuple>
|
||||
static future<ResultTuple> encode_replica_exception_for_rpc(gms::feature_service& features, std::exception_ptr eptr) {
|
||||
template<typename ResultType>
|
||||
static future<ResultType> encode_replica_exception_for_rpc(gms::feature_service& features, std::exception_ptr eptr) {
|
||||
if (features.typed_errors_in_read_rpc) {
|
||||
if (auto ex = replica::try_encode_replica_exception(eptr); ex) {
|
||||
ResultTuple encoded_ex = utils::make_default_rpc_tuple<ResultTuple>();
|
||||
std::get<replica::exception_variant>(encoded_ex) = std::move(ex);
|
||||
return make_ready_future<ResultTuple>(std::move(encoded_ex));
|
||||
if constexpr (std::is_same_v<ResultType, replica::exception_variant>) {
|
||||
return make_ready_future<ResultType>(std::move(ex));
|
||||
} else {
|
||||
ResultType encoded_ex = utils::make_default_rpc_tuple<ResultType>();
|
||||
std::get<replica::exception_variant>(encoded_ex) = std::move(ex);
|
||||
return make_ready_future<ResultType>(std::move(encoded_ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_exception_future<ResultTuple>(std::move(eptr));
|
||||
return make_exception_future<ResultType>(std::move(eptr));
|
||||
}
|
||||
|
||||
template<typename ResultTuple, typename SourceTuple>
|
||||
template<utils::Tuple ResultTuple, utils::Tuple SourceTuple>
|
||||
static future<ResultTuple> add_replica_exception_to_query_result(gms::feature_service& features, future<SourceTuple>&& f) {
|
||||
if (!f.failed()) {
|
||||
return make_ready_future<ResultTuple>(utils::tuple_insert<ResultTuple>(f.get(), replica::exception_variant{}));
|
||||
@@ -247,11 +251,14 @@ public:
|
||||
|
||||
future<> send_counter_mutation(
|
||||
netw::msg_addr addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state,
|
||||
std::vector<frozen_mutation> fms, db::consistency_level cl) {
|
||||
std::vector<frozen_mutation> fms, db::consistency_level cl, fencing_token fence) {
|
||||
tracing::trace(tr_state, "Enqueuing counter update to {}", addr);
|
||||
return ser::storage_proxy_rpc_verbs::send_counter_mutation(
|
||||
&_ms, std::move(addr), timeout,
|
||||
std::move(fms), cl, tracing::make_trace_info(tr_state));
|
||||
auto&& opt_exception = co_await ser::storage_proxy_rpc_verbs::send_counter_mutation(
|
||||
&_ms, std::move(addr), timeout,
|
||||
std::move(fms), cl, tracing::make_trace_info(tr_state), fence);
|
||||
if (opt_exception.has_value() && *opt_exception) {
|
||||
co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr());
|
||||
}
|
||||
}
|
||||
|
||||
future<> send_mutation_done(
|
||||
@@ -399,9 +406,10 @@ private:
|
||||
co_return co_await _mm.get_schema_for_write(std::move(v), std::move(from), _ms, &aoe.abort_source());
|
||||
}
|
||||
|
||||
future<> handle_counter_mutation(
|
||||
future<replica::exception_variant> handle_counter_mutation(
|
||||
const rpc::client_info& cinfo, rpc::opt_time_point t,
|
||||
std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info) {
|
||||
std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info,
|
||||
rpc::optional<service::fencing_token> fence_opt) {
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
|
||||
tracing::trace_state_ptr trace_state_ptr;
|
||||
@@ -411,6 +419,12 @@ private:
|
||||
tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr);
|
||||
}
|
||||
|
||||
const auto fence = fence_opt.value_or(fencing_token{});
|
||||
if (auto stale = _sp.apply_fence(fence, src_addr.addr)) {
|
||||
co_return co_await encode_replica_exception_for_rpc<replica::exception_variant>(_sp.features(),
|
||||
make_exception_ptr(std::move(*stale)));
|
||||
}
|
||||
|
||||
std::vector<frozen_mutation_and_schema> mutations;
|
||||
auto timeout = *t;
|
||||
co_await coroutine::parallel_for_each(std::move(fms), [&] (frozen_mutation& fm) {
|
||||
@@ -423,6 +437,11 @@ private:
|
||||
});
|
||||
auto& sp = _sp;
|
||||
co_await sp.mutate_counters_on_leader(std::move(mutations), cl, timeout, std::move(trace_state_ptr), /* FIXME: rpc should also pass a permit down to callbacks */ empty_service_permit());
|
||||
if (auto stale = _sp.apply_fence(fence, src_addr.addr)) {
|
||||
co_return co_await encode_replica_exception_for_rpc<replica::exception_variant>(_sp.features(),
|
||||
make_exception_ptr(std::move(*stale)));
|
||||
}
|
||||
co_return replica::exception_variant{};
|
||||
}
|
||||
|
||||
future<rpc::no_wait_type> handle_write(
|
||||
@@ -3287,6 +3306,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
||||
// so we need a container for them. std::set<> will result in the fewest allocations if there is just one.
|
||||
std::set<locator::effective_replication_map_ptr> erms;
|
||||
|
||||
const auto fence = fencing_token{_shared_token_metadata.get()->get_version()};
|
||||
for (auto& m : mutations) {
|
||||
auto& table = _db.local().find_column_family(m.schema()->id());
|
||||
auto erm = table.get_effective_replication_map();
|
||||
@@ -3298,14 +3318,14 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
||||
|
||||
// Forward mutations to the leaders chosen for them
|
||||
auto my_address = utils::fb_utilities::get_broadcast_address();
|
||||
co_await coroutine::parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), permit = std::move(permit), my_address] (auto& endpoint_and_mutations) -> future<> {
|
||||
co_await coroutine::parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), permit = std::move(permit), my_address, fence] (auto& endpoint_and_mutations) -> future<> {
|
||||
auto first_schema = endpoint_and_mutations.second[0].s;
|
||||
|
||||
try {
|
||||
auto endpoint = endpoint_and_mutations.first;
|
||||
|
||||
if (endpoint == my_address) {
|
||||
co_await this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout, tr_state, permit);
|
||||
co_await apply_fence(this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout, tr_state, permit), fence, my_address);
|
||||
} else {
|
||||
auto& mutations = endpoint_and_mutations.second;
|
||||
auto fms = boost::copy_range<std::vector<frozen_mutation>>(mutations | boost::adaptors::transformed([] (auto& m) {
|
||||
@@ -3318,7 +3338,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
||||
|
||||
co_await remote().send_counter_mutation(
|
||||
netw::messaging_service::msg_addr{ endpoint_and_mutations.first, 0 }, timeout, tr_state,
|
||||
std::move(fms), cl);
|
||||
std::move(fms), cl, fence);
|
||||
}
|
||||
} catch (...) {
|
||||
// The leader receives a vector of mutations and processes them together,
|
||||
@@ -3343,6 +3363,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
||||
throw mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER);
|
||||
} catch (rpc::closed_error&) {
|
||||
throw mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER);
|
||||
} catch (replica::stale_topology_exception& e) {
|
||||
throw mutation_write_failure_exception(e.what(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user