From 4505a86f46dfafa039246369c46140a714efadbd Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 29 Nov 2023 23:20:27 +0200 Subject: [PATCH] tablets, mv: fix base-view pairing to consider base replication map In the view update code, the function get_view_natural_endpoint() determines which view replica this base replica should send an update to. It currently gets the *view* table's replication map (i.e., the map from view tokens to lists of replicas holding the token), but assumes that this is also the *base* table's replication map. This assumption was true with vnodes, but is no longer true with tablets - the base table's replication map can be completely different from the view table's. By looking at the wrong mapping, get_view_natural_endpoint() can believe that this node isn't really a base-replica and drop the view update. Alternatively, it can think it is a base replica - but use the wrong base-view pairing and create base-view inconsistencies. This patch solves this bug - get_view_natural_endpoint() now gets two separate replication maps - the base's and the view's. The callers need to remember what the base table was (in some cases they didn't care at the point of the call), and pass it to the function call. This patch also includes a simple test that reproduces the bug, and confirms it is fixed: The test has a 6-node cluster using tablets and a base table with RF=1, and writes one row to it. Before this patch, the code usually gets confused, thinking the base replica isn't a replica and loses the view update. With this patch, the view update works. Fixes #16227. Signed-off-by: Nadav Har'El Closes scylladb/scylladb#16228 --- db/view/view.cc | 26 ++++++++++++------- db/view/view_update_generator.hh | 2 ++ replica/table.cc | 4 +-- .../test_mv_tablets.py | 26 +++++++++++++++++-- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 1e541eb945..9da17d67aa 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1546,19 +1546,23 @@ bool needs_static_row(const mutation_partition& mp, const std::vector -get_view_natural_endpoint(const locator::effective_replication_map_ptr& erm, bool network_topology, - const dht::token& base_token, const dht::token& view_token) { - auto& topology = erm->get_token_metadata_ptr()->get_topology(); +get_view_natural_endpoint( + const locator::effective_replication_map_ptr& base_erm, + const locator::effective_replication_map_ptr& view_erm, + bool network_topology, + const dht::token& base_token, + const dht::token& view_token) { + auto& topology = base_erm->get_token_metadata_ptr()->get_topology(); auto my_address = utils::fb_utilities::get_broadcast_address(); auto my_datacenter = topology.get_datacenter(); std::vector base_endpoints, view_endpoints; - for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) { + for (auto&& base_endpoint : base_erm->get_natural_endpoints(base_token)) { if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) { base_endpoints.push_back(base_endpoint); } } - for (auto&& view_endpoint : erm->get_natural_endpoints(view_token)) { + for (auto&& view_endpoint : view_erm->get_natural_endpoints(view_token)) { // If this base replica is also one of the view replicas, we use // ourselves as the view replica. if (view_endpoint == my_address) { @@ -1617,6 +1621,7 @@ static bool should_update_synchronously(const schema& s) { // appropriate paired replicas. This is done asynchronously - we do not wait // for the writes to complete. future<> view_update_generator::mutate_MV( + schema_ptr base, dht::token base_token, utils::chunked_vector view_updates, db::view::stats& stats, @@ -1626,15 +1631,16 @@ future<> view_update_generator::mutate_MV( service::allow_hints allow_hints, wait_for_all_updates wait_for_all) { + auto base_ermp = base->table().get_effective_replication_map(); static constexpr size_t max_concurrent_updates = 128; co_await max_concurrent_for_each(view_updates, max_concurrent_updates, - [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> { + [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); - auto ermp = mut.s->table().get_effective_replication_map(); + auto view_ermp = mut.s->table().get_effective_replication_map(); auto& ks = _proxy.local().local_db().find_keyspace(mut.s->ks_name()); bool network_topology = dynamic_cast(&ks.get_replication_strategy()); - auto target_endpoint = get_view_natural_endpoint(ermp, network_topology, base_token, view_token); - auto remote_endpoints = ermp->get_pending_endpoints(view_token); + auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token); + auto remote_endpoints = view_ermp->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); @@ -1718,7 +1724,7 @@ future<> view_update_generator::mutate_MV( stats.view_updates_pushed_remote += updates_pushed_remote; cf_stats.total_view_updates_pushed_remote += updates_pushed_remote; schema_ptr s = mut.s; - future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( + future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(view_ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable { if (f.failed()) { diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index 7efaac5bfa..df2be95842 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -11,6 +11,7 @@ #include "sstables/shared_sstable.hh" #include "db/timeout_clock.hh" #include "utils/chunked_vector.hh" +#include "schema/schema_fwd.hh" #include #include @@ -77,6 +78,7 @@ public: replica::database& get_db() noexcept { return _db; } future<> mutate_MV( + schema_ptr base, dht::token base_token, utils::chunked_vector view_updates, db::view::stats& stats, diff --git a/replica/table.cc b/replica/table.cc index e63669fcdf..d4042c5de8 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2206,7 +2206,7 @@ future<> table::generate_and_propagate_view_updates(shared_ptrsize()); auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates)); try { - co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state, + co_await gen->mutate_MV(base, base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state, std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no); } catch (...) { // Ignore exceptions: any individual failure to propagate a view update will be reported @@ -2340,7 +2340,7 @@ future<> table::populate_views( size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for); units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for)); - co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, + co_await gen->mutate_MV(schema, base_token, std::move(*updates), _view_stats, *_config.cf_stats, tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes); } catch (...) { if (!err) { diff --git a/test/topology_experimental_raft/test_mv_tablets.py b/test/topology_experimental_raft/test_mv_tablets.py index 1d3db4653d..147d954892 100644 --- a/test/topology_experimental_raft/test_mv_tablets.py +++ b/test/topology_experimental_raft/test_mv_tablets.py @@ -23,7 +23,7 @@ async def test_tablet_mv_create(manager: ManagerClient): delete it - that's it, we don't read or write the table. Reproduces issue #16194. """ - servers = [await manager.server_add() for i in range(1)] + servers = await manager.servers_add(1) cql = manager.get_cql() await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}") @@ -41,7 +41,7 @@ async def test_tablet_mv_simple(manager: ManagerClient): node anyway. Reproduces issue #16209. """ - servers = [await manager.server_add() for i in range(1)] + servers = await manager.servers_add(1) cql = manager.get_cql() await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}") @@ -51,3 +51,25 @@ async def test_tablet_mv_simple(manager: ManagerClient): # We used SYNCHRONOUS_UPDATES=TRUE, so the view should be updated: assert [(3,2)] == list(await cql.run_async("SELECT * FROM test.tv WHERE c=3")) await cql.run_async("DROP KEYSPACE test") + +@pytest.mark.asyncio +async def test_tablet_mv_simple_6node(manager: ManagerClient): + """A simple reproducer for a bug of forgetting that the view table has a + different tablet mapping from the base: Using the wrong tablet mapping + for the base table or view table can cause us to send a view update + to the wrong view replica - or not send a view update at all. A row + that we write on the base table will not be readable in the view. + We start a large-enough cluster (6 nodes) to increase the probability + that if the mapping is different for the one row we write, and the test + will fail if the bug exists. + Reproduces #16227. + """ + servers = await manager.servers_add(6) + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int)") + await cql.run_async("CREATE MATERIALIZED VIEW test.tv AS SELECT * FROM test.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE") + await cql.run_async("INSERT INTO test.test (pk, c) VALUES (2, 3)") + # We used SYNCHRONOUS_UPDATES=TRUE, so the view should be updated: + assert [(3,2)] == list(await cql.run_async("SELECT * FROM test.tv WHERE c=3")) + await cql.run_async("DROP KEYSPACE test")