diff --git a/db/view/view.cc b/db/view/view.cc index 3a4d36bc30..5094e405e9 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -50,6 +50,8 @@ #include "locator/network_topology_strategy.hh" #include "service/storage_service.hh" +static logging::logger logger("view"); + namespace db { namespace view { @@ -674,6 +676,124 @@ get_view_natural_endpoint(const sstring& keyspace_name, return view_endpoints[base_it - base_endpoints.begin()]; } +// Take the view mutations generated by generate_view_updates(), which pertain +// to a modification of a single base partition, and apply them to the +// appropriate paired replicas. This is done asynchronously - we do not wait +// for the writes to complete. +// FIXME: I dropped a lot of parameters the Cassandra version had, +// we may need them back: writeCommitLog, baseComplete, queryStartNanoTime. +void mutate_MV(const dht::token& base_token, + std::vector mutations) +{ +#if 0 + Tracing.trace("Determining replicas for mutation"); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + long startTime = System.nanoTime(); + + try + { + // if we haven't joined the ring, write everything to batchlog because paired replicas may be stale + final UUID batchUUID = UUIDGen.getTimeUUID(); + + if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving()) + { + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), + mutations), writeCommitLog); + } + else + { + List wrappers = new ArrayList<>(mutations.size()); + List nonPairedMutations = new LinkedList<>(); + Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); + + ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; + + //Since the base -> view replication is 1:1 we only need to store the BL locally + final Collection batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); + BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet +#endif + for (auto& mut : mutations) { + auto view_token = mut.token(); + auto keyspace_name = mut.schema()->ks_name(); + auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token); + auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name); + if (paired_endpoint) { + // When local node is the endpoint and there are no pending nodes we can + // Just apply the mutation locally. + auto my_address = utils::fb_utilities::get_broadcast_address(); + if (*paired_endpoint == my_address && pending_endpoints.empty() && + service::get_local_storage_service().is_joined()) { + // Note that we start here an asynchronous apply operation, and + // do not wait for it to complete. + // Note also that mutate_locally(mut) copies mut (in + // frozen from) so don't need to increase its lifetime. + service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { + logger.error("Error applying local view update: {}", ep); + }); + } else { +#if 0 + wrappers.add(wrapViewBatchResponseHandler(mutation, + consistencyLevel, + consistencyLevel, + Collections.singletonList(pairedEndpoint.get()), + baseComplete, + WriteType.BATCH, + cleanup, + queryStartNanoTime)); +#endif + // FIXME: Temporary hack: send the write directly to paired_endpoint, + // without a batchlog, and without checking for success + // Note we don't wait for the asynchronous operation to complete + // FIXME: need to extend mut's lifetime??? + service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { + logger.error("Error applying view update to {}: {}", *paired_endpoint, ep); + });; + } + } else { +#if 0 + //if there are no paired endpoints there are probably range movements going on, + //so we write to the local batchlog to replay later + if (pendingEndpoints.isEmpty()) + logger.warn("Received base materialized view mutation for key {} that does not belong " + + "to this node. There is probably a range movement happening (move or decommission)," + + "but this node hasn't updated its ring metadata yet. Adding mutation to " + + "local batchlog to be replayed later.", + mutation.key()); + nonPairedMutations.add(mutation); + } +#endif + } + } +#if 0 + if (!wrappers.isEmpty()) + { + // Apply to local batchlog memtable in this thread + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)), + writeCommitLog); + + // now actually perform the writes and wait for them to complete + asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); + } +#endif +#if 0 + if (!nonPairedMutations.isEmpty()) + { + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations), + writeCommitLog); + } + } +#endif +#if 0 + } + finally + { + viewWriteMetrics.addNano(System.nanoTime() - startTime); + } +#endif +} + } // namespace view } // namespace db diff --git a/db/view/view.hh b/db/view/view.hh index b8c8e73b3c..f804ee764c 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -121,6 +121,9 @@ future> generate_view_updates( streamed_mutation&& updates, streamed_mutation&& existings); +void mutate_MV(const dht::token& base_token, + std::vector mutations); + } }