service: strong_consistency: Abort state_machine::apply when aborting server

The state machine used by strongly consistent tablets may be unable to
resolve pending mutations if the local schema is out of date [1]. To
deal with that, it performs a read barrier, which may block for a long
time.

When a strongly consistent tablet is being removed, we'd like to cancel
all ongoing executions of `state_machine::apply`: the shard is no
longer responsible for the tablet, so it doesn't matter what the outcome
is.

---

In the implementation, we abort the operations by simply throwing
an exception from `state_machine::apply` and not doing anything.
That's a red flag considering that it may lead to the instance
being killed on the spot [2].

Fortunately for us, strongly consistent tables use the default Raft
server implementation, i.e. `raft::server_impl`, which actually
handles one type of exception thrown by the method: namely,
`abort_requested_exception`, which is the default exception thrown
by `seastar::abort_source` [3]. We leverage this property.
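
As an illustration only, the property we rely on can be modeled with a
toy applier loop. This is a Python sketch, not the code in
`raft/server.cc`: `AbortRequested` and `applier_fiber` below are
hypothetical stand-ins, and the assumption that every other exception
simply propagates is a simplification.

```python
import asyncio


class AbortRequested(Exception):
    """Hypothetical stand-in for abort_requested_exception."""


async def applier_fiber(apply, batches):
    """Toy applier loop: stop quietly on abort, propagate anything else."""
    applied = 0
    try:
        for batch in batches:
            await apply(batch)
            applied += 1
    except AbortRequested:
        # An abort is expected while the group is being removed,
        # so the fiber ends without reporting an error.
        return ("stopped", applied)
    return ("finished", applied)


async def demo():
    async def aborting_apply(batch):
        if batch == "blocked":
            raise AbortRequested()

    async def broken_apply(batch):
        raise RuntimeError("state machine failure")

    clean = await applier_fiber(aborting_apply, ["a", "blocked", "c"])
    try:
        await applier_fiber(broken_apply, ["a"])
        fatal = False
    except RuntimeError:
        # Any other exception escapes the fiber (assumed to be fatal).
        fatal = True
    return clean, fatal


result = asyncio.run(demo())
```

In this model only the abort exception lets the loop end cleanly, which
is why `state_machine::apply` rethrows it unchanged instead of wrapping
it in another exception type.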

---

Unfortunately, `raft::server_impl::abort` isn't perfectly suited for
us. If we look into its code, we'll see that the relevant portion of
the procedure boils down to three steps:

1. Prevent new entries from being added.
2. Wait for the applier fiber.
3. Abort the state machine.

Since aborting the state machine happens only after the applier fiber
has already finished, there will no longer be anything to abort. Either
all executions of `state_machine::apply` have already finished, or they
are hanging and we cannot do anything.

That's a pre-existing problem that we won't be solving here (even
though it's possible). We hope the problem will be solved, and it seems
likely: the code suggests that the behavior is not intended. For more
details, see e.g. [4].
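
The ordering issue described in this section can be modeled with a
small Python sketch. It is a toy under stated assumptions, not the
logic of `raft::server_impl::abort`: `apply` and `abort_server` are
hypothetical names, the event stands in for the abort source, and the
timeout exists only to make the hang observable.

```python
import asyncio


async def apply(abort_event: asyncio.Event) -> str:
    # Models apply() stuck on a read barrier until an abort is requested.
    await abort_event.wait()
    return "aborted"


async def abort_server(abort_state_machine_first: bool,
                       timeout: float = 0.05) -> str:
    abort_event = asyncio.Event()
    applier = asyncio.create_task(apply(abort_event))
    await asyncio.sleep(0)  # let the applier start and block

    if abort_state_machine_first:
        # Hypothetical reordering: abort the state machine before waiting.
        abort_event.set()
    try:
        # Step 2: wait for the applier fiber. shield() keeps the timeout
        # from cancelling the applier task itself.
        outcome = await asyncio.wait_for(asyncio.shield(applier), timeout)
    except asyncio.TimeoutError:
        outcome = "hung"
    # Step 3: abort the state machine (too late in the current order).
    abort_event.set()
    await applier  # clean up the toy task
    return outcome


async def main():
    current = await abort_server(abort_state_machine_first=False)
    reordered = await abort_server(abort_state_machine_first=True)
    return current, reordered


results = asyncio.run(main())
```

With the current order the wait in step 2 never completes on its own in
this model, while signaling the abort first lets the blocked `apply`
return right away.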

---

We provide two validation tests. They simulate aborting
`state_machine::apply` in two different scenarios:

* when the table is dropped (which should also cover the case of tablet
  migration),
* when the node is shutting down.

The value of the tests isn't high since they don't ensure that the
state of the group is still valid (though it should be), nor do they
perform any other check. Instead, we rely on the testing framework to
spot any anomalies or errors. That's probably the best we can do at
the moment.

Unfortunately, both tests are marked as skipped because of the current
limitations of `raft::server_impl::abort` described above and in [4].

References:
[1] 4c8dba1
[2] See the description of `raft::state_machine` in `raft/raft.hh`.
[3] See `server_impl::applier_fiber` in `raft/server.cc`.
[4] SCYLLADB-1056
Dawid Mędrek
2026-03-12 22:39:18 +01:00
parent ad8a263683
commit f0dfe29d88
2 changed files with 177 additions and 3 deletions


@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/abort_source.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/on_internal_error.hh>
#include "state_machine.hh"
@@ -34,6 +35,8 @@ class state_machine : public raft_state_machine {
service::migration_manager& _mm;
db::system_keyspace& _sys_ks;
abort_source _as;
public:
state_machine(locator::global_tablet_id tablet,
raft::group_id gid,
@@ -70,6 +73,28 @@ public:
// (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`).
// In this case, we should just ignore mutations without throwing an error.
logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table);
} catch (const abort_requested_exception& ex) {
// The exception can be thrown by get_schema_and_upgrade_mutations.
// It means that the Raft group is being removed.
//
// Technically, throwing an exception from a state machine
// may result in killing the corresponding Raft instance:
// cf. the description of raft::state_machine:
//
// "Any of the functions may return an error, but it will kill the
// raft instance that uses it. Depending on what state the failure
// leaves the state is the raft instance will either have to be recreated
// with the same state machine and rejoined the cluster with the same server_id
// or it new raft instance will have to be created with empty state machine and
// it will have to rejoin to the cluster with different server_id through
// configuration change."
//
// Fortunately, in strong consistency, we use the default Raft server
// implementation, which handles abort_requested_exception thrown by
// raft::state_machine::apply -- it will simply end the applier fiber.
logger.debug("apply(): execution for tablet {}, group_id={} aborted due to: {}",
_tablet, _group_id, ex);
throw;
}
catch (...) {
throw std::runtime_error(::format(
@@ -91,6 +116,8 @@ public:
}
future<> abort() override {
logger.debug("abort(): Aborting state machine for group {}", _group_id);
_as.request_abort();
return make_ready_future<>();
}
@@ -109,6 +136,10 @@ private:
bool barrier_executed = false;
auto get_schema = [&] (table_schema_version schema_version) -> future<std::pair<schema_ptr, column_mappings_cache::value_ptr>> {
if (utils::get_local_injector().enter("sc_state_machine_return_empty_schema")) {
co_return std::pair{nullptr, nullptr};
}
auto schema = local_schema_registry().get_or_null(schema_version);
if (schema) {
co_return std::pair{std::move(schema), nullptr};
@@ -147,8 +178,7 @@ private:
if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) {
utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group");
}
-// TODO: pass valid abort source
-co_await _mm.get_group0_barrier().trigger();
+co_await _mm.get_group0_barrier().trigger(false, &_as);
barrier_executed = true;
schema_cm = co_await get_schema(schema_version);
}
@@ -189,4 +219,4 @@ std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id
return std::make_unique<state_machine>(tablet, gid, db, mm, sys_ks);
}
};
};


@@ -1155,3 +1155,147 @@ async def test_abort_forwarded_write_upon_shutdown(manager: ManagerClient):
pass
await stop_fut
@pytest.mark.skip(reason="SCYLLADB-1056")
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_abort_state_machine_apply_after_dropping_table(manager: ManagerClient):
"""
This test verifies that ongoing executions of state_machine::apply are
aborted when their corresponding Raft group is being removed. We test
that by dropping the table, but it should also correspond to other cases
like tablet migration.
For a similar scenario during a node shutdown, see test_abort_state_machine_apply_during_shutdown.
"""
cmdline = DEFAULT_CMDLINE + ["--logger-log-level", "sc_state_machine=debug:raft=debug"]
leader_server = await manager.server_add(config=DEFAULT_CONFIG, cmdline=cmdline,
property_file={"dc": "dc1", "rack": "rack1"})
# We want to prevent target_server from becoming the leader of either group0
# or the strongly consistent Raft group so it might not have the latest
# schema version and is forced to perform a read barrier.
config = DEFAULT_CONFIG | {"error_injections_at_startup": ["avoid_being_raft_leader"]}
target_server = await manager.server_add(config=config, cmdline=cmdline, property_file={"dc": "dc1", "rack": "rack2"})
cql, [leader_host, _] = await manager.get_ready_cql([leader_server, target_server])
leader_host_id, target_host_id = await gather_safely(*[
manager.get_host_id(leader_server.server_id),
manager.get_host_id(target_server.server_id)
])
wait_before_apply_injection = "strong_consistency_state_machine_wait_before_apply"
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1} AND consistency = 'global'") as ks:
table_name = "my_table"
table = f"{ks}.{table_name}"
await cql.run_async(f"CREATE TABLE {table} (pk int PRIMARY KEY, v int)")
group_id = await get_table_raft_group_id(manager, ks, table_name)
leader_host_id = await wait_for_leader(manager, leader_server, group_id)
assert leader_host_id != target_host_id
await gather_safely(*[
manager.api.enable_injection(target_server.ip_addr, wait_before_apply_injection, one_shot=True),
manager.api.enable_injection(target_server.ip_addr, "sc_state_machine_return_empty_schema", one_shot=True)
])
log = await manager.server_open_log(target_server.server_id)
mark = await log.mark()
# We won't wait for the follower to apply the state, so we can await this right away.
await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (0, 13)", host=leader_host)
await log.wait_for(wait_before_apply_injection, from_mark=mark)
mark = await log.mark()
await cql.run_async(f"DROP TABLE {table}")
# Wait until the Raft group has started being removed.
await log.wait_for(rf"schedule_raft_group_deletion\(\): starting aborting raft server for group id {group_id}", from_mark=mark)
mark = await log.mark()
# At this point, the Raft server should already be getting aborted,
# so we can resume state_machine::apply.
await manager.api.message_injection(target_server.ip_addr, wait_before_apply_injection)
# Verify that state_machine::apply was really aborted.
await log.wait_for(rf"apply\(\): execution for tablet \S+, group_id={group_id} aborted", from_mark=mark)
@pytest.mark.asyncio
@pytest.mark.skip(reason="SCYLLADB-1056")
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_abort_state_machine_apply_during_shutdown(manager: ManagerClient):
"""
This test verifies that ongoing executions of state_machine::apply are
aborted when a node is shutting down.
For a similar scenario after dropping a table, see test_abort_state_machine_apply_after_dropping_table.
"""
cmdline = DEFAULT_CMDLINE + ["--logger-log-level", "sc_state_machine=debug:raft=debug"]
leader_server = await manager.server_add(config=DEFAULT_CONFIG, cmdline=cmdline,
property_file={"dc": "dc1", "rack": "rack1"})
# We want to prevent target_server from becoming the leader of either group0
# or the strongly consistent Raft group so it might not have the latest
# schema version and is forced to perform a read barrier.
config = DEFAULT_CONFIG | {"error_injections_at_startup": ["avoid_being_raft_leader"]}
target_server = await manager.server_add(config=config, cmdline=cmdline, property_file={"dc": "dc1", "rack": "rack2"})
cql, [leader_host, target_host] = await manager.get_ready_cql([leader_server, target_server])
leader_host_id, target_host_id = await gather_safely(*[
manager.get_host_id(leader_server.server_id),
manager.get_host_id(target_server.server_id)
])
wait_before_apply_injection = "strong_consistency_state_machine_wait_before_apply"
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1} AND consistency = 'global'") as ks:
table_name = "my_table"
table = f"{ks}.{table_name}"
await cql.run_async(f"CREATE TABLE {table} (pk int PRIMARY KEY, v int)")
group_id = await get_table_raft_group_id(manager, ks, table_name)
leader_host_id = await wait_for_leader(manager, leader_server, group_id)
assert leader_host_id != target_host_id
await gather_safely(*[
manager.api.enable_injection(target_server.ip_addr, wait_before_apply_injection, one_shot=True),
manager.api.enable_injection(target_server.ip_addr, "sc_state_machine_return_empty_schema", one_shot=True)
])
log = await manager.server_open_log(target_server.server_id)
mark = await log.mark()
# We won't wait for the follower to apply the state, so we can await this right away.
await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (0, 13)", host=leader_host)
await log.wait_for(wait_before_apply_injection, from_mark=mark)
mark = await log.mark()
stop_task = asyncio.create_task(manager.server_stop_gracefully(target_server.server_id))
# Wait until the Raft group has started being removed.
await log.wait_for(rf"schedule_raft_group_deletion\(\): starting aborting raft server for group id {group_id}", from_mark=mark)
mark = await log.mark()
# At this point, the Raft server should already be getting aborted,
# so we can resume state_machine::apply.
await manager.api.message_injection(target_server.ip_addr, wait_before_apply_injection)
# Verify that state_machine::apply was really aborted.
await log.wait_for(rf"apply\(\): execution for tablet \S+, group_id={group_id} aborted", from_mark=mark)
# The test framework should verify that we haven't observed any errors
# during the stopping procedure.
await stop_task
await manager.server_start(target_server.server_id)
cql, [target_host] = await manager.get_ready_cql([target_server])
rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 0", host=target_host)
assert len(rows) == 1
assert rows[0].v == 13