From 7150442f6ae380bd03e5be11c4e43e9417e9d607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 12 Dec 2024 23:37:29 -0500 Subject: [PATCH] service/storage_proxy: schedule_repair(): materialize the range into a vector Said method passes down its `diff` input to `mutate_internal()`, after some std::ranges massaging. Said massaging is destructive -- it moves items from the diff. If the output range is iterated-over multiple times, only the first time will see the actual output, further iterations will get an empty range. When trace-level logging is enabled, this is exactly what happens: `mutate_internal()` iterates over the range multiple times, first to log its content, then to pass it down the stack. This ends up resulting in a range with moved-from elements being passed down and consequently write handlers being created with nullopt mutations. Make the range re-entrant by materializing it into a vector before passing it to `mutate_internal()`. Fixes: scylladb/scylladb#21907 Fixes: scylladb/scylladb#21714 Closes scylladb/scylladb#21910 --- service/storage_proxy.cc | 8 ++++++- test/topology_custom/test_read_repair.py | 29 ++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 555b0a019d..1cf867dd40 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4401,7 +4401,13 @@ future> storage_proxy::schedule_repair(locator::effective_replication_m if (diffs.empty()) { return make_ready_future>(bo::success()); } - return mutate_internal(diffs | std::views::values | std::views::transform([ermp] (auto& v) { return read_repair_mutation{std::move(v), ermp}; }), cl, false, std::move(trace_state), std::move(permit)); + return mutate_internal( + diffs | + std::views::values | + std::views::transform([ermp] (auto& v) { return read_repair_mutation{std::move(v), ermp}; }) | + // The transform above is destructive, materialize into a vector to make the 
range re-iterable. + std::ranges::to>() + , cl, false, std::move(trace_state), std::move(permit)); } class abstract_read_resolver { diff --git a/test/topology_custom/test_read_repair.py b/test/topology_custom/test_read_repair.py index 2211cd7f85..c7d992075a 100644 --- a/test/topology_custom/test_read_repair.py +++ b/test/topology_custom/test_read_repair.py @@ -328,3 +328,32 @@ async def test_incremental_read_repair(data_class, workdir, manager): logger.info("Check rows with CL=ONE after read-repair") check_rows(cql, host1, all_rows) check_rows(cql, host2, all_rows) + + +@pytest.mark.asyncio +async def test_read_repair_with_trace_logging(request, manager): + logger.info("Creating a new cluster") + cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace"] + + for i in range(2): + await manager.server_add(cmdline=cmdline) + + cql = manager.get_cql() + srvs = await manager.running_servers() + await wait_for_cql_and_get_hosts(cql, srvs, time.time() + 60) + + await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};") + await cql.run_async("CREATE TABLE ks.t (pk bigint PRIMARY KEY, c int);") + + await cql.run_async("INSERT INTO ks.t (pk, c) VALUES (0, 0)") + + await manager.server_stop(srvs[0].server_id) + prepared = cql.prepare("INSERT INTO ks.t (pk, c) VALUES (0, 1)") + prepared.consistency_level = ConsistencyLevel.ONE + await cql.run_async(prepared) + + await manager.server_start(srvs[0].server_id) + + prepared = cql.prepare("SELECT * FROM ks.t WHERE pk = 0") + prepared.consistency_level = ConsistencyLevel.ALL + await cql.run_async(prepared)