From 300e549267b2f51d0afc25b11707ca1d23b441a9 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Mon, 4 Dec 2023 11:22:04 +0200 Subject: [PATCH] tablets, mv: disable self-pairing when tablets are used A write to a base table can generate one or more writes to a materialized view. The write to RF base replicas need to cause writes to RF view replicas. Our MV implementation, based on Cassandra's implementation, does this via "pairing": Each one of the base replicas involved in this write sends each view update to exactly one view replica. The function get_view_natural_endpoint() tells a base replica which of the view replicas it should send the update to. The standard pairing is based on the ring order: The first owner of the base token sends to the first owner of the view token, the second to the second, and so on. However, the existing code also uses an optimization we call self-pairing: If a single node is both a base replica and a base replica, the pairing is modified so this node sends the update to itself. This patch *disables* the self-pairing optimization in keyspaces that use tablets: The self-pairing optimization can cause the pairing to change after token ranges are moved between nodes, so it can break base-view consistency in some edge cases, leading to "ghost rows". With tablets, these range movements become even more frequent - they can happen even if the cluster doesn't grow. This is why we want to solve this problem for tablets. For backward compatibility and to avoid sudden inconsistencies emerging during upgrades, we decided to continue using the self-pairing optimization for keyspaces that are *not* using tablets (i.e., using vnoodes). Currently, we don't introduce a "CREATE MATERIALIZED VIEW" option to override these defaults - i.e., we don't provide a way to disable self-pairing with vnodes or to enable them with tablets. We could introduce such a schema flag later, if we ever want to (and I'm not sure we want to). It's important to note, that in some cases, this change has implications on when view updates become synchronous, in the tablets case. For example: * If we have 3 nodes and RF=3, with the self-pairing optimization each node is paired with itself, the view update is local, and is implicitly synchronous (without requiring a "synchronous_updates" flag). * In the same setup with tablets, without the self-pairing optimization (due to this patch), this is not guaranteed. Some view updates may not be synchronous, i.e., the base write will not wait for the view write. If the user really wants synchronous updates, they should be requested explicitly, with the "synchronous_updates" view option. Fixes #16260. Signed-off-by: Nadav Har'El Closes scylladb/scylladb#16272 --- db/view/view.cc | 55 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 4d3e5a2345..34000c47a7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1536,6 +1536,14 @@ bool needs_static_row(const mutation_partition& mp, const std::vectorget_token_metadata_ptr()->get_topology(); auto my_address = topology.my_address(); auto my_datacenter = topology.get_datacenter(); @@ -1562,20 +1571,26 @@ get_view_natural_endpoint( } for (auto&& view_endpoint : view_erm->get_natural_endpoints(view_token)) { - // If this base replica is also one of the view replicas, we use - // ourselves as the view replica. - if (view_endpoint == my_address) { - return view_endpoint; - } - // We have to remove any endpoint which is shared between the base - // and the view, as it will select itself and throw off the counts - // otherwise. - auto it = std::find(base_endpoints.begin(), base_endpoints.end(), - view_endpoint); - if (it != base_endpoints.end()) { - base_endpoints.erase(it); - } else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) { - view_endpoints.push_back(view_endpoint); + if (use_legacy_self_pairing) { + // If this base replica is also one of the view replicas, we use + // ourselves as the view replica. + if (view_endpoint == my_address) { + return view_endpoint; + } + // We have to remove any endpoint which is shared between the base + // and the view, as it will select itself and throw off the counts + // otherwise. + auto it = std::find(base_endpoints.begin(), base_endpoints.end(), + view_endpoint); + if (it != base_endpoints.end()) { + base_endpoints.erase(it); + } else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) { + view_endpoints.push_back(view_endpoint); + } + } else { + if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) { + view_endpoints.push_back(view_endpoint); + } } } @@ -1583,6 +1598,8 @@ get_view_natural_endpoint( auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), my_address); if (base_it == base_endpoints.end()) { // This node is not a base replica of this key, so we return empty + // FIXME: This case shouldn't happen, and if it happens, a view update + // would be lost. We should reported or count this case. return {}; } return view_endpoints[base_it - base_endpoints.begin()]; @@ -1638,7 +1655,13 @@ future<> view_update_generator::mutate_MV( auto view_ermp = mut.s->table().get_effective_replication_map(); auto& ks = _proxy.local().local_db().find_keyspace(mut.s->ks_name()); bool network_topology = dynamic_cast(&ks.get_replication_strategy()); - auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token); + // We set legacy self-pairing for old vnode-based tables (for backward + // compatibility), and unset it for tablets - where range movements + // are more frequent and backward compatibility is less important. + // TODO: Maybe allow users to set use_legacy_self_pairing explicitly + // on a view, like we have the synchronous_updates_flag. + bool use_legacy_self_pairing = !ks.get_replication_strategy().uses_tablets(); + auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing); auto remote_endpoints = view_ermp->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size());