Merge '[Backport 6.1] select from mutation_fragments() + tablets: handle reads for non-owned partitions' from ScyllaDB

Attempting to read a partition which the node doesn't own, via `SELECT * FROM MUTATION_FRAGMENTS()` on a table using tablets, causes a crash.
This is because, when using tablets, the replica side simply doesn't handle requests for un-owned tokens, and such a request triggers a crash.
We should probably improve how the replica side handles this in general (an exception is better than a crash), but that is outside the scope of this PR.
This PR fixes the crash for `MUTATION_FRAGMENTS()` reads and also adds a reproducer test.

Fixes: https://github.com/scylladb/scylladb/issues/18786

Fixes a regression introduced in 6.0, so needs backport to 6.0 and 6.1

(cherry picked from commit de5329157c)

(cherry picked from commit 46563d719f)

(cherry picked from commit 4e2d7aa2a2)

 Refs #20109

Closes scylladb/scylladb#20313

* github.com:scylladb/scylladb:
  test/tablets: Test that reading tablets' mutations from MUTATION_FRAGMENTS works
  replica/mutation_dump: enforce pinning of effective replication map
  replica/mutation_dump: handle un-owned tokens (with tablets)
This commit is contained in:
Botond Dénes
2024-08-28 06:23:45 +03:00
5 changed files with 53 additions and 4 deletions

View File

@@ -1702,6 +1702,7 @@ schema_ptr mutation_fragments_select_statement::generate_output_schema(schema_pt
future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
mutation_fragments_select_statement::do_query(
locator::effective_replication_map_ptr erm_keepalive,
locator::host_id this_node,
service::storage_proxy& sp,
schema_ptr schema,
@@ -1709,7 +1710,7 @@ mutation_fragments_select_statement::do_query(
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params) const {
auto res = co_await replica::mutation_dump::dump_mutations(sp.get_db(), schema, _underlying_schema, partition_ranges, *cmd, optional_params.timeout(sp));
auto res = co_await replica::mutation_dump::dump_mutations(sp.get_db(), std::move(erm_keepalive), schema, _underlying_schema, partition_ranges, *cmd, optional_params.timeout(sp));
service::replicas_per_token_range last_replicas;
if (this_node) {
last_replicas.emplace(dht::token_range::make_open_ended_both_sides(), std::vector<locator::host_id>{this_node});
@@ -1781,7 +1782,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
if (!aggregate && !_restrictions_need_filtering && (page_size <= 0
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
*command, key_ranges))) {
return do_query({}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
return do_query(erm_keepalive, {}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}})
.then(wrap_result_to_error_message([this, erm_keepalive, now, slice = command->slice] (service::storage_proxy_coordinator_query_result&& qr) mutable {
cql3::selection::result_set_builder builder(*_selection, now);
@@ -1820,8 +1821,8 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
std::move(key_ranges),
_restrictions_need_filtering ? _restrictions : nullptr,
[this, erm_keepalive, this_node] (service::storage_proxy& sp, schema_ptr schema, lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector partition_ranges,
db::consistency_level cl, service::storage_proxy_coordinator_query_options optional_params) {
return do_query(this_node, sp, std::move(schema), std::move(cmd), std::move(partition_ranges), cl, std::move(optional_params));
db::consistency_level cl, service::storage_proxy_coordinator_query_options optional_params) mutable {
return do_query(std::move(erm_keepalive), this_node, sp, std::move(schema), std::move(cmd), std::move(partition_ranges), cl, std::move(optional_params));
});
if (_selection->is_trivial() && !_restrictions_need_filtering && !_per_partition_limit) {

View File

@@ -335,6 +335,7 @@ public:
private:
future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
do_query(
locator::effective_replication_map_ptr erm_keepalive,
locator::host_id this_node,
service::storage_proxy& sp,
schema_ptr schema,

View File

@@ -405,6 +405,20 @@ future<mutation_reader> make_partition_mutation_dump_reader(
tracing::trace_state_ptr ts,
db::timeout_clock::time_point timeout) {
const auto& tbl = db.local().find_column_family(underlying_schema);
// We can get a request for a token we don't own.
// Just return an empty reader in this case; otherwise we will hit
// std::terminate, because the replica side does not handle requests for
// un-owned tokens.
{
auto erm = tbl.get_effective_replication_map();
auto& topo = erm->get_topology();
const auto endpoints = erm->get_endpoints_for_reading(dk.token());
if (std::ranges::find(endpoints, topo.this_node()->endpoint()) == endpoints.end()) {
co_return make_empty_flat_reader_v2(output_schema, std::move(permit));
}
}
const auto shard = tbl.shard_for_reads(dk.token());
if (shard == this_shard_id()) {
co_return make_mutation_reader<mutation_dump_reader>(std::move(output_schema), std::move(underlying_schema), std::move(permit),
@@ -561,6 +575,7 @@ schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_s
future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
sharded<database>& db,
locator::effective_replication_map_ptr erm_keepalive,
schema_ptr output_schema,
schema_ptr underlying_schema,
const dht::partition_range_vector& prs,

View File

@@ -11,12 +11,21 @@
#include "db/timeout_clock.hh"
#include "query-result.hh"
// Declares effective_replication_map without defining it, together with the
// shared-pointer alias that dump_mutations() below takes as `erm_keepalive`.
// NOTE(review): presumably duplicates an alias declared in the locator headers
// and must stay identical to it -- confirm.
namespace locator {
class effective_replication_map;
using effective_replication_map_ptr = seastar::shared_ptr<const effective_replication_map>;
}
namespace replica::mutation_dump {
// Produces the output schema that corresponds to `underlying_schema`.
// The `output_schema` passed to dump_mutations() must have been generated by
// this function from the same underlying schema.
schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_schema);
future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
sharded<database>& db,
locator::effective_replication_map_ptr erm_keepalive,
schema_ptr output_schema, // must have been generated from `underlying_schema`, with `generate_output_schema_from_underlying_schema()`
schema_ptr underlying_schema,
const dht::partition_range_vector& pr,

View File

@@ -167,6 +167,29 @@ async def test_tablet_rf_change(manager: ManagerClient, direction):
assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
@pytest.mark.asyncio
async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClient):
    """Check that MUTATION_FRAGMENTS() queries handle the case when a partition
    not owned by the node is attempted to be read.

    Three servers are started and the keyspace uses replication_factor 2, so
    every partition has at least one server that holds no replica of it. Each
    key is then read through MUTATION_FRAGMENTS() on every server in turn,
    which guarantees that un-owned partitions are queried.

    The test makes no assertion on the query results: it passes as long as
    every query completes without raising.
    """
    cfg = {'enable_user_defined_functions': False,
           'enable_tablets': True }
    servers = await manager.servers_add(3, config=cfg)

    cql = manager.get_cql()

    await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")

    logger.info("Populating table")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(4)])

    for s in servers:
        # Pin each query to one specific server, so that every key is read on
        # every node, including the nodes that do not own it.
        host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
        for k in range(4):
            await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
# Check that an existing cached read will be cleaned up when the tablet it reads
# from is migrated away.