tablets, mv: fix base-view pairing to consider base replication map

In the view update code, the function get_view_natural_endpoint()
determines which view replica this base replica should send an update
to. It currently gets the *view* table's replication map (i.e., the map
from view tokens to lists of replicas holding the token), but assumes
that this is also the *base* table's replication map.

This assumption was true with vnodes, but is no longer true with
tablets - the base table's replication map can be completely different
from the view table's. By looking at the wrong mapping,
get_view_natural_endpoint() can believe that this node isn't really
a base-replica and drop the view update. Alternatively, it can think
it is a base replica - but use the wrong base-view pairing and create
base-view inconsistencies.

This patch solves this bug - get_view_natural_endpoint() now gets two
separate replication maps - the base's and the view's. The callers
need to remember what the base table was (in some cases they didn't
care at the point of the call), and pass it to the function call.

This patch also includes a simple test that reproduces the bug, and
confirms it is fixed: The test has a 6-node cluster using tablets
and a base table with RF=1, and writes one row to it. Before this
patch, the code usually gets confused, thinking the base replica
isn't a replica and loses the view update. With this patch, the
view update works.

Fixes #16227.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#16228
This commit is contained in:
Nadav Har'El
2023-11-29 23:20:27 +02:00
committed by Avi Kivity
parent 60af2f3cb2
commit 4505a86f46
4 changed files with 44 additions and 14 deletions

View File

@@ -1546,19 +1546,23 @@ bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_b
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
static std::optional<gms::inet_address>
get_view_natural_endpoint(const locator::effective_replication_map_ptr& erm, bool network_topology,
const dht::token& base_token, const dht::token& view_token) {
auto& topology = erm->get_token_metadata_ptr()->get_topology();
get_view_natural_endpoint(
const locator::effective_replication_map_ptr& base_erm,
const locator::effective_replication_map_ptr& view_erm,
bool network_topology,
const dht::token& base_token,
const dht::token& view_token) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto my_address = utils::fb_utilities::get_broadcast_address();
auto my_datacenter = topology.get_datacenter();
std::vector<gms::inet_address> base_endpoints, view_endpoints;
for (auto&& base_endpoint : erm->get_natural_endpoints(base_token)) {
for (auto&& base_endpoint : base_erm->get_natural_endpoints(base_token)) {
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
base_endpoints.push_back(base_endpoint);
}
}
for (auto&& view_endpoint : erm->get_natural_endpoints(view_token)) {
for (auto&& view_endpoint : view_erm->get_natural_endpoints(view_token)) {
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
if (view_endpoint == my_address) {
@@ -1617,6 +1621,7 @@ static bool should_update_synchronously(const schema& s) {
// appropriate paired replicas. This is done asynchronously - we do not wait
// for the writes to complete.
future<> view_update_generator::mutate_MV(
schema_ptr base,
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
@@ -1626,15 +1631,16 @@ future<> view_update_generator::mutate_MV(
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all)
{
auto base_ermp = base->table().get_effective_replication_map();
static constexpr size_t max_concurrent_updates = 128;
co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> {
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> {
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto ermp = mut.s->table().get_effective_replication_map();
auto view_ermp = mut.s->table().get_effective_replication_map();
auto& ks = _proxy.local().local_db().find_keyspace(mut.s->ks_name());
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());
auto target_endpoint = get_view_natural_endpoint(ermp, network_topology, base_token, view_token);
auto remote_endpoints = ermp->get_pending_endpoints(view_token);
auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token);
auto remote_endpoints = view_ermp->get_pending_endpoints(view_token);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
const bool update_synchronously = should_update_synchronously(*mut.s);
@@ -1718,7 +1724,7 @@ future<> view_update_generator::mutate_MV(
stats.view_updates_pushed_remote += updates_pushed_remote;
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
schema_ptr s = mut.s;
future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(view_ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
if (f.failed()) {

View File

@@ -11,6 +11,7 @@
#include "sstables/shared_sstable.hh"
#include "db/timeout_clock.hh"
#include "utils/chunked_vector.hh"
#include "schema/schema_fwd.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/metrics_registration.hh>
@@ -77,6 +78,7 @@ public:
replica::database& get_db() noexcept { return _db; }
future<> mutate_MV(
schema_ptr base,
dht::token base_token,
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,

View File

@@ -2206,7 +2206,7 @@ future<> table::generate_and_propagate_view_updates(shared_ptr<db::view::view_up
tracing::trace(tr_state, "Generated {} view update mutations", updates->size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates));
try {
co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state,
co_await gen->mutate_MV(base, base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state,
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no);
} catch (...) {
// Ignore exceptions: any individual failure to propagate a view update will be reported
@@ -2340,7 +2340,7 @@ future<> table::populate_views(
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for);
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for));
co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats,
co_await gen->mutate_MV(schema, base_token, std::move(*updates), _view_stats, *_config.cf_stats,
tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
} catch (...) {
if (!err) {

View File

@@ -23,7 +23,7 @@ async def test_tablet_mv_create(manager: ManagerClient):
delete it - that's it, we don't read or write the table.
Reproduces issue #16194.
"""
servers = [await manager.server_add() for i in range(1)]
servers = await manager.servers_add(1)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}")
@@ -41,7 +41,7 @@ async def test_tablet_mv_simple(manager: ManagerClient):
node anyway.
Reproduces issue #16209.
"""
servers = [await manager.server_add() for i in range(1)]
servers = await manager.servers_add(1)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}")
@@ -51,3 +51,25 @@ async def test_tablet_mv_simple(manager: ManagerClient):
# We used SYNCHRONOUS_UPDATES=TRUE, so the view should be updated:
assert [(3,2)] == list(await cql.run_async("SELECT * FROM test.tv WHERE c=3"))
await cql.run_async("DROP KEYSPACE test")
@pytest.mark.asyncio
async def test_tablet_mv_simple_6node(manager: ManagerClient):
"""A simple reproducer for a bug of forgetting that the view table has a
different tablet mapping from the base: Using the wrong tablet mapping
for the base table or view table can cause us to send a view update
to the wrong view replica - or not send a view update at all. A row
that we write on the base table will not be readable in the view.
We start a large-enough cluster (6 nodes) to increase the probability
that if the mapping is different for the one row we write, and the test
will fail if the bug exists.
Reproduces #16227.
"""
servers = await manager.servers_add(6)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 100}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int)")
await cql.run_async("CREATE MATERIALIZED VIEW test.tv AS SELECT * FROM test.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
await cql.run_async("INSERT INTO test.test (pk, c) VALUES (2, 3)")
# We used SYNCHRONOUS_UPDATES=TRUE, so the view should be updated:
assert [(3,2)] == list(await cql.run_async("SELECT * FROM test.tv WHERE c=3"))
await cql.run_async("DROP KEYSPACE test")