secondary-index, tablets: ensure that LSI are synchronous
CQL Local Secondary Index is a Scylla-only extension to Cassandra's secondary index API where the index is separate per partition. Scylla's documentation guarantees that: "As of Scylla Open Source 4.0, updates for local secondary indexes are performed synchronously. When updates are synchronous, the client acknowledges the write operation only after both the base table modification and the view up date are written." This happened automatically with vnodes, because the base table and the view have the same partition key, so base and view replicas are co-located, and the view update is always local and therefore done synchronously. But with tablets, this does NOT happen automatically - the base and view tablets may be located on different nodes, and the view update may be remote, and NOT synchronous. So in this patch we explicitly mark the view as synchronous_update when building the view for an LSI. The bigger part of this patch is to add a test which reliably fails before this patch, and passes after it. The test creates a two-node cluster and a table with LSI, and pins the base's tablets to one node and the view's to the second node, forcing the view updates to be remote. It also uses an injection point to make the view update slower. The test then writes to the base and immediately tries to use the index to read. Before this patch, the read doesn't find the new data (contrary to the guarantee in the documentation). After this patch, the read does find the new data - because the write waited for the index to be updated. Fixes #16371 Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
#include "replica/database.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "concrete_types.hh"
|
||||
#include "db/tags/extension.hh"
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
@@ -281,6 +282,13 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
|
||||
format("{} IS NOT NULL", index_target->name_as_cql_string()) :
|
||||
"";
|
||||
builder.with_view_info(*schema, false, where_clause);
|
||||
// A local secondary index should be backed by a *synchronous* view,
|
||||
// see #16371. A view is marked synchronous with a tag. Non-local indexes
|
||||
// do not need the tags schema extension at all.
|
||||
if (im.local()) {
|
||||
std::map<sstring, sstring> tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}};
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
}
|
||||
return view_ptr{builder.build()};
|
||||
}
|
||||
|
||||
|
||||
@@ -7,16 +7,74 @@
|
||||
# Tests for interaction of materialized views with *tablets*
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, read_barrier
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# This convenience function takes the name of a table or a view, and a token,
|
||||
# and returns the list of host_id,shard pairs holding tablets for this token
|
||||
# and view.
|
||||
# You also need to specify a specific server to use for the requests, to
|
||||
# ensure that if you send tablet-migration commands to one server, you also
|
||||
# read the replicas information from the same server (it takes time for this
|
||||
# information to propagate to all servers).
|
||||
async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_or_view_name: str, token: int):
|
||||
host = (await wait_for_cql_and_get_hosts(manager.cql, [server], time.time() + 60))[0]
|
||||
await read_barrier(manager.cql, host)
|
||||
|
||||
# "table_or_view_name" may be either a table or a view, and those are
|
||||
# listed in different system tables so we may need to search both:
|
||||
matches = list(await manager.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace_name}' and table_name = '{table_or_view_name}'"))
|
||||
if not matches:
|
||||
matches = list(await manager.cql.run_async(f"select id from system_schema.views where keyspace_name = '{keyspace_name}' and view_name = '{table_or_view_name}'"))
|
||||
assert len(matches) == 1
|
||||
table_id = matches[0].id
|
||||
|
||||
rows = await manager.cql.run_async(f"SELECT last_token, replicas FROM system.tablets where "
|
||||
f"keyspace_name = '{keyspace_name}' and "
|
||||
f"table_id = {table_id}", host=host)
|
||||
for row in rows:
|
||||
if row.last_token >= token:
|
||||
return row.replicas
|
||||
|
||||
# This convenience function assumes a table has RF=1 and only a single tablet,
|
||||
# and moves it to one specific node "server" - and pins it there (disabling
|
||||
# further tablet load-balancing). It is not specified which *shard* on that
|
||||
# node will receive the tablet.
|
||||
async def pin_the_only_tablet(manager, keyspace_name, table_or_view_name, server):
|
||||
# We need to send load-balancing commands to one of the nodes and they
|
||||
# will be propagated to all of them. Since we already know of
|
||||
# target_server, let's just use that.
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
tablet_token = 0 # Doesn't matter since there is one tablet
|
||||
source_replicas = await get_tablet_replicas(manager, server, keyspace_name, table_or_view_name, tablet_token)
|
||||
# We assume RF=1 so get_tablet_replicas() returns just one replica
|
||||
assert len(source_replicas) == 1
|
||||
source_host_id, source_shard = source_replicas[0]
|
||||
|
||||
target_host_id = await manager.get_host_id(server.server_id)
|
||||
target_shard = 0 # We don't care which shard to use
|
||||
|
||||
# Currently migrating a tablet in the same node is not allowed.
|
||||
# We need to just do nothing in this case - the tablet is already in
|
||||
# its desired node (and we didn't specify which shard is desired).
|
||||
# The str() is needed because we can't compare HostId to string :-(
|
||||
if str(target_host_id) == str(source_host_id):
|
||||
return
|
||||
|
||||
# Finally move the tablet. We can send the command to any of the hosts,
|
||||
# it will propagate it to all of them.
|
||||
await manager.api.move_tablet(server.ip_addr, keyspace_name, table_or_view_name, source_host_id, source_shard, target_host_id, target_shard, tablet_token)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_mv_create(manager: ManagerClient):
|
||||
@@ -218,3 +276,56 @@ async def test_tablet_lsi_create(manager: ManagerClient):
|
||||
await cql.run_async("CREATE INDEX my_idx ON test.test((pk),c)")
|
||||
await cql.run_async("DROP INDEX test.my_idx")
|
||||
await cql.run_async("DROP KEYSPACE test")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_cql_lsi(manager: ManagerClient):
|
||||
"""A simple reproducer for issue #16371 where CQL LSI (local secondary
|
||||
index) was not using synchronous view updates when tablets are enabled,
|
||||
contrary to what the documentation for local SI says. In other words,
|
||||
we could write to a table with CL=QUORUM and then try to read with
|
||||
CL=QUORUM using the index - and not find the data.
|
||||
|
||||
We use a cluster of just two nodes and RF=1, and control the tablets
|
||||
so all base tablets will be in node 0 and all view tablets will be
|
||||
in node 1, to ensure that the view update is remote and therefore
|
||||
not synchronous by default. To make the test failure even more
|
||||
likely on a fast machine, we use the "delay_before_remote_view_update"
|
||||
injection point to add a delay to the view update more than usual.
|
||||
Reproduces #16371.
|
||||
"""
|
||||
servers = await manager.servers_add(2)
|
||||
cql = manager.get_cql()
|
||||
|
||||
# Create a table with an LSI, using tablets. Use just 1 tablets,
|
||||
# which is silly in any real-world use case, but makes this test simpler
|
||||
# and faster.
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1, 'initial_tablets': 1}")
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int)")
|
||||
await cql.run_async("CREATE INDEX my_idx ON test.test((pk),c)")
|
||||
|
||||
# Move the base tablet (there's just one) to node 0, and the view tablet
|
||||
# (of the view backing the index) to node 1. In particular all view
|
||||
# updates will then be remote: node 0 will send view updates to node 1.
|
||||
await pin_the_only_tablet(manager, 'test', 'test', servers[0])
|
||||
await pin_the_only_tablet(manager, 'test', 'my_idx_index', servers[1])
|
||||
|
||||
# Add a fixed (0.5 second) delay before view updates, to increase the
|
||||
# likehood that if the write didn't wait for the view update, we can try
|
||||
# reading before the view update happened and fail the test.
|
||||
await inject_error_on(manager, "delay_before_remote_view_update", servers);
|
||||
|
||||
# Write to the base table (whose only replica is on node 0).
|
||||
zzz = time.time()
|
||||
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES (7, 42)")
|
||||
# If synchronous update worked, this log message should say more
|
||||
# than 0.5 seconds (the delay added by injection). If it didn't work,
|
||||
# the time will be less than 0.5 seconds and the read is likely to fail.
|
||||
logger.info(f"Insert took {time.time()-zzz}")
|
||||
# Read using the index (whose only replica is on node 1, and delayed
|
||||
# by the injection above). LSI should use synchronous view updates,
|
||||
# so the data should be searchable through the local secondary index
|
||||
# immediately after the previous INSERT returned.
|
||||
assert [(7,42)] == list(await cql.run_async(f"SELECT * FROM test.test WHERE pk=7 AND c=42"))
|
||||
|
||||
await cql.run_async("DROP KEYSPACE test")
|
||||
|
||||
Reference in New Issue
Block a user