streaming: Fail stream plan on stream_mutation_fragments handler in case of error

The following is observed in pytest:

1) node1, stream master, tried to pull data from node3

2) node3, stream follower, found node1 restarted

3) node3 killed the rpc stream

4) node1 did not get the stream session failure message from node3. This
failure message was supposed to kill the stream plan on node1. That's the
reason node1 failed the stream session much later at "2024-08-19 21:07:45,539".
Note, node3 failed the stream on its side, so it should have sent the stream
session failure message.

```
$ cat node1.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc
INFO  2024-08-19 20:24:01,162 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers={127.0.34.3}, master
ERROR 2024-08-19 20:24:01,190 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to handle STREAM_MUTATION_FRAGMENTS (receive and distribute phase) for ks=ks, cf=cf, peer=127.0.34.3: seastar::nested_exception: seastar::rpc::stream_closed (rpc stream was closed by peer) (while cleaning up after seastar::rpc::stream_closed (rpc stream was closed by peer))
WARN  2024-08-19 21:07:45,539 [shard 0:main] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.3}, tx=0 KiB, 0.00 KiB/s, rx=484 KiB, 0.18 KiB/s

$ cat node3.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc
INFO  2024-08-19 20:24:01,163 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers=127.0.34.1, slave
INFO  2024-08-19 20:24:01,164 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Start sending ks=ks, cf=cf, estimated_partitions=2560, with new rpc streaming
WARN  2024-08-19 20:24:01,187 [shard 0: gms] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.1}, tx=633 KiB, 26506.81 KiB/s, rx=0 KiB, 0.00 KiB/s
WARN  2024-08-19 20:24:01,188 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] stream_transfer_task: Fail to send to 127.0.34.1:0: seastar::rpc::stream_closed (rpc stream was closed by peer)
WARN  2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to send: seastar::rpc::stream_closed (rpc stream was closed by peer)
WARN  2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming error occurred, peer=127.0.34.1
```

To be safe in case the stream fail message is not received, node1 could fail
the stream plan as soon as the rpc stream is aborted in the
stream_mutation_fragments handler.

Fixes #20227

Closes scylladb/scylladb#21960
This commit is contained in:
Asias He
2024-12-18 08:12:17 +08:00
committed by Tomasz Grabiec
parent cf72c31617
commit 6f04de3efd
6 changed files with 143 additions and 4 deletions

View File

