proxy: Carry replication map with repair mutation(s)

The create_write_response_handler() for read repair needs the e.r.m.
from the caller, because it effectively accepts list of endpoints from
it.

So this patch equips all read_repair_mutation-s with the e.r.m. pointer
so that the handler creation can use it. It's the same for all
mutations, so it's a waste of space, but it's not bad -- there's
typically few mutations in this range and the entry passed there is
temporary, so even lots of them won't occupy lots of memory for long.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-12-14 13:12:30 +03:00
parent 140f373e15
commit ab8fc0e166
2 changed files with 6 additions and 9 deletions

View File

@@ -2694,6 +2694,7 @@ inline std::ostream& operator<<(std::ostream& os, const hint_wrapper& h) {
struct read_repair_mutation {
std::unordered_map<gms::inet_address, std::optional<mutation>> value;
locator::effective_replication_map_ptr ermp;
};
inline std::ostream& operator<<(std::ostream& os, const read_repair_mutation& m) {
@@ -2938,12 +2939,8 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db
slogger.trace("creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);
tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);
auto keyspace_name = mh->schema()->ks_name();
replica::keyspace& ks = _db.local().find_keyspace(keyspace_name);
auto ermp = ks.get_effective_replication_map();
// No rate limiting for read repair
return create_write_response_handler(std::move(ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate());
}
result<storage_proxy::response_id_type>
@@ -3850,12 +3847,12 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& m
}
}
future<result<>> storage_proxy::schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
future<result<>> storage_proxy::schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state,
service_permit permit) {
if (diffs.empty()) {
return make_ready_future<result<>>(bo::success());
}
return mutate_internal(diffs | boost::adaptors::map_values | boost::adaptors::transformed([] (auto& v) { return read_repair_mutation{std::move(v)}; }), cl, false, std::move(trace_state), std::move(permit));
return mutate_internal(diffs | boost::adaptors::map_values | boost::adaptors::transformed([ermp] (auto& v) { return read_repair_mutation{std::move(v), ermp}; }), cl, false, std::move(trace_state), std::move(permit));
}
class abstract_read_resolver {
@@ -4807,7 +4804,7 @@ protected:
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
// Waited on indirectly.
(void)_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then(utils::result_wrap([this, result = std::move(result)] () mutable {
(void)_proxy->schedule_repair(_effective_replication_map_ptr, data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then(utils::result_wrap([this, result = std::move(result)] () mutable {
_result_promise.set_value(std::move(result));
return make_ready_future<::result<>>(bo::success());
})).then_wrapped([this, exec] (future<::result<>>&& f) {

View File

@@ -387,7 +387,7 @@ private:
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<result<>> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
void unthrottle();
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);