lwt: rewrite storage_proxy::cas using coroutings
Makes code much simpler to understand. Message-Id: <20201201160213.GW1655743@scylladb.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
a60c81b615
commit
85cffd1aeb
@@ -20,7 +20,7 @@
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "seastar/core/coroutine.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
@@ -52,6 +52,12 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
|
||||
}
|
||||
}
|
||||
|
||||
future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
|
||||
guard m(_coordinator_lock, key, timeout);
|
||||
co_await m.lock();
|
||||
co_return m;
|
||||
}
|
||||
|
||||
future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema,
|
||||
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
|
||||
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
|
||||
|
||||
@@ -54,6 +54,10 @@ using clock_type = db::timeout_clock;
|
||||
|
||||
// The state of a CAS update of a given primary key as persisted in the paxos table.
|
||||
class paxos_state {
|
||||
public:
|
||||
class guard;
|
||||
private:
|
||||
|
||||
class key_lock_map {
|
||||
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
|
||||
using map = std::unordered_map<dht::token, semaphore>;
|
||||
@@ -75,6 +79,8 @@ class paxos_state {
|
||||
release_semaphore_for_key(key);
|
||||
});
|
||||
}
|
||||
|
||||
friend class guard;
|
||||
};
|
||||
|
||||
// Locks are local to the shard which owns the corresponding token range.
|
||||
@@ -100,12 +106,30 @@ class paxos_state {
|
||||
|
||||
public:
|
||||
|
||||
// taken by the coordinator code to queue concurrent requests
|
||||
template<typename Func>
|
||||
static
|
||||
futurize_t<std::result_of_t<Func()>> with_cas_lock(const dht::token& key, clock_type::time_point timeout, Func func) {
|
||||
return _coordinator_lock.with_locked_key(key, timeout, std::move(func));
|
||||
}
|
||||
class guard {
|
||||
key_lock_map& _map;
|
||||
dht::token _key;
|
||||
clock_type::time_point _timeout;
|
||||
bool _locked = false;
|
||||
public:
|
||||
future<> lock() {
|
||||
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
|
||||
_locked = true;
|
||||
return f;
|
||||
}
|
||||
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
|
||||
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
|
||||
o._locked = false;
|
||||
}
|
||||
~guard() {
|
||||
if (_locked) {
|
||||
_map.get_semaphore_for_key(_key).signal(1);
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static future<guard> get_cas_lock(const dht::token& key, clock_type::time_point timeout);
|
||||
|
||||
static logging::logger logger;
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@
|
||||
#include "service/paxos/prepare_summary.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "seastar/core/coroutine.hh"
|
||||
|
||||
namespace bi = boost::intrusive;
|
||||
|
||||
@@ -4403,7 +4404,7 @@ static mutation_write_failure_exception read_failure_to_write(schema_ptr s, read
|
||||
* WARNING: the function should be called on a shard that owns the key cas() operates on
|
||||
*/
|
||||
future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
|
||||
dht::partition_range_vector&& partition_ranges, storage_proxy::coordinator_query_options query_options,
|
||||
dht::partition_range_vector partition_ranges, storage_proxy::coordinator_query_options query_options,
|
||||
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
|
||||
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write) {
|
||||
|
||||
@@ -4439,94 +4440,16 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
db::consistency_level cl = cl_for_paxos == db::consistency_level::LOCAL_SERIAL ?
|
||||
db::consistency_level::LOCAL_QUORUM : db::consistency_level::QUORUM;
|
||||
|
||||
return do_with(unsigned(0), [this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges),
|
||||
query_options = std::move(query_options), cl, write_timeout, cl_for_paxos, write] (unsigned& contentions) mutable {
|
||||
dht::token token = partition_ranges[0].start()->value().as_decorated_key().token();
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
unsigned contentions;
|
||||
|
||||
return paxos::paxos_state::with_cas_lock(token, write_timeout, [this, lc, handler, schema, cmd, request,
|
||||
partition_ranges = std::move(partition_ranges), query_options = std::move(query_options), cl,
|
||||
&contentions, write] () mutable {
|
||||
return repeat_until_value([this, handler, schema, cmd, request, partition_ranges = std::move(partition_ranges),
|
||||
query_options = std::move(query_options), cl, &contentions, write] () mutable {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
// recent enough.
|
||||
return handler->begin_and_repair_paxos(query_options.cstate, contentions, write)
|
||||
.then([this, handler, schema, cmd, request, partition_ranges, query_options, cl, &contentions, write]
|
||||
(paxos_response_handler::ballot_and_data v) mutable {
|
||||
// Read the current values and check they validate the conditions.
|
||||
auto f = [&]() {
|
||||
if (v.data) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Using prefetched values for CAS precondition",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "Using prefetched values for CAS precondition");
|
||||
dht::token token = partition_ranges[0].start()->value().as_decorated_key().token();
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(v.data));
|
||||
} else {
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "Reading existing values for CAS precondition");
|
||||
++get_stats().cas_failed_read_round_optimization;
|
||||
return query(schema, cmd, std::move(partition_ranges), cl, query_options).then([](coordinator_query_result&& qr) {
|
||||
bool condition_met;
|
||||
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(std::move(qr.query_result));
|
||||
});
|
||||
}
|
||||
}();
|
||||
return f.then([this, handler, schema, cmd, request, ballot = v.ballot, &contentions, write] (auto&& qr) {
|
||||
auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
|
||||
bool condition_met = true;
|
||||
if (!mutation) {
|
||||
if (write) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS precondition does not match current values");
|
||||
++get_stats().cas_write_condition_not_met;
|
||||
condition_met = false;
|
||||
}
|
||||
// If a condition is not met we still need to complete paxos round to achieve
|
||||
// linearizability otherwise next write attempt may read differnt value as described
|
||||
// in https://github.com/scylladb/scylla/issues/6299
|
||||
// Let's use empty mutation as a value and proceed
|
||||
mutation.emplace(handler->schema(), handler->key());
|
||||
// since the value we are writing is dummy we may use minimal consistency level for learn
|
||||
handler->set_cl_for_learn(db::consistency_level::ANY);
|
||||
} else {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
|
||||
handler->id(), ballot);
|
||||
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", ballot);
|
||||
}
|
||||
|
||||
auto proposal = make_lw_shared<paxos::proposal>(ballot, freeze(*mutation));
|
||||
|
||||
return handler->accept_proposal(proposal).then([handler, proposal, &contentions, condition_met] (bool is_accepted) mutable {
|
||||
if (is_accepted) {
|
||||
// The majority (aka a QUORUM) has promised the coordinator to
|
||||
// accept the action associated with the computed ballot.
|
||||
// Apply the mutation.
|
||||
return handler->learn_decision(std::move(proposal)).then([handler, condition_met] {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS successful");
|
||||
return std::optional<bool>(condition_met);
|
||||
}).handle_exception_type([handler] (unavailable_exception& e) {
|
||||
// if learning stage encountered unavailablity error lets re-map it to a write error
|
||||
// since unavailable error means that operation has never ever started which is not the case here
|
||||
schema_ptr schema = handler->schema();
|
||||
return make_exception_future<std::optional<bool>>(mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(),
|
||||
e.consistency, e.alive, e.required, db::write_type::CAS));
|
||||
});
|
||||
}
|
||||
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
|
||||
++contentions;
|
||||
return sleep_approx_50ms().then([] { return std::optional<bool>(); });
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this, lc, &contentions, handler, schema, cl_for_paxos, write] (future<bool> f) mutable {
|
||||
try {
|
||||
auto update_stats = seastar::defer ([&] {
|
||||
get_stats().cas_foreground--;
|
||||
write ? get_stats().cas_write.mark(lc.stop().latency()) : get_stats().cas_read.mark(lc.stop().latency());
|
||||
if (lc.is_start()) {
|
||||
@@ -4536,45 +4459,116 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
if (contentions > 0) {
|
||||
write ? get_stats().cas_write_contention.add(contentions) : get_stats().cas_read_contention.add(contentions);
|
||||
}
|
||||
try {
|
||||
return make_ready_future<bool>(f.get0());
|
||||
} catch (read_failure_exception& ex) {
|
||||
return write ? make_exception_future<bool>(read_failure_to_write(schema, ex)) : make_exception_future<bool>(std::move(ex));
|
||||
} catch (read_timeout_exception& ex) {
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
return make_exception_future<bool>(read_timeout_to_write(schema, ex));
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
}
|
||||
} catch (mutation_write_failure_exception& ex) {
|
||||
return write ? make_exception_future<bool>(std::move(ex)) : make_exception_future<bool>(write_failure_to_read(schema, ex));
|
||||
} catch (mutation_write_timeout_exception& ex) {
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
return make_exception_future<bool>(write_timeout_to_read(schema, ex));
|
||||
}
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
write ? get_stats().cas_write_unavailables.mark() : get_stats().cas_read_unavailables.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
} catch (seastar::semaphore_timed_out& ex) {
|
||||
paxos::paxos_state::logger.trace("CAS[{}]: timeout while waiting for row lock {}", handler->id());
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
return make_exception_future<bool>(mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(),
|
||||
cl_for_paxos, 0, handler->block_for(), db::write_type::CAS));
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
return make_exception_future<bool>(read_timeout_exception(schema->ks_name(), schema->cf_name(),
|
||||
cl_for_paxos, 0, handler->block_for(), 0));
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
paxos::paxos_state::guard l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
|
||||
|
||||
while (true) {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
// recent enough.
|
||||
auto [ballot, qr] = co_await handler->begin_and_repair_paxos(query_options.cstate, contentions, write);
|
||||
// Read the current values and check they validate the conditions.
|
||||
if (qr) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Using prefetched values for CAS precondition",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "Using prefetched values for CAS precondition");
|
||||
} else {
|
||||
paxos::paxos_state::logger.debug("CAS[{}]: Reading existing values for CAS precondition",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "Reading existing values for CAS precondition");
|
||||
++get_stats().cas_failed_read_round_optimization;
|
||||
|
||||
auto pr = partition_ranges; // cannot move original because it can be reused during retry
|
||||
auto cqr = co_await query(schema, cmd, std::move(pr), cl, query_options);
|
||||
qr = std::move(cqr.query_result);
|
||||
}
|
||||
|
||||
auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot));
|
||||
condition_met = true;
|
||||
if (!mutation) {
|
||||
if (write) {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition does not match current values", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS precondition does not match current values");
|
||||
++get_stats().cas_write_condition_not_met;
|
||||
condition_met = false;
|
||||
}
|
||||
// If a condition is not met we still need to complete paxos round to achieve
|
||||
// linearizability otherwise next write attempt may read differnt value as described
|
||||
// in https://github.com/scylladb/scylla/issues/6299
|
||||
// Let's use empty mutation as a value and proceed
|
||||
mutation.emplace(handler->schema(), handler->key());
|
||||
// since the value we are writing is dummy we may use minimal consistency level for learn
|
||||
handler->set_cl_for_learn(db::consistency_level::ANY);
|
||||
} else {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
|
||||
handler->id(), ballot);
|
||||
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}", ballot);
|
||||
}
|
||||
|
||||
auto proposal = make_lw_shared<paxos::proposal>(ballot, freeze(*mutation));
|
||||
|
||||
bool is_accepted = co_await handler->accept_proposal(proposal);
|
||||
if (is_accepted) {
|
||||
// The majority (aka a QUORUM) has promised the coordinator to
|
||||
// accept the action associated with the computed ballot.
|
||||
// Apply the mutation.
|
||||
try {
|
||||
co_await handler->learn_decision(std::move(proposal));
|
||||
} catch (unavailable_exception& e) {
|
||||
// if learning stage encountered unavailablity error lets re-map it to a write error
|
||||
// since unavailable error means that operation has never ever started which is not
|
||||
// the case here
|
||||
schema_ptr schema = handler->schema();
|
||||
throw mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(),
|
||||
e.consistency, e.alive, e.required, db::write_type::CAS);
|
||||
}
|
||||
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
|
||||
tracing::trace(handler->tr_state, "CAS successful");
|
||||
break;
|
||||
} else {
|
||||
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
|
||||
handler->id());
|
||||
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
|
||||
++contentions;
|
||||
co_await sleep_approx_50ms();
|
||||
}
|
||||
}
|
||||
} catch (read_failure_exception& ex) {
|
||||
write ? throw read_failure_to_write(schema, ex) : throw;
|
||||
} catch (read_timeout_exception& ex) {
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
throw read_timeout_to_write(schema, ex);
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
throw;
|
||||
}
|
||||
} catch (mutation_write_failure_exception& ex) {
|
||||
write ? throw : throw write_failure_to_read(schema, ex);
|
||||
} catch (mutation_write_timeout_exception& ex) {
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
throw;
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
throw write_timeout_to_read(schema, ex);
|
||||
}
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
write ? get_stats().cas_write_unavailables.mark() : get_stats().cas_read_unavailables.mark();
|
||||
throw;
|
||||
} catch (seastar::semaphore_timed_out& ex) {
|
||||
paxos::paxos_state::logger.trace("CAS[{}]: timeout while waiting for row lock {}", handler->id());
|
||||
if (write) {
|
||||
get_stats().cas_write_timeouts.mark();
|
||||
throw mutation_write_timeout_exception(schema->ks_name(), schema->cf_name(), cl_for_paxos, 0, handler->block_for(), db::write_type::CAS);
|
||||
} else {
|
||||
get_stats().cas_read_timeouts.mark();
|
||||
throw read_timeout_exception(schema->ks_name(), schema->cf_name(), cl_for_paxos, 0, handler->block_for(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
co_return condition_met;
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const {
|
||||
|
||||
@@ -603,7 +603,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state = nullptr);
|
||||
|
||||
future<bool> cas(schema_ptr schema, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
|
||||
dht::partition_range_vector&& partition_ranges, coordinator_query_options query_options,
|
||||
dht::partition_range_vector partition_ranges, coordinator_query_options query_options,
|
||||
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
|
||||
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write = true);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user