From 6f04de3efd1e15d71ad2af06ecf951ccc469b68d Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 18 Dec 2024 08:12:17 +0800 Subject: [PATCH] streaming: Fail stream plan on stream_mutation_fragments handler in case of error The following is observed in pytest: 1) node1, stream master, tried to pull data from node3 2) node3, stream follower, found node1 restarted 3) node3 killed the rpc stream 4) node1 did not get the stream session failure message from node3. This failure message was supposed to kill the stream plan on node1. That's the reason node1 failed the stream session much later at "2024-08-19 21:07:45,539". Note, node3 failed the stream on its side, so it should have sent the stream session failure message. ``` $ cat node1.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc INFO 2024-08-19 20:24:01,162 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers={127.0.34.3}, master ERROR 2024-08-19 20:24:01,190 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to handle STREAM_MUTATION_FRAGMENTS (receive and distribute phase) for ks=ks, cf=cf, peer=127.0.34.3: seastar::nested_exception: seastar::rpc::stream_closed (rpc stream was closed by peer) (while cleaning up after seastar::rpc::stream_closed (rpc stream was closed by peer)) WARN 2024-08-19 21:07:45,539 [shard 0:main] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.3}, tx=0 KiB, 0.00 KiB/s, rx=484 KiB, 0.18 KiB/s $ cat node3.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc INFO 2024-08-19 20:24:01,163 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers=127.0.34.1, slave INFO 2024-08-19 20:24:01,164 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Start sending ks=ks, cf=cf, 
estimated_partitions=2560, with new rpc streaming WARN 2024-08-19 20:24:01,187 [shard 0: gms] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.1}, tx=633 KiB, 26506.81 KiB/s, rx=0 KiB, 0.00 KiB/s WARN 2024-08-19 20:24:01,188 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] stream_transfer_task: Fail to send to 127.0.34.1:0: seastar::rpc::stream_closed (rpc stream was closed by peer) WARN 2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to send: seastar::rpc::stream_closed (rpc stream was closed by peer) WARN 2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming error occurred, peer=127.0.34.1 ``` To be safe in case the stream session failure message is not received, node1 now fails the stream plan itself as soon as the rpc stream is aborted in the stream_mutation_fragments handler. 
Fixes #20227 Closes scylladb/scylladb#21960 --- streaming/stream_manager.cc | 15 ++++ streaming/stream_manager.hh | 2 + streaming/stream_plan.cc | 1 + streaming/stream_session.cc | 23 +++++- test/pylib/rest_client.py | 4 +- test/topology_custom/test_tablets2.py | 102 ++++++++++++++++++++++++++ 6 files changed, 143 insertions(+), 4 deletions(-) diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index 23730a7947..617b7e03e0 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -16,6 +16,7 @@ #include "streaming/stream_session_state.hh" #include #include +#include #include "db/config.hh" namespace streaming { @@ -312,6 +313,20 @@ bool stream_manager::has_peer(inet_address endpoint) const { return false; } +future<> stream_manager::fail_stream_plan(streaming::plan_id plan_id) { + return container().invoke_on_all([plan_id] (auto& sm) -> future<> { + for (auto sr : sm.get_all_streams()) { + for (auto session : sr->get_coordinator()->get_all_stream_sessions()) { + co_await seastar::coroutine::maybe_yield(); + if (session->plan_id() == plan_id) { + sslog.info("stream_manager: Failed stream_session for stream_plan plan_id={}", plan_id); + session->close_session(stream_session_state::FAILED); + } + } + } + }); +} + void stream_manager::fail_sessions(inet_address endpoint) { for (auto sr : get_all_streams()) { for (auto session : sr->get_coordinator()->get_all_stream_sessions()) { diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index 58034fd6e8..c333254bac 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -194,6 +194,8 @@ public: uint32_t throughput_mbs() const noexcept { return _io_throughput_mbs.get(); } + + future<> fail_stream_plan(streaming::plan_id plan_id); }; } // namespace streaming diff --git a/streaming/stream_plan.cc b/streaming/stream_plan.cc index f197962c2c..b179458f2c 100644 --- a/streaming/stream_plan.cc +++ b/streaming/stream_plan.cc @@ -11,6 +11,7 @@ #include 
"streaming/stream_plan.hh" #include "streaming/stream_result_future.hh" #include "streaming/stream_state.hh" +#include namespace streaming { diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 16d0128edd..b7cd2a91ab 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -185,6 +185,10 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } sslog.info("stream_mutation_fragments: released"); }).then([mf = std::move(mf)] () mutable { + if (utils::get_local_injector().is_enabled("stream_mutation_fragments_rx_error")) { + sslog.info("stream_mutation_fragments_rx_error: throw"); + throw std::runtime_error("stream_mutation_fragments_rx_error"); + } return mutation_fragment_opt(std::move(mf)); }); } else { @@ -214,7 +218,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer(estimated_partitions, reason, topo_guard), std::move(op) - ).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future f) mutable { + ).then_wrapped([this, s, plan_id_ = plan_id, from, sink_ = sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future f) mutable -> future<> { + auto sink = sink_; + auto plan_id = plan_id_; int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { @@ -238,7 +244,16 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { sslog.info("[Stream #{}] Write to sstable for ks={}, cf={}, estimated_partitions={}, received_partitions={}", plan_id, s->ks_name(), s->cf_name(), estimated_partitions, received_partitions); } - return sink(status).finally([sink] () mutable { + if (status == -1) { + try { + if (!utils::get_local_injector().is_enabled("stream_mutation_fragments_skip_fail_stream_plan")) { + co_await fail_stream_plan(plan_id); + } + } catch (...) 
{ + sslog.warn("[Stream #{}] Failed to abort the stream plan: {}", plan_id, std::current_exception()); + } + } + co_await sink(status).finally([sink] () mutable { return sink.close(); }); }).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) { @@ -348,6 +363,10 @@ future<> stream_session::on_initialization_complete() { } void stream_session::received_failed_complete_message() { + if (utils::get_local_injector().is_enabled("stream_session_ignore_failed_message")) { + sslog.info("[Stream #{}] Ignored failed complete message, peer={}", plan_id(), peer); + return; + } sslog.info("[Stream #{}] Received failed complete message, peer={}", plan_id(), peer); _received_failed_complete_message = true; close_session(stream_session_state::FAILED); diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index ba4db0a47a..8fe314de5d 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -241,8 +241,8 @@ class ScyllaRESTAPIClient(): """ return await self.client.get_json(f"/v2/error_injection/injection/{injection}", host=node_ip) - async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None: - await self.client.post(f"/storage_service/tablets/move", host=node_ip, params={ + async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int, timeout: Optional[float] = None) -> None: + await self.client.post(f"/storage_service/tablets/move", host=node_ip, timeout=timeout, params={ "ks": ks, "table": table, "src_host": str(src_host), diff --git a/test/topology_custom/test_tablets2.py b/test/topology_custom/test_tablets2.py index fcfbbc2070..178f201d1c 100644 --- a/test/topology_custom/test_tablets2.py +++ b/test/topology_custom/test_tablets2.py @@ -243,6 +243,108 @@ async def test_topology_changes(manager: ManagerClient): await cql.run_async("DROP KEYSPACE test;") +async def 
get_two_servers_to_move_tablet(manager: ManagerClient): + """ + The first server in servers list is source node to move the tablet from. The second server is the dest node. + """ + logger.info("Bootstrapping cluster") + cmdline = ['--enable-file-stream', 'false'] + servers = [await manager.server_add(cmdline=cmdline)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + servers.append(await manager.server_add(cmdline=cmdline)) + + key = 7 # Whatever + tablet_token = 0 # Doesn't matter since there is one tablet + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)") + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 1 + + replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token) + logger.info(f'{replica=}') + + s0_host_id = await manager.get_host_id(servers[0].server_id) + s1_host_id = await manager.get_host_id(servers[1].server_id) + + # Make sure servers[0] is the source node to move the tablet from + if replica[0] != s0_host_id: + servers.reverse() + s0_host_id, s1_host_id = s1_host_id, s0_host_id + + dst_shard = 0 + + return (servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_streaming_rx_error_no_failed_message_with_fail_stream_plan(manager: ManagerClient): + servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard = await get_two_servers_to_move_tablet(manager) + + await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True) + await manager.api.enable_injection(servers[1].ip_addr, 
"stream_session_ignore_failed_message", one_shot=True) + await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True) + + s1_log = await manager.server_open_log(servers[1].server_id) + s1_mark = await s1_log.mark() + + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=30)) + + await s1_log.wait_for('stream_manager: Failed stream_session for stream_plan', from_mark=s1_mark) + s1_mark = await s1_log.mark() + + await manager.api.disable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error") + + logger.info("Waiting for migration to finish") + await migration_task + logger.info("Migration done") + + # Sanity test + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 1 + + await cql.run_async("TRUNCATE test.test") + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0 + + # Verify that there is no data resurrection + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0 + + # Verify that moving the tablet back works + await manager.api.move_tablet(servers[0].ip_addr, "test", "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token) + rows = await cql.run_async("SELECT pk from test.test") + assert len(list(rows)) == 0 + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_streaming_rx_error_no_failed_message_no_fail_stream_plan_hang(manager: ManagerClient): + servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard = await get_two_servers_to_move_tablet(manager) + + await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True) + await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True) + await 
manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True) + + await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True) + await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True) + + s1_log = await manager.server_open_log(servers[1].server_id) + s1_mark = await s1_log.mark() + + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=10)) + + try: + logger.info("Waiting for migration to finish") + await migration_task + assert False # The move tablet is not supposed to finish + except TimeoutError: + logger.info("Migration timeout as expected") @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode')