diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 9ecb687e15..cbfb4dec50 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -335,7 +335,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(mutation // to be generated as a result of hints sending. if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_to_endpoint(std::move(m), end_point_key(), write_type::SIMPLE); + return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE); } else { manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); return _proxy.mutate({std::move(m)}, consistency_level::ALL, nullptr); diff --git a/db/view/view.cc b/db/view/view.cc index fbe572136e..ee50791d30 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -845,7 +845,7 @@ void mutate_MV(const dht::token& base_token, // FIXME: Temporary hack: send the write directly to paired_endpoint, // without a batchlog, and without checking for success // Note we don't wait for the asynchronous operation to complete - service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { + service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, { }, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep); });; } @@ -855,7 +855,7 @@ void mutate_MV(const dht::token& base_token, // the base replicas, but this is probably excessive - see // See https://issues.apache.org/jira/browse/CASSANDRA-14262 for (auto&& pending : pending_endpoints) { - service::get_local_storage_proxy().send_to_endpoint(mut, pending, db::write_type::VIEW).handle_exception([pending] (auto ep) { + service::get_local_storage_proxy().send_to_endpoint(mut, pending, { }, db::write_type::VIEW).handle_exception([pending] (auto ep) { vlogger.error("Error applying view update to pending endpoint {}: {}", pending, ep); });; } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 68fc490f64..b01959041a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1563,14 +1563,31 @@ bool storage_proxy::cannot_hint(gms::inet_address target) { return hints_enabled() && _hints_manager->too_many_in_flight_hints_for(target); } -future<> storage_proxy::send_to_endpoint(mutation m, gms::inet_address target, db::write_type type) { +future<> storage_proxy::send_to_endpoint( + mutation m, + gms::inet_address target, + std::vector pending_endpoints, + db::write_type type) { utils::latency_counter lc; lc.start(); + std::unordered_set targets(pending_endpoints.begin(), pending_endpoints.end()); + targets.insert(std::move(target)); return mutate_prepare(std::array{std::move(m)}, db::consistency_level::ONE, type, - [this, target] (const mutation& m, db::consistency_level cl, db::write_type type) { + [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints)] ( + const mutation& m, + db::consistency_level cl, + db::write_type type) mutable { auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - return create_write_response_handler(ks, cl, type, std::make_unique(m), {target}, {}, {}, nullptr); + return create_write_response_handler( + ks, + cl, + type, + std::make_unique(m), + std::move(targets), + pending_endpoints, + { }, + nullptr); }).then([this] (std::vector ids) { return mutate_begin(std::move(ids), db::consistency_level::ONE); }).then_wrapped([p = shared_from_this(), lc] (future<>&& f) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index e995c63f15..67298622f6 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -417,7 +417,7 @@ public: // Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. - future<> send_to_endpoint(mutation m, gms::inet_address target, db::write_type type); + future<> send_to_endpoint(mutation m, gms::inet_address target, std::vector pending_endpoints, db::write_type type); /** * Performs the truncate operatoin, which effectively deletes all data from