@@ -16,6 +16,7 @@
#include "streaming/stream_session_state.hh"
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "db/config.hh"
namespace streaming {
@@ -312,6 +313,20 @@ bool stream_manager::has_peer(inet_address endpoint) const {
return false;
}
future<> stream_manager::fail_stream_plan(streaming::plan_id plan_id) {
return container().invoke_on_all([plan_id] (auto& sm) -> future<> {
for (auto sr : sm.get_all_streams()) {
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
co_await seastar::coroutine::maybe_yield();
if (session->plan_id() == plan_id) {
sslog.info("stream_manager: Failed stream_session for stream_plan plan_id={}", plan_id);
session->close_session(stream_session_state::FAILED);
}
}
}
});
}
void stream_manager::fail_sessions(inet_address endpoint) {
for (auto sr : get_all_streams()) {
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {

View File

@@ -194,6 +194,8 @@ public:
uint32_t throughput_mbs() const noexcept {
return _io_throughput_mbs.get();
}
future<> fail_stream_plan(streaming::plan_id plan_id);
};
} // namespace streaming

View File

@@ -11,6 +11,7 @@
#include "streaming/stream_plan.hh"
#include "streaming/stream_result_future.hh"
#include "streaming/stream_state.hh"
#include <seastar/coroutine/all.hh>
namespace streaming {

View File

@@ -185,6 +185,10 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
sslog.info("stream_mutation_fragments: released");
}).then([mf = std::move(mf)] () mutable {
if (utils::get_local_injector().is_enabled("stream_mutation_fragments_rx_error")) {
sslog.info("stream_mutation_fragments_rx_error: throw");
throw std::runtime_error("stream_mutation_fragments_rx_error");
}
return mutation_fragment_opt(std::move(mf));
});
} else {
@@ -214,7 +218,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer(estimated_partitions, reason, topo_guard),
std::move(op)
).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable {
).then_wrapped([this, s, plan_id_ = plan_id, from, sink_ = sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable -> future<> {
auto sink = sink_;
auto plan_id = plan_id_;
int32_t status = 0;
uint64_t received_partitions = 0;
if (f.failed()) {
@@ -238,7 +244,16 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
sslog.info("[Stream #{}] Write to sstable for ks={}, cf={}, estimated_partitions={}, received_partitions={}",
plan_id, s->ks_name(), s->cf_name(), estimated_partitions, received_partitions);
}
return sink(status).finally([sink] () mutable {
if (status == -1) {
try {
if (!utils::get_local_injector().is_enabled("stream_mutation_fragments_skip_fail_stream_plan")) {
co_await fail_stream_plan(plan_id);
}
} catch (...) {
sslog.warn("[Stream #{}] Failed to abort the stream plan: {}", plan_id, std::current_exception());
}
}
co_await sink(status).finally([sink] () mutable {
return sink.close();
});
}).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) {
@@ -348,6 +363,10 @@ future<> stream_session::on_initialization_complete() {
}
void stream_session::received_failed_complete_message() {
if (utils::get_local_injector().is_enabled("stream_session_ignore_failed_message")) {
sslog.info("[Stream #{}] Ignored failed complete message, peer={}", plan_id(), peer);
return;
}
sslog.info("[Stream #{}] Received failed complete message, peer={}", plan_id(), peer);
_received_failed_complete_message = true;
close_session(stream_session_state::FAILED);

View File

@@ -241,8 +241,8 @@ class ScyllaRESTAPIClient():
"""
return await self.client.get_json(f"/v2/error_injection/injection/{injection}", host=node_ip)
async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None:
await self.client.post(f"/storage_service/tablets/move", host=node_ip, params={
async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int, timeout: Optional[float] = None) -> None:
await self.client.post(f"/storage_service/tablets/move", host=node_ip, timeout=timeout, params={
"ks": ks,
"table": table,
"src_host": str(src_host),

View File

@@ -243,6 +243,108 @@ async def test_topology_changes(manager: ManagerClient):
await cql.run_async("DROP KEYSPACE test;")
async def get_two_servers_to_move_tablet(manager: ManagerClient):
"""
The first server in servers list is source node to move the tablet from. The second server is the dest node.
"""
logger.info("Bootstrapping cluster")
cmdline = ['--enable-file-stream', 'false']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
servers.append(await manager.server_add(cmdline=cmdline))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
logger.info(f'{replica=}')
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
# Make sure servers[0] is the source node to move the tablet from
if replica[0] != s0_host_id:
servers.reverse()
s0_host_id, s1_host_id = s1_host_id, s0_host_id
dst_shard = 0
return (servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_with_fail_stream_plan(manager: ManagerClient):
servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard = await get_two_servers_to_move_tablet(manager)
await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=30))
await s1_log.wait_for('stream_manager: Failed stream_session for stream_plan', from_mark=s1_mark)
s1_mark = await s1_log.mark()
await manager.api.disable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error")
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
# Sanity test
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
await cql.run_async("TRUNCATE test.test")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0
# Verify that there is no data resurrection
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0
# Verify that moving the tablet back works
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 0
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_no_fail_stream_plan_hang(manager: ManagerClient):
servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard = await get_two_servers_to_move_tablet(manager)
await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=10))
try:
logger.info("Waiting for migration to finish")
await migration_task
assert False # The move tablet is not supposed to finish
except TimeoutError:
logger.info("Migration timeout as expected")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')