Materialized Views: don't lose updates while cluster is changing

When the cluster is changed (nodes added or removed), ranges of tokens
are moved between nodes. Scylla initiates a streaming process between an
old and a new owner of the range, which can take a long time. During
that streaming time, the new owner of the range is known as a "pending node"
for this range, and all updates must go to both the old owner (in case the
movement fails!) and the pending node (in case the movement succeeds).
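
As a rough illustration of the rule above, the set of write targets for a
token range during a range movement is the current owners plus the pending
nodes. This is a minimal sketch, not Scylla code: the `endpoint` alias and
the `write_targets` function are hypothetical names introduced only for
this example.

```cpp
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a node address.
using endpoint = std::string;

// Returns every node that must receive a write for a token range while a
// range movement is in progress: the current (natural) owners, followed by
// any pending owners that are still being streamed to.
std::vector<endpoint> write_targets(const std::vector<endpoint>& natural,
                                    const std::vector<endpoint>& pending) {
    std::vector<endpoint> targets = natural;
    targets.insert(targets.end(), pending.begin(), pending.end());
    return targets;
}
```

With no range movement in progress, `pending` is empty and the result is
just the natural owners.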

For materialized views, because they are ordinary tables, streaming moves
all the view's data that existed before the streaming started. But we did
not send the pending node the updates made to the view *during* the
streaming. A dtest demonstrates that the new node misses some of the view
updates and requires a repair of the view tables immediately after the
cluster change ends, which is not good. To fix that, we need to also send
every new update that happens during the streaming to the "pending node".
We already did this properly for base-table updates, but not for view
updates: each base table replica wrote to only one paired view table
replica, and nobody wrote to the new pending node (in the case where there
is one for the particular view token involved).

In this patch, we make sure that all view updates also go to the "pending
nodes" when there are any. We do the same thing that Cassandra does:
*all* base replicas write the update to the pending node(s).
Arguably, it is inefficient that all replicas send the update to the same
node. In most cases it is enough to send it from just one base replica,
the one that is slated to be the new node's pair. I opened
https://issues.apache.org/jira/browse/CASSANDRA-14262 about this idea.
But that is an optimization. The patch as-is already fixes the bug.
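
The routing decision each base replica makes after this patch can be
sketched roughly as follows. This is a simplified model under stated
assumptions, not the actual mutate_MV code: `endpoint`, `view_update_plan`
and `plan_view_update` are hypothetical names, the is_joined() check is
omitted, and the sketch only computes where the update would go instead of
sending anything.

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a node address.
using endpoint = std::string;

// Where a single view update should be written (illustrative type).
struct view_update_plan {
    bool apply_locally = false;            // write to the local view replica
    std::vector<endpoint> remote_targets;  // nodes to send the update to
};

// Decide where one base replica (self) sends a view update, given its
// paired view replica (if any) and the pending nodes for the view token.
view_update_plan plan_view_update(const endpoint& self,
                                  const std::optional<endpoint>& paired,
                                  const std::vector<endpoint>& pending) {
    view_update_plan plan;
    if (!paired) {
        // No paired view replica: that branch is not changed by the patch
        // and is not modeled in this sketch.
        return plan;
    }
    if (*paired == self) {
        plan.apply_locally = true;
    } else {
        plan.remote_targets.push_back(*paired);
    }
    // New behavior: every base replica also writes to all pending nodes,
    // whether or not the paired replica is the local node.
    plan.remote_targets.insert(plan.remote_targets.end(),
                               pending.begin(), pending.end());
    return plan;
}
```

For example, a replica "n1" paired with itself and one pending node "n4"
applies the update locally and also sends it to "n4". Before the patch,
"n4" would have received nothing.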

Fixes #3211

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20180313171853.17283-1-nyh@scylladb.com>
Nadav Har'El
2018-03-13 19:18:53 +02:00
committed by Duarte Nunes
parent 934d805b4b
commit e9702aa126


@@ -829,37 +829,36 @@ void mutate_MV(const dht::token& base_token,
         auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
         auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
         if (paired_endpoint) {
-            // When local node is the endpoint and there are no pending nodes we can
-            // Just apply the mutation locally.
+            // When paired endpoint is the local node, we can just apply
+            // the mutation locally.
             auto my_address = utils::fb_utilities::get_broadcast_address();
-            if (*paired_endpoint == my_address && pending_endpoints.empty() &&
+            if (*paired_endpoint == my_address &&
                     service::get_local_storage_service().is_joined()) {
                 // Note that we start here an asynchronous apply operation, and
                 // do not wait for it to complete.
                 // Note also that mutate_locally(mut) copies mut (in
-                // frozen from) so don't need to increase its lifetime.
+                // frozen form) so don't need to increase its lifetime.
                 service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) {
                     vlogger.error("Error applying local view update: {}", ep);
                 });
             } else {
-#if 0
-                wrappers.add(wrapViewBatchResponseHandler(mutation,
-                                                          consistencyLevel,
-                                                          consistencyLevel,
-                                                          Collections.singletonList(pairedEndpoint.get()),
-                                                          baseComplete,
-                                                          WriteType.BATCH,
-                                                          cleanup,
-                                                          queryStartNanoTime));
-#endif
                 // FIXME: Temporary hack: send the write directly to paired_endpoint,
                 // without a batchlog, and without checking for success
                 // Note we don't wait for the asynchronous operation to complete
-                // FIXME: need to extend mut's lifetime???
                 service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
                     vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
                 });;
             }
+            // When the ownership of the view partition is being moved to a
+            // new node (or nodes), listed in pending_enpoints, we also need
+            // to send the update there. Currently, we do this from *each* of
+            // the base replicas, but this is probably excessive - see
+            // See https://issues.apache.org/jira/browse/CASSANDRA-14262
+            for (auto&& pending : pending_endpoints) {
+                service::get_local_storage_proxy().send_to_endpoint(mut, pending, db::write_type::VIEW).handle_exception([pending] (auto ep) {
+                    vlogger.error("Error applying view update to pending endpoint {}: {}", pending, ep);
+                });;
+            }
         } else {
 #if 0
             //if there are no paired endpoints there are probably range movements going on,