service: storage_proxy: make hint write handlers cancellable
Whether a write handler should be cancellable is now controlled by a parameter passed to `create_write_response_handler`. We plumb it down from `send_to_endpoint` which is called by hints manager. This will cause hint write handlers to immediately timeout when we shutdown or when a destination node is marked as dead. Fixes #8079
This commit is contained in:
@@ -1605,7 +1605,8 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in
|
||||
std::move(pending_endpoints),
|
||||
db::write_type::VIEW,
|
||||
std::move(tr_state),
|
||||
allow_hints);
|
||||
allow_hints,
|
||||
service::is_cancellable::yes);
|
||||
}
|
||||
|
||||
static bool should_update_synchronously(const schema& s) {
|
||||
|
||||
@@ -1165,8 +1165,6 @@ public:
|
||||
};
|
||||
};
|
||||
|
||||
using is_cancellable = bool_class<struct cancellable_tag>;
|
||||
|
||||
class abstract_write_response_handler : public seastar::enable_shared_from_this<abstract_write_response_handler>, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
|
||||
protected:
|
||||
using error = storage_proxy::error;
|
||||
@@ -2307,7 +2305,7 @@ future<result<>> storage_proxy::response_wait(storage_proxy::response_id_type id
|
||||
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
|
||||
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
|
||||
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
|
||||
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info)
|
||||
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable)
|
||||
{
|
||||
shared_ptr<abstract_write_response_handler> h;
|
||||
auto& rs = ermp->get_replication_strategy();
|
||||
@@ -2317,7 +2315,6 @@ result<storage_proxy::response_id_type> storage_proxy::create_write_response_han
|
||||
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
|
||||
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info);
|
||||
} else {
|
||||
is_cancellable cancellable{type == db::write_type::VIEW};
|
||||
h = ::make_shared<write_response_handler>(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable);
|
||||
}
|
||||
return bo::success(register_response_handler(std::move(h)));
|
||||
@@ -2843,7 +2840,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr<mutation_holder> mh,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable) {
|
||||
replica::table& table = _db.local().find_column_family(s->id());
|
||||
auto erm = table.get_effective_replication_map();
|
||||
inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token);
|
||||
@@ -2907,7 +2904,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
|
||||
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);
|
||||
|
||||
return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
|
||||
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info);
|
||||
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2920,13 +2917,13 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
|
||||
result<storage_proxy::response_id_type>
|
||||
storage_proxy::create_write_response_handler(const mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
|
||||
return create_write_response_handler_helper(m.schema(), m.token(), std::make_unique<shared_mutation>(m), cl, type, tr_state,
|
||||
std::move(permit), allow_limit);
|
||||
std::move(permit), allow_limit, is_cancellable::no);
|
||||
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
|
||||
return create_write_response_handler_helper(h.mut.schema(), h.mut.token(), std::make_unique<hint_mutation>(h.mut), cl, type, tr_state,
|
||||
std::move(permit), allow_limit);
|
||||
std::move(permit), allow_limit, is_cancellable::yes);
|
||||
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
@@ -2941,7 +2938,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db
|
||||
tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);
|
||||
|
||||
// No rate limiting for read repair
|
||||
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
|
||||
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
|
||||
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
@@ -2950,7 +2947,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
|
||||
auto& [commit, s, h, t] = meta;
|
||||
|
||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
|
||||
db::write_type::CAS, tr_state, std::move(permit), allow_limit);
|
||||
db::write_type::CAS, tr_state, std::move(permit), allow_limit, is_cancellable::no);
|
||||
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
@@ -2967,7 +2964,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
|
||||
|
||||
// No rate limiting for paxos (yet)
|
||||
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
||||
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
|
||||
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
|
||||
}
|
||||
|
||||
void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
|
||||
@@ -3495,7 +3492,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
|
||||
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
|
||||
auto& table = _p._db.local().find_column_family(m.schema()->id());
|
||||
auto ermp = table.get_effective_replication_map();
|
||||
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate());
|
||||
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), is_cancellable::no);
|
||||
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
|
||||
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
|
||||
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
|
||||
@@ -3600,7 +3597,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
db::write_type type,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
write_stats& stats,
|
||||
allow_hints allow_hints) {
|
||||
allow_hints allow_hints,
|
||||
is_cancellable cancellable) {
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
@@ -3612,7 +3610,7 @@ future<> storage_proxy::send_to_endpoint(
|
||||
timeout = clock_type::now() + 5min;
|
||||
}
|
||||
return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
|
||||
[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
|
||||
[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] (
|
||||
std::unique_ptr<mutation_holder>& m,
|
||||
db::consistency_level cl,
|
||||
db::write_type type, service_permit permit) mutable {
|
||||
@@ -3639,7 +3637,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
tr_state,
|
||||
stats,
|
||||
std::move(permit),
|
||||
std::monostate()); // TODO: Pass the correct enforcement type
|
||||
std::monostate(), // TODO: Pass the correct enforcement type
|
||||
cancellable);
|
||||
}).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
|
||||
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
|
||||
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) {
|
||||
@@ -3653,7 +3652,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
inet_address_vector_topology_change pending_endpoints,
|
||||
db::write_type type,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
allow_hints allow_hints) {
|
||||
allow_hints allow_hints,
|
||||
is_cancellable cancellable) {
|
||||
return send_to_endpoint(
|
||||
std::make_unique<shared_mutation>(std::move(fm_a_s)),
|
||||
std::move(target),
|
||||
@@ -3661,7 +3661,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
type,
|
||||
std::move(tr_state),
|
||||
get_stats(),
|
||||
allow_hints);
|
||||
allow_hints,
|
||||
cancellable);
|
||||
}
|
||||
|
||||
future<> storage_proxy::send_to_endpoint(
|
||||
@@ -3671,7 +3672,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
db::write_type type,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
write_stats& stats,
|
||||
allow_hints allow_hints) {
|
||||
allow_hints allow_hints,
|
||||
is_cancellable cancellable) {
|
||||
return send_to_endpoint(
|
||||
std::make_unique<shared_mutation>(std::move(fm_a_s)),
|
||||
std::move(target),
|
||||
@@ -3679,7 +3681,8 @@ future<> storage_proxy::send_to_endpoint(
|
||||
type,
|
||||
std::move(tr_state),
|
||||
stats,
|
||||
allow_hints);
|
||||
allow_hints,
|
||||
cancellable);
|
||||
}
|
||||
|
||||
future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) {
|
||||
@@ -3691,7 +3694,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
|
||||
db::write_type::SIMPLE,
|
||||
tracing::trace_state_ptr(),
|
||||
get_stats(),
|
||||
allow_hints::no);
|
||||
allow_hints::no,
|
||||
is_cancellable::yes);
|
||||
}
|
||||
|
||||
return send_to_endpoint(
|
||||
@@ -3701,7 +3705,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
|
||||
db::write_type::SIMPLE,
|
||||
tracing::trace_state_ptr(),
|
||||
get_stats(),
|
||||
allow_hints::no);
|
||||
allow_hints::no,
|
||||
is_cancellable::yes);
|
||||
}
|
||||
|
||||
future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) {
|
||||
|
||||
@@ -99,6 +99,8 @@ struct view_update_backlog_timestamped {
|
||||
struct allow_hints_tag {};
|
||||
using allow_hints = bool_class<allow_hints_tag>;
|
||||
|
||||
using is_cancellable = bool_class<struct cancellable_tag>;
|
||||
|
||||
struct storage_proxy_coordinator_query_result {
|
||||
foreign_ptr<lw_shared_ptr<query::result>> query_result;
|
||||
replicas_per_token_range last_replicas;
|
||||
@@ -307,9 +309,9 @@ private:
|
||||
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
|
||||
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
|
||||
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
|
||||
service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable);
|
||||
result<response_id_type> create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
|
||||
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
|
||||
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable);
|
||||
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
@@ -416,7 +418,8 @@ private:
|
||||
db::write_type type,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
write_stats& stats,
|
||||
allow_hints allow_hints = allow_hints::yes);
|
||||
allow_hints,
|
||||
is_cancellable);
|
||||
|
||||
db::view::update_backlog get_view_update_backlog() const;
|
||||
|
||||
@@ -565,9 +568,9 @@ public:
|
||||
// hinted handoff support, and just one target. See also
|
||||
// send_to_live_endpoints() - another take on the same original function.
|
||||
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
|
||||
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
|
||||
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, is_cancellable);
|
||||
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
|
||||
tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);
|
||||
tracing::trace_state_ptr tr_state, allow_hints, is_cancellable);
|
||||
|
||||
// Send a mutation to a specific remote target as a hint.
|
||||
// Unlike regular mutations during write operations, hints are sent on the streaming connection
|
||||
|
||||
Reference in New Issue
Block a user