diff --git a/db/view/view.cc b/db/view/view.cc index e949cd911d..fbe572136e 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -829,37 +829,36 @@ void mutate_MV(const dht::token& base_token, auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token); auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name); if (paired_endpoint) { - // When local node is the endpoint and there are no pending nodes we can - // Just apply the mutation locally. + // When paired endpoint is the local node, we can just apply + // the mutation locally. auto my_address = utils::fb_utilities::get_broadcast_address(); - if (*paired_endpoint == my_address && pending_endpoints.empty() && + if (*paired_endpoint == my_address && service::get_local_storage_service().is_joined()) { // Note that we start here an asynchronous apply operation, and // do not wait for it to complete. // Note also that mutate_locally(mut) copies mut (in - // frozen from) so don't need to increase its lifetime. + // frozen form) so don't need to increase its lifetime. service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { vlogger.error("Error applying local view update: {}", ep); }); } else { -#if 0 - wrappers.add(wrapViewBatchResponseHandler(mutation, - consistencyLevel, - consistencyLevel, - Collections.singletonList(pairedEndpoint.get()), - baseComplete, - WriteType.BATCH, - cleanup, - queryStartNanoTime)); -#endif // FIXME: Temporary hack: send the write directly to paired_endpoint, // without a batchlog, and without checking for success // Note we don't wait for the asynchronous operation to complete - // FIXME: need to extend mut's lifetime??? service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep); });; } + // When the ownership of the view partition is being moved to a + // new node (or nodes), listed in pending_enpoints, we also need + // to send the update there. Currently, we do this from *each* of + // the base replicas, but this is probably excessive - see + // See https://issues.apache.org/jira/browse/CASSANDRA-14262 + for (auto&& pending : pending_endpoints) { + service::get_local_storage_proxy().send_to_endpoint(mut, pending, db::write_type::VIEW).handle_exception([pending] (auto ep) { + vlogger.error("Error applying view update to pending endpoint {}: {}", pending, ep); + });; + } } else { #if 0 //if there are no paired endpoints there are probably range movements going on,