storage_proxy: add fencing for mutation

At the call site, we use the version captured
in erm/token_metadata. In the handler, we check
the fence twice: once before the local write and
once more after it. The second apply_fence call
guarantees that no mutation succeeds on the
coordinator if the fence version has been updated
on the replica during the write.

Fencing was also added to the mutate_locally calls
on the request coordinator, to cover the case
where this coordinator was isolated from the
topology change coordinator and missed the
barrier_and_drain command.
This commit is contained in:
Petr Gusev
2023-05-18 17:54:30 +04:00
parent 7fe707570a
commit 46f73fcaa6
2 changed files with 64 additions and 34 deletions

View File

@@ -21,8 +21,9 @@
#include "idl/per_partition_rate_limit_info.idl.hh"
#include "idl/keys.idl.hh"
#include "idl/uuid.idl.hh"
#include "idl/storage_service.idl.hh"
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]]);
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]);
verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]);
verb [[with_client_info, one_way]] mutation_failed (unsigned shard, uint64_t response_id, size_t num_failed, db::view::update_backlog backlog [[version 3.1.0]], replica::exception_variant exception [[version 5.1.0]]);
verb [[with_client_info, with_timeout]] counter_mutation (std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info [[ref]]);

View File

@@ -172,7 +172,7 @@ public:
{
ser::storage_proxy_rpc_verbs::register_counter_mutation(&_ms, std::bind_front(&remote::handle_counter_mutation, this));
ser::storage_proxy_rpc_verbs::register_mutation(&_ms, std::bind_front(&remote::receive_mutation_handler, this, _sp._write_smp_service_group));
ser::storage_proxy_rpc_verbs::register_hint_mutation(&_ms, [this] <typename... Args>(Args&&... args) { return receive_mutation_handler(_sp._hints_write_smp_service_group, std::forward<Args>(args)..., std::monostate()); });
ser::storage_proxy_rpc_verbs::register_hint_mutation(&_ms, [this] <typename... Args>(Args&&... args) { return receive_mutation_handler(_sp._hints_write_smp_service_group, std::forward<Args>(args)..., std::monostate(), rpc::optional<fencing_token>{}); });
ser::storage_proxy_rpc_verbs::register_paxos_learn(&_ms, std::bind_front(&remote::handle_paxos_learn, this));
ser::storage_proxy_rpc_verbs::register_mutation_done(&_ms, std::bind_front(&remote::handle_mutation_done, this));
ser::storage_proxy_rpc_verbs::register_mutation_failed(&_ms, std::bind_front(&remote::handle_mutation_failed, this));
@@ -214,11 +214,12 @@ public:
future<> send_mutation(
netw::msg_addr addr, storage_proxy::clock_type::time_point timeout, const std::optional<tracing::trace_info>& trace_info,
const frozen_mutation& m, const inet_address_vector_replica_set& forward, gms::inet_address reply_to, unsigned shard,
storage_proxy::response_id_type response_id, db::per_partition_rate_limit::info rate_limit_info) {
storage_proxy::response_id_type response_id, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) {
return ser::storage_proxy_rpc_verbs::send_mutation(
&_ms, std::move(addr), timeout,
m, forward, std::move(reply_to), shard,
response_id, trace_info, rate_limit_info);
response_id, trace_info, rate_limit_info, fence);
}
future<> send_hint_mutation(
@@ -413,7 +414,7 @@ private:
netw::messaging_service::msg_addr src_addr, rpc::opt_time_point t,
auto schema_version, auto in, const inet_address_vector_replica_set& forward, gms::inet_address reply_to,
unsigned shard, storage_proxy::response_id_type response_id, const std::optional<tracing::trace_info>& trace_info,
auto&& apply_fn1, auto&& forward_fn1) {
fencing_token fence, auto&& apply_fn1, auto&& forward_fn1) {
auto apply_fn = std::move(apply_fn1);
auto forward_fn = std::move(forward_fn1);
@@ -448,13 +449,18 @@ private:
errors_info errors;
++p->get_stats().received_mutations;
p->get_stats().forwarded_mutations += forward.size();
if (auto stale = _sp.apply_fence(fence, src_addr.addr)) {
errors.count += (forward.size() + 1);
errors.local = std::move(*stale);
} else {
co_await coroutine::all(
[&] () -> future<> {
try {
// FIXME: get_schema_for_write() doesn't timeout
schema_ptr s = co_await get_schema_for_write(schema_version, netw::messaging_service::msg_addr{reply_to, shard}, timeout);
// Note: blocks due to execution_stage in replica::database::apply()
co_await apply_fn(p, trace_state_ptr, std::move(s), m, timeout);
co_await apply_fn(p, trace_state_ptr, std::move(s), m, timeout, fence);
// We wait for send_mutation_done to complete, otherwise, if reply_to is busy, we will accumulate
// lots of unsent responses, which can OOM our shard.
//
@@ -482,7 +488,7 @@ private:
// Note: not a coroutine, since forward_fn() typically returns a ready future
tracing::trace(trace_state_ptr, "Forwarding a mutation to /{}", forward);
return forward_fn(p, netw::messaging_service::msg_addr{forward, 0}, timeout, m, reply_to, shard, response_id,
tracing::make_trace_info(trace_state_ptr))
tracing::make_trace_info(trace_state_ptr), fence)
.then_wrapped([&] (future<> f) {
if (f.failed()) {
++p->get_stats().forwarding_errors;
@@ -493,6 +499,7 @@ private:
});
}
);
}
// ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILURE verbs
if (errors.count) {
auto f = co_await coroutine::as_future(send_mutation_failed(
@@ -512,7 +519,9 @@ private:
smp_service_group smp_grp, const rpc::client_info& cinfo, rpc::opt_time_point t,
frozen_mutation in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
unsigned shard, storage_proxy::response_id_type response_id,
rpc::optional<std::optional<tracing::trace_info>> trace_info, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt) {
rpc::optional<std::optional<tracing::trace_info>> trace_info,
rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt,
rpc::optional<fencing_token> fence) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate());
@@ -520,14 +529,15 @@ private:
auto schema_version = in.schema_version();
return handle_write(src_addr, t, schema_version, std::move(in), forward, reply_to, shard, response_id,
trace_info ? *trace_info : std::nullopt,
/* apply_fn */ [smp_grp, rate_limit_info] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s, const frozen_mutation& m,
clock_type::time_point timeout) {
return p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout, smp_grp, rate_limit_info);
fence.value_or(fencing_token{}),
/* apply_fn */ [smp_grp, rate_limit_info, src_ip = src_addr.addr] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s, const frozen_mutation& m,
clock_type::time_point timeout, fencing_token fence) {
return p->apply_fence(p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout, smp_grp, rate_limit_info), fence, src_ip);
},
/* forward_fn */ [this, rate_limit_info] (shared_ptr<storage_proxy>& p, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m,
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
const std::optional<tracing::trace_info>& trace_info) {
return send_mutation(addr, timeout, trace_info, m, {}, reply_to, shard, response_id, rate_limit_info);
const std::optional<tracing::trace_info>& trace_info, fencing_token fence) {
return send_mutation(addr, timeout, trace_info, m, {}, reply_to, shard, response_id, rate_limit_info, fence);
});
}
@@ -541,13 +551,14 @@ private:
auto schema_version = decision.update.schema_version();
return handle_write(src_addr, t, schema_version, std::move(decision), forward, reply_to, shard,
response_id, trace_info,
fencing_token{},
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
const paxos::proposal& decision, clock_type::time_point timeout) {
const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) {
return paxos::paxos_state::learn(*p, std::move(s), decision, timeout, tr_state);
},
/* forward_fn */ [this] (shared_ptr<storage_proxy>&, netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const paxos::proposal& m,
gms::inet_address reply_to, unsigned shard, response_id_type response_id,
const std::optional<tracing::trace_info>& trace_info) {
const std::optional<tracing::trace_info>& trace_info, fencing_token) {
return send_paxos_learn(addr, timeout, trace_info, m, {}, reply_to, shard, response_id);
});
}
@@ -573,16 +584,20 @@ private:
return _sp.container().invoke_on(shard, _sp._write_ack_smp_service_group,
[from, response_id, num_failed, backlog = std::move(backlog), exception = std::move(exception)] (storage_proxy& sp) mutable {
error err = error::FAILURE;
std::optional<sstring> msg;
if (exception) {
err = std::visit([] <typename Ex> (Ex&) {
err = std::visit([&] <typename Ex> (Ex& e) {
if constexpr (std::is_same_v<Ex, replica::rate_limit_exception>) {
return error::RATE_LIMIT;
} else if constexpr (std::is_same_v<Ex, replica::unknown_exception> || std::is_same_v<Ex, replica::no_exception>) {
return error::FAILURE;
} else if constexpr(std::is_same_v<Ex, replica::stale_topology_exception>) {
msg = e.what();
return error::FAILURE;
}
}, exception->reason);
}
sp.got_failure_response(response_id, from, num_failed, std::move(backlog), err, std::nullopt);
sp.got_failure_response(response_id, from, num_failed, std::move(backlog), err, std::move(msg));
return netw::messaging_service::no_wait();
});
}
@@ -901,10 +916,12 @@ public:
virtual ~mutation_holder() {}
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0;
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) = 0;
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) = 0;
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) = 0;
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) = 0;
virtual bool is_shared() = 0;
size_t size() const {
return _size;
@@ -945,23 +962,25 @@ public:
}
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
auto m = _mutations[utils::fb_utilities::get_broadcast_address()];
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) override {
const auto my_ip = utils::fb_utilities::get_broadcast_address();
auto m = _mutations[my_ip];
if (m) {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *m, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info);
return sp.apply_fence(sp.mutate_locally(_schema, *m, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info), fence, my_ip);
}
return make_ready_future<>();
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token fence) override {
auto m = _mutations[ep];
if (m) {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
return sp.remote().send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, tracing::make_trace_info(tr_state),
*m, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(),
response_id, rate_limit_info);
response_id, rate_limit_info, fence);
}
sp.got_response(response_id, ep, std::nullopt);
return make_ready_future<>();
@@ -997,17 +1016,19 @@ public:
return hm.store_hint(ep, _schema, _mutation, tr_state);
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) override {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info);
return sp.apply_fence(sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout, rate_limit_info), fence, utils::fb_utilities::get_broadcast_address());
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token fence) override {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
return sp.remote().send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, tracing::make_trace_info(tr_state),
*_mutation, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(),
response_id, rate_limit_info);
response_id, rate_limit_info, fence);
}
virtual bool is_shared() override {
return true;
@@ -1025,14 +1046,15 @@ public:
throw std::runtime_error("Attempted to store a hint for a hint");
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token) override {
// A hint will be sent to all relevant endpoints when the endpoint it was originally intended for
// becomes unavailable - this might include the current node
return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token) override {
return sp.remote().send_hint_mutation(
netw::messaging_service::msg_addr{ep, 0}, timeout, tr_state,
*_mutation, forward, utils::fb_utilities::get_broadcast_address(), this_shard_id(), response_id, rate_limit_info);
@@ -1145,14 +1167,15 @@ public:
return false; // CAS does not save hints yet
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
fencing_token) override {
tracing::trace(tr_state, "Executing a learn locally");
// TODO: Enforce per partition rate limiting in paxos
return paxos::paxos_state::learn(sp, _schema, *_proposal, timeout, tr_state);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token) override {
tracing::trace(tr_state, "Sending a learn to /{}", ep);
// TODO: Enforce per partition rate limiting in paxos
return sp.remote().send_paxos_learn(
@@ -1450,12 +1473,16 @@ public:
return _mutation_holder->store_hint(hm, ep, tr_state);
}
future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state), adjust_rate_limit_for_local_operation(_rate_limit_info));
return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state),
adjust_rate_limit_for_local_operation(_rate_limit_info),
{_effective_replication_map_ptr->get_token_metadata().get_version()});
}
future<> apply_remotely(gms::inet_address ep, const inet_address_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_remotely(*_proxy, ep, forward, response_id, timeout, std::move(tr_state), _rate_limit_info);
return _mutation_holder->apply_remotely(*_proxy, ep, forward,
response_id, timeout, std::move(tr_state), _rate_limit_info,
{_effective_replication_map_ptr->get_token_metadata().get_version()});
}
const schema_ptr& get_schema() const {
return _mutation_holder->schema();
@@ -3894,6 +3921,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
if (try_catch<replica::rate_limit_exception>(eptr)) {
// There might be a lot of those, so ignore
err = error::RATE_LIMIT;
} else if (const auto* stale = try_catch<replica::stale_topology_exception>(eptr)) {
msg = stale->what();
} else if (try_catch<rpc::closed_error>(eptr)) {
// ignore, disconnect will be logged by gossiper
} else if (try_catch<seastar::gate_closed_exception>(eptr)) {