materialized views: partial mutate_MV

This adds a function mutate_MV() which takes view mutations and sends
them to the appropriate nodes (this may be the current node, or a
remote node).

This is only a partial implementation - we still don't do the local
batch log (to survive reboots and failures) and some other stuff which
is left commented out.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Nadav Har'El
2017-01-16 17:52:20 +02:00
committed by Duarte Nunes
parent f2fd81ece0
commit 3ae73164a4
2 changed files with 123 additions and 0 deletions

View File

@@ -50,6 +50,8 @@
#include "locator/network_topology_strategy.hh"
#include "service/storage_service.hh"
static logging::logger logger("view");
namespace db {
namespace view {
@@ -674,6 +676,124 @@ get_view_natural_endpoint(const sstring& keyspace_name,
return view_endpoints[base_it - base_endpoints.begin()];
}
// Take the view mutations generated by generate_view_updates(), which pertain
// to a modification of a single base partition, and apply them to the
// appropriate paired replicas. This is done asynchronously - we do not wait
// for the writes to complete.
// FIXME: I dropped a lot of parameters the Cassandra version had,
// we may need them back: writeCommitLog, baseComplete, queryStartNanoTime.
void mutate_MV(const dht::token& base_token,
std::vector<mutation> mutations)
{
#if 0
Tracing.trace("Determining replicas for mutation");
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
try
{
// if we haven't joined the ring, write everything to batchlog because paired replicas may be stale
final UUID batchUUID = UUIDGen.getTimeUUID();
if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving())
{
BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
mutations), writeCommitLog);
}
else
{
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
List<Mutation> nonPairedMutations = new LinkedList<>();
Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
//Since the base -> view replication is 1:1 we only need to store the BL locally
final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
() -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
#endif
for (auto& mut : mutations) {
auto view_token = mut.token();
auto keyspace_name = mut.schema()->ks_name();
auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
if (paired_endpoint) {
// When local node is the endpoint and there are no pending nodes we can
// Just apply the mutation locally.
auto my_address = utils::fb_utilities::get_broadcast_address();
if (*paired_endpoint == my_address && pending_endpoints.empty() &&
service::get_local_storage_service().is_joined()) {
// Note that we start here an asynchronous apply operation, and
// do not wait for it to complete.
// Note also that mutate_locally(mut) copies mut (in
// frozen from) so don't need to increase its lifetime.
service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) {
logger.error("Error applying local view update: {}", ep);
});
} else {
#if 0
wrappers.add(wrapViewBatchResponseHandler(mutation,
consistencyLevel,
consistencyLevel,
Collections.singletonList(pairedEndpoint.get()),
baseComplete,
WriteType.BATCH,
cleanup,
queryStartNanoTime));
#endif
// FIXME: Temporary hack: send the write directly to paired_endpoint,
// without a batchlog, and without checking for success
// Note we don't wait for the asynchronous operation to complete
// FIXME: need to extend mut's lifetime???
service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
logger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
});;
}
} else {
#if 0
//if there are no paired endpoints there are probably range movements going on,
//so we write to the local batchlog to replay later
if (pendingEndpoints.isEmpty())
logger.warn("Received base materialized view mutation for key {} that does not belong " +
"to this node. There is probably a range movement happening (move or decommission)," +
"but this node hasn't updated its ring metadata yet. Adding mutation to " +
"local batchlog to be replayed later.",
mutation.key());
nonPairedMutations.add(mutation);
}
#endif
}
}
#if 0
if (!wrappers.isEmpty())
{
// Apply to local batchlog memtable in this thread
BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
writeCommitLog);
// now actually perform the writes and wait for them to complete
asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
}
#endif
#if 0
if (!nonPairedMutations.isEmpty())
{
BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations),
writeCommitLog);
}
}
#endif
#if 0
}
finally
{
viewWriteMetrics.addNano(System.nanoTime() - startTime);
}
#endif
}
} // namespace view
} // namespace db

View File

@@ -121,6 +121,9 @@ future<std::vector<mutation>> generate_view_updates(
streamed_mutation&& updates,
streamed_mutation&& existings);
void mutate_MV(const dht::token& base_token,
std::vector<mutation> mutations);
}
}