From 85cffd1aebb73312ac581bf8ad6a84a756329c8c Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 1 Dec 2020 18:02:13 +0200 Subject: [PATCH] lwt: rewrite storage_proxy::cas using coroutings Makes code much simpler to understand. Message-Id: <20201201160213.GW1655743@scylladb.com> --- service/paxos/paxos_state.cc | 8 +- service/paxos/paxos_state.hh | 36 +++++- service/storage_proxy.cc | 242 +++++++++++++++++------------------ service/storage_proxy.hh | 2 +- 4 files changed, 156 insertions(+), 132 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 327d6d0c18..a36a4c9333 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -20,7 +20,7 @@ * You should have received a copy of the GNU General Public License * along with Scylla. If not, see . */ - +#include "seastar/core/coroutine.hh" #include "service/storage_proxy.hh" #include "service/paxos/proposal.hh" #include "service/paxos/paxos_state.hh" @@ -52,6 +52,12 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key) } } +future paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) { + guard m(_coordinator_lock, key, timeout); + co_await m.lock(); + co_return m; +} + future paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema, const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) { diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh index e55879f129..4762b044d9 100644 --- a/service/paxos/paxos_state.hh +++ b/service/paxos/paxos_state.hh @@ -54,6 +54,10 @@ using clock_type = db::timeout_clock; // The state of a CAS update of a given primary key as persisted in the paxos table. class paxos_state { +public: + class guard; +private: + class key_lock_map { using semaphore = basic_semaphore; using map = std::unordered_map; @@ -75,6 +79,8 @@ class paxos_state { release_semaphore_for_key(key); }); } + + friend class guard; }; // Locks are local to the shard which owns the corresponding token range. @@ -100,12 +106,30 @@ class paxos_state { public: - // taken by the coordinator code to queue concurrent requests - template - static - futurize_t> with_cas_lock(const dht::token& key, clock_type::time_point timeout, Func func) { - return _coordinator_lock.with_locked_key(key, timeout, std::move(func)); - } + class guard { + key_lock_map& _map; + dht::token _key; + clock_type::time_point _timeout; + bool _locked = false; + public: + future<> lock() { + auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1); + _locked = true; + return f; + } + guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {}; + guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) { + o._locked = false; + } + ~guard() { + if (_locked) { + _map.get_semaphore_for_key(_key).signal(1); + _map.release_semaphore_for_key(_key); + } + } + }; + + static future get_cas_lock(const dht::token& key, clock_type::time_point timeout); static logging::logger logger; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 0780a55ac2..a3c15bb5bd 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -95,6 +95,7 @@ #include "service/paxos/prepare_summary.hh" #include "service/paxos/proposal.hh" #include "locator/token_metadata.hh" +#include "seastar/core/coroutine.hh" namespace bi = boost::intrusive; @@ -4403,7 +4404,7 @@ static mutation_write_failure_exception read_failure_to_write(schema_ptr s, read * WARNING: the function should be called on a shard that owns the key cas() operates on */ future storage_proxy::cas(schema_ptr schema, shared_ptr request, lw_shared_ptr cmd, - dht::partition_range_vector&& partition_ranges, storage_proxy::coordinator_query_options query_options, + dht::partition_range_vector partition_ranges, storage_proxy::coordinator_query_options query_options, db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn, clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write) { @@ -4439,94 +4440,16 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque db::consistency_level cl = cl_for_paxos == db::consistency_level::LOCAL_SERIAL ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::QUORUM; - return do_with(unsigned(0), [this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges), - query_options = std::move(query_options), cl, write_timeout, cl_for_paxos, write] (unsigned& contentions) mutable { - dht::token token = partition_ranges[0].start()->value().as_decorated_key().token(); - utils::latency_counter lc; - lc.start(); + unsigned contentions; - return paxos::paxos_state::with_cas_lock(token, write_timeout, [this, lc, handler, schema, cmd, request, - partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), cl, - &contentions, write] () mutable { - return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges), - query_options = std::move(query_options), cl, &contentions, write] () mutable { - // Finish the previous PAXOS round, if any, and, as a side effect, compute - // a ballot (round identifier) which is a) unique b) has good chances of being - // recent enough. - return handler->begin_and_repair_paxos(query_options.cstate, contentions, write) - .then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions, write] - (paxos_response_handler::ballot_and_data v) mutable { - // Read the current values and check they validate the conditions. - auto f = [&]() { - if (v.data) { - paxos::paxos_state::logger.debug("CAS[{}]: Using prefetched values for CAS precondition", - handler->id()); - tracing::trace(handler->tr_state, "Using prefetched values for CAS precondition"); + dht::token token = partition_ranges[0].start()->value().as_decorated_key().token(); + utils::latency_counter lc; + lc.start(); - return make_ready_future>>(std::move(v.data)); - } else { - paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", - handler->id()); - tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); - ++get_stats().cas_failed_read_round_optimization; - return query(schema, cmd, std::move(partition_ranges), cl, query_options).then([](coordinator_query_result&& qr) { + bool condition_met; - return make_ready_future>>(std::move(qr.query_result)); - }); - } - }(); - return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions, write] (auto&& qr) { - auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); - bool condition_met = true; - if (!mutation) { - if (write) { - paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); - tracing::trace(handler->tr_state, "CAS precondition does not match current values"); - ++get_stats().cas_write_condition_not_met; - condition_met = false; - } - // If a condition is not met we still need to complete paxos round to achieve - // linearizability otherwise next write attempt may read differnt value as described - // in https://github.com/scylladb/scylla/issues/6299 - // Let's use empty mutation as a value and proceed - mutation.emplace(handler->schema(), handler->key()); - // since the value we are writing is dummy we may use minimal consistency level for learn - handler->set_cl_for_learn(db::consistency_level::ANY); - } else { - paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}", - handler->id(), ballot); - tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", ballot); - } - - auto proposal = make_lw_shared(ballot, freeze(*mutation)); - - return handler->accept_proposal(proposal).then([handler, proposal, &contentions, condition_met] (bool is_accepted) mutable { - if (is_accepted) { - // The majority (aka a QUORUM) has promised the coordinator to - // accept the action associated with the computed ballot. - // Apply the mutation. - return handler->learn_decision(std::move(proposal)).then([handler, condition_met] { - paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id()); - tracing::trace(handler->tr_state, "CAS successful"); - return std::optional(condition_met); - }).handle_exception_type([handler] (unavailable_exception& e) { - // if learning stage encountered unavailablity error lets re-map it to a write error - // since unavailable error means that operation has never ever started which is not the case here - schema_ptr schema = handler->schema(); - return make_exception_future>(mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(), - e.consistency, e.alive, e.required, db::write_type::CAS)); - }); - } - paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)", - handler->id()); - tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)"); - ++contentions; - return sleep_approx_50ms().then([] { return std::optional(); }); - }); - }); - }); - }); - }).then_wrapped([this, lc, &contentions, handler, schema, cl_for_paxos, write] (future f) mutable { + try { + auto update_stats = seastar::defer ([&] { get_stats().cas_foreground--; write ? get_stats().cas_write.mark(lc.stop().latency()) : get_stats().cas_read.mark(lc.stop().latency()); if (lc.is_start()) { @@ -4536,45 +4459,116 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque if (contentions > 0) { write ? get_stats().cas_write_contention.add(contentions) : get_stats().cas_read_contention.add(contentions); } - try { - return make_ready_future(f.get0()); - } catch (read_failure_exception& ex) { - return write ? make_exception_future(read_failure_to_write(schema, ex)) : make_exception_future(std::move(ex)); - } catch (read_timeout_exception& ex) { - if (write) { - get_stats().cas_write_timeouts.mark(); - return make_exception_future(read_timeout_to_write(schema, ex)); - } else { - get_stats().cas_read_timeouts.mark(); - return make_exception_future(std::move(ex)); - } - } catch (mutation_write_failure_exception& ex) { - return write ? make_exception_future(std::move(ex)) : make_exception_future(write_failure_to_read(schema, ex)); - } catch (mutation_write_timeout_exception& ex) { - if (write) { - get_stats().cas_write_timeouts.mark(); - return make_exception_future(std::move(ex)); - } else { - get_stats().cas_read_timeouts.mark(); - return make_exception_future(write_timeout_to_read(schema, ex)); - } - } catch (exceptions::unavailable_exception& ex) { - write ? get_stats().cas_write_unavailables.mark() : get_stats().cas_read_unavailables.mark(); - return make_exception_future(std::move(ex)); - } catch (seastar::semaphore_timed_out& ex) { - paxos::paxos_state::logger.trace("CAS[{}]: timeout while waiting for row lock {}", handler->id()); - if (write) { - get_stats().cas_write_timeouts.mark(); - return make_exception_future(mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(), - cl_for_paxos, 0, handler->block_for(), db::write_type::CAS)); - } else { - get_stats().cas_read_timeouts.mark(); - return make_exception_future(read_timeout_exception(schema->ks_name(), schema->cf_name(), - cl_for_paxos, 0, handler->block_for(), 0)); - } - } }); - }); + + paxos::paxos_state::guard l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout); + + while (true) { + // Finish the previous PAXOS round, if any, and, as a side effect, compute + // a ballot (round identifier) which is a) unique b) has good chances of being + // recent enough. + auto [ballot, qr] = co_await handler->begin_and_repair_paxos(query_options.cstate, contentions, write); + // Read the current values and check they validate the conditions. + if (qr) { + paxos::paxos_state::logger.debug("CAS[{}]: Using prefetched values for CAS precondition", + handler->id()); + tracing::trace(handler->tr_state, "Using prefetched values for CAS precondition"); + } else { + paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition", + handler->id()); + tracing::trace(handler->tr_state, "Reading existing values for CAS precondition"); + ++get_stats().cas_failed_read_round_optimization; + + auto pr = partition_ranges; // cannot move original because it can be reused during retry + auto cqr = co_await query(schema, cmd, std::move(pr), cl, query_options); + qr = std::move(cqr.query_result); + } + + auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot)); + condition_met = true; + if (!mutation) { + if (write) { + paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id()); + tracing::trace(handler->tr_state, "CAS precondition does not match current values"); + ++get_stats().cas_write_condition_not_met; + condition_met = false; + } + // If a condition is not met we still need to complete paxos round to achieve + // linearizability otherwise next write attempt may read differnt value as described + // in https://github.com/scylladb/scylla/issues/6299 + // Let's use empty mutation as a value and proceed + mutation.emplace(handler->schema(), handler->key()); + // since the value we are writing is dummy we may use minimal consistency level for learn + handler->set_cl_for_learn(db::consistency_level::ANY); + } else { + paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}", + handler->id(), ballot); + tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", ballot); + } + + auto proposal = make_lw_shared(ballot, freeze(*mutation)); + + bool is_accepted = co_await handler->accept_proposal(proposal); + if (is_accepted) { + // The majority (aka a QUORUM) has promised the coordinator to + // accept the action associated with the computed ballot. + // Apply the mutation. + try { + co_await handler->learn_decision(std::move(proposal)); + } catch (unavailable_exception& e) { + // if learning stage encountered unavailablity error lets re-map it to a write error + // since unavailable error means that operation has never ever started which is not + // the case here + schema_ptr schema = handler->schema(); + throw mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(), + e.consistency, e.alive, e.required, db::write_type::CAS); + } + paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id()); + tracing::trace(handler->tr_state, "CAS successful"); + break; + } else { + paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)", + handler->id()); + tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)"); + ++contentions; + co_await sleep_approx_50ms(); + } + } + } catch (read_failure_exception& ex) { + write ? throw read_failure_to_write(schema, ex) : throw; + } catch (read_timeout_exception& ex) { + if (write) { + get_stats().cas_write_timeouts.mark(); + throw read_timeout_to_write(schema, ex); + } else { + get_stats().cas_read_timeouts.mark(); + throw; + } + } catch (mutation_write_failure_exception& ex) { + write ? throw : throw write_failure_to_read(schema, ex); + } catch (mutation_write_timeout_exception& ex) { + if (write) { + get_stats().cas_write_timeouts.mark(); + throw; + } else { + get_stats().cas_read_timeouts.mark(); + throw write_timeout_to_read(schema, ex); + } + } catch (exceptions::unavailable_exception& ex) { + write ? get_stats().cas_write_unavailables.mark() : get_stats().cas_read_unavailables.mark(); + throw; + } catch (seastar::semaphore_timed_out& ex) { + paxos::paxos_state::logger.trace("CAS[{}]: timeout while waiting for row lock {}", handler->id()); + if (write) { + get_stats().cas_write_timeouts.mark(); + throw mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(), cl_for_paxos, 0, handler->block_for(), db::write_type::CAS); + } else { + get_stats().cas_read_timeouts.mark(); + throw read_timeout_exception(schema->ks_name(), schema->cf_name(), cl_for_paxos, 0, handler->block_for(), 0); + } + } + + co_return condition_met; } std::vector storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 8294aba460..98859d0d31 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -603,7 +603,7 @@ public: tracing::trace_state_ptr trace_state = nullptr); future cas(schema_ptr schema, shared_ptr request, lw_shared_ptr cmd, - dht::partition_range_vector&& partition_ranges, coordinator_query_options query_options, + dht::partition_range_vector partition_ranges, coordinator_query_options query_options, db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn, clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write = true);