service/storage_proxy: Pass pending endpoints to send_to_endpoint()

This will allow us to minimize the number of mutation copies in
mutate_MV().

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20180325121412.76844-1-duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2018-03-25 13:14:11 +01:00
committed by Avi Kivity
parent 389fb54a42
commit fb54c09e0b
4 changed files with 24 additions and 7 deletions

View File

@@ -335,7 +335,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(mutation
// to be generated as a result of hints sending.
if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) {
manager_logger.trace("Sending directly to {}", end_point_key());
return _proxy.send_to_endpoint(std::move(m), end_point_key(), write_type::SIMPLE);
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE);
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
return _proxy.mutate({std::move(m)}, consistency_level::ALL, nullptr);

View File

@@ -845,7 +845,7 @@ void mutate_MV(const dht::token& base_token,
// FIXME: Temporary hack: send the write directly to paired_endpoint,
// without a batchlog, and without checking for success
// Note we don't wait for the asynchronous operation to complete
service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, { }, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
});;
}
@@ -855,7 +855,7 @@ void mutate_MV(const dht::token& base_token,
// the base replicas, but this is probably excessive - see
// See https://issues.apache.org/jira/browse/CASSANDRA-14262
for (auto&& pending : pending_endpoints) {
service::get_local_storage_proxy().send_to_endpoint(mut, pending, db::write_type::VIEW).handle_exception([pending] (auto ep) {
service::get_local_storage_proxy().send_to_endpoint(mut, pending, { }, db::write_type::VIEW).handle_exception([pending] (auto ep) {
vlogger.error("Error applying view update to pending endpoint {}: {}", pending, ep);
});;
}

View File

@@ -1563,14 +1563,31 @@ bool storage_proxy::cannot_hint(gms::inet_address target) {
return hints_enabled() && _hints_manager->too_many_in_flight_hints_for(target);
}
future<> storage_proxy::send_to_endpoint(mutation m, gms::inet_address target, db::write_type type) {
future<> storage_proxy::send_to_endpoint(
mutation m,
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type) {
utils::latency_counter lc;
lc.start();
std::unordered_set<gms::inet_address> targets(pending_endpoints.begin(), pending_endpoints.end());
targets.insert(std::move(target));
return mutate_prepare(std::array<mutation, 1>{std::move(m)}, db::consistency_level::ONE, type,
[this, target] (const mutation& m, db::consistency_level cl, db::write_type type) {
[this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints)] (
const mutation& m,
db::consistency_level cl,
db::write_type type) mutable {
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
return create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), {target}, {}, {}, nullptr);
return create_write_response_handler(
ks,
cl,
type,
std::make_unique<shared_mutation>(m),
std::move(targets),
pending_endpoints,
{ },
nullptr);
}).then([this] (std::vector<unique_response_handler> ids) {
return mutate_begin(std::move(ids), db::consistency_level::ONE);
}).then_wrapped([p = shared_from_this(), lc] (future<>&& f) {

View File

@@ -417,7 +417,7 @@ public:
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
future<> send_to_endpoint(mutation m, gms::inet_address target, db::write_type type);
future<> send_to_endpoint(mutation m, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type);
/**
* Performs the truncate operatoin, which effectively deletes all data from