diff --git a/index/secondary_index_manager.cc b/index/secondary_index_manager.cc index b363c6352b..690ccd5236 100644 --- a/index/secondary_index_manager.cc +++ b/index/secondary_index_manager.cc @@ -20,6 +20,7 @@ #include "replica/database.hh" #include "db/view/view.hh" #include "concrete_types.hh" +#include "db/tags/extension.hh" #include #include @@ -281,6 +282,13 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im format("{} IS NOT NULL", index_target->name_as_cql_string()) : ""; builder.with_view_info(*schema, false, where_clause); + // A local secondary index should be backed by a *synchronous* view, + // see #16371. A view is marked synchronous with a tag. Non-local indexes + // do not need the tags schema extension at all. + if (im.local()) { + std::map tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}}; + builder.add_extension(db::tags_extension::NAME, ::make_shared(tags_map)); + } return view_ptr{builder.build()}; } diff --git a/test/topology_experimental_raft/test_mv_tablets.py b/test/topology_experimental_raft/test_mv_tablets.py index d7346e026f..c59d583ca1 100644 --- a/test/topology_experimental_raft/test_mv_tablets.py +++ b/test/topology_experimental_raft/test_mv_tablets.py @@ -7,16 +7,74 @@ # Tests for interaction of materialized views with *tablets* from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_cql_and_get_hosts, read_barrier +from test.pylib.internal_types import ServerInfo from test.topology.conftest import skip_mode import pytest import asyncio import logging import random +import time logger = logging.getLogger(__name__) +# This convenience function takes the name of a table or a view, and a token, +# and returns the list of host_id,shard pairs holding tablets for this token +# and view. +# You also need to specify a specific server to use for the requests, to +# ensure that if you send tablet-migration commands to one server, you also +# read the replicas information from the same server (it takes time for this +# information to propagate to all servers). +async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_or_view_name: str, token: int): + host = (await wait_for_cql_and_get_hosts(manager.cql, [server], time.time() + 60))[0] + await read_barrier(manager.cql, host) + + # "table_or_view_name" may be either a table or a view, and those are + # listed in different system tables so we may need to search both: + matches = list(await manager.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace_name}' and table_name = '{table_or_view_name}'")) + if not matches: + matches = list(await manager.cql.run_async(f"select id from system_schema.views where keyspace_name = '{keyspace_name}' and view_name = '{table_or_view_name}'")) + assert len(matches) == 1 + table_id = matches[0].id + + rows = await manager.cql.run_async(f"SELECT last_token, replicas FROM system.tablets where " + f"keyspace_name = '{keyspace_name}' and " + f"table_id = {table_id}", host=host) + for row in rows: + if row.last_token >= token: + return row.replicas + +# This convenience function assumes a table has RF=1 and only a single tablet, +# and moves it to one specific node "server" - and pins it there (disabling +# further tablet load-balancing). It is not specified which *shard* on that +# node will receive the tablet. +async def pin_the_only_tablet(manager, keyspace_name, table_or_view_name, server): + # We need to send load-balancing commands to one of the nodes and they + # will be propagated to all of them. Since we already know of + # target_server, let's just use that. + await manager.api.disable_tablet_balancing(server.ip_addr) + tablet_token = 0 # Doesn't matter since there is one tablet + source_replicas = await get_tablet_replicas(manager, server, keyspace_name, table_or_view_name, tablet_token) + # We assume RF=1 so get_tablet_replicas() returns just one replica + assert len(source_replicas) == 1 + source_host_id, source_shard = source_replicas[0] + + target_host_id = await manager.get_host_id(server.server_id) + target_shard = 0 # We don't care which shard to use + + # Currently migrating a tablet in the same node is not allowed. + # We need to just do nothing in this case - the tablet is already in + # its desired node (and we didn't specify which shard is desired). + # The str() is needed because we can't compare HostId to string :-( + if str(target_host_id) == str(source_host_id): + return + + # Finally move the tablet. We can send the command to any of the hosts, + # it will propagate it to all of them. + await manager.api.move_tablet(server.ip_addr, keyspace_name, table_or_view_name, source_host_id, source_shard, target_host_id, target_shard, tablet_token) + @pytest.mark.asyncio async def test_tablet_mv_create(manager: ManagerClient): @@ -218,3 +276,56 @@ async def test_tablet_lsi_create(manager: ManagerClient): await cql.run_async("CREATE INDEX my_idx ON test.test((pk),c)") await cql.run_async("DROP INDEX test.my_idx") await cql.run_async("DROP KEYSPACE test") + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_cql_lsi(manager: ManagerClient): + """A simple reproducer for issue #16371 where CQL LSI (local secondary + index) was not using synchronous view updates when tablets are enabled, + contrary to what the documentation for local SI says. In other words, + we could write to a table with CL=QUORUM and then try to read with + CL=QUORUM using the index - and not find the data. + + We use a cluster of just two nodes and RF=1, and control the tablets + so all base tablets will be in node 0 and all view tablets will be + in node 1, to ensure that the view update is remote and therefore + not synchronous by default. To make the test failure even more + likely on a fast machine, we use the "delay_before_remote_view_update" + injection point to add a delay to the view update more than usual. + Reproduces #16371. + """ + servers = await manager.servers_add(2) + cql = manager.get_cql() + + # Create a table with an LSI, using tablets. Use just 1 tablets, + # which is silly in any real-world use case, but makes this test simpler + # and faster. + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 1}") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int)") + await cql.run_async("CREATE INDEX my_idx ON test.test((pk),c)") + + # Move the base tablet (there's just one) to node 0, and the view tablet + # (of the view backing the index) to node 1. In particular all view + # updates will then be remote: node 0 will send view updates to node 1. + await pin_the_only_tablet(manager, 'test', 'test', servers[0]) + await pin_the_only_tablet(manager, 'test', 'my_idx_index', servers[1]) + + # Add a fixed (0.5 second) delay before view updates, to increase the + # likehood that if the write didn't wait for the view update, we can try + # reading before the view update happened and fail the test. + await inject_error_on(manager, "delay_before_remote_view_update", servers); + + # Write to the base table (whose only replica is on node 0). + zzz = time.time() + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES (7, 42)") + # If synchronous update worked, this log message should say more + # than 0.5 seconds (the delay added by injection). If it didn't work, + # the time will be less than 0.5 seconds and the read is likely to fail. + logger.info(f"Insert took {time.time()-zzz}") + # Read using the index (whose only replica is on node 1, and delayed + # by the injection above). LSI should use synchronous view updates, + # so the data should be searchable through the local secondary index + # immediately after the previous INSERT returned. + assert [(7,42)] == list(await cql.run_async(f"SELECT * FROM test.test WHERE pk=7 AND c=42")) + + await cql.run_async("DROP KEYSPACE test")