Merge 'strong_consistency: fix crash when DROP TABLE races with in-flight DML' from Petr Gusev

When DROP TABLE races with an in-flight DML on a strongly-consistent
table, the node aborts in `groups_manager::acquire_server()` because the
raft group has already been erased from `_raft_groups`.

A concurrent `DROP TABLE` may have already removed the table from database
registries and erased the raft group via `schedule_raft_group_deletion`.
The `schema.table()` in `create_operation_ctx()` might not fail though
because someone might be holding `lw_shared_ptr<table>`, so that the
table is dropped but the table object is still alive.

Fix by accepting table_id in acquire_server and checking that the table
still exists in the database via `find_column_family` before looking up
the raft group.  If the table has been dropped, find_column_family
throws no_such_column_family instead of the node aborting via
on_internal_error.  When the table does exist, acquire_server proceeds
to acquire state.gate; schedule_raft_group_deletion co_awaits
gate::close, so it will wait for the DML operation to complete before
erasing the group.

backport: not needed (not released feature)

Fixes SCYLLADB-1450

Closes scylladb/scylladb#29430

* github.com:scylladb/scylladb:
  strong_consistency: fix crash when DROP TABLE races with in-flight DML
  test: add regression test for DROP TABLE racing with in-flight DML
This commit is contained in:
Piotr Dulikowski
2026-04-21 16:54:20 +02:00
4 changed files with 73 additions and 3 deletions

View File

@@ -154,7 +154,7 @@ auto coordinator::create_operation_ctx(const schema& schema, const dht::token& t
co_await utils::get_local_injector().inject("sc_coordinator_wait_before_acquire_server",
utils::wait_for_message(5min));
auto raft_server = co_await _groups_manager.acquire_server(raft_info.group_id, as);
auto raft_server = co_await _groups_manager.acquire_server(schema.id(), raft_info.group_id, as);
co_return operation_ctx {
.erm = std::move(erm),

View File

@@ -332,11 +332,27 @@ void groups_manager::update(token_metadata_ptr new_tm) {
schedule_raft_groups_deletion(false);
}
future<raft_server> groups_manager::acquire_server(raft::group_id group_id, abort_source& as) {
future<raft_server> groups_manager::acquire_server(table_id table_id, raft::group_id group_id, abort_source& as) {
if (!_features.strongly_consistent_tables) {
on_internal_error(logger, "strongly consistent tables are not enabled on this shard");
}
// A concurrent DROP TABLE may have already removed the table from database
// registries and erased the raft group from _raft_groups via
// schedule_raft_group_deletion. The schema.table() in create_operation_ctx()
// might not fail though in this case because someone might be holding
// lw_shared_ptr<table>, so that the table is dropped but the table object
// is still alive.
//
// Check that the table still exists in the database to turn the
// fatal on_internal_error below into a clean no_such_column_family
// exception.
//
// When the table does exist, we proceed to acquire state.gate->hold().
// This prevents schedule_raft_group_deletion (which co_awaits gate::close)
// from erasing the group until the DML operation completes.
_db.find_column_family(table_id);
const auto it = _raft_groups.find(group_id);
if (it == _raft_groups.end()) {
on_internal_error(logger, format("raft group {} not found", group_id));

View File

@@ -110,7 +110,7 @@ public:
void update(locator::token_metadata_ptr new_tm);
// The raft_server instance is used to submit write commands and perform read_barrier() before reads.
future<raft_server> acquire_server(raft::group_id group_id, abort_source& as);
future<raft_server> acquire_server(table_id table_id, raft::group_id group_id, abort_source& as);
// Called during node boot. Waits for all raft::server instances corresponding
// to the latest group0 state to start.

View File

@@ -764,6 +764,60 @@ async def test_forward_cql_exception_passthrough(manager: ManagerClient):
await manager.api.message_injection(non_leader_replica_host.address, "wait_before_handling_forwarded_request")
@pytest.mark.asyncio
@pytest.mark.skip_mode("release", "error injections aren't enabled in release mode")
async def test_drop_table_during_insert(manager: ManagerClient):
"""Regression test for SCYLLADB-1450: node crashes when DROP TABLE races with
an in-flight DML on a strongly-consistent table.
An error injection pauses INSERT inside create_operation_ctx (after obtaining
the ERM but before acquire_server). While it is paused the table is dropped,
which erases the raft group. Resuming the INSERT used to hit on_internal_error
in acquire_server and abort the node."""
config = {"experimental_features": ["strongly-consistent-tables"]}
cmdline = ["--logger-log-level", "sc_groups_manager=debug"]
server = await manager.server_add(config=config, cmdline=cmdline)
(cql, hosts) = await manager.get_ready_cql([server])
async with new_test_keyspace(
manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
" AND tablets = {'initial': 1} AND consistency = 'global'",
) as ks:
table = f"{ks}.tbl"
await cql.run_async(f"CREATE TABLE {table} (pk int PRIMARY KEY, v int)")
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, "sc_coordinator_wait_before_acquire_server", one_shot=False)
# Fire INSERT in the background it will pause at the injection point.
insert_fut = cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (0, 0)")
# Wait until the injection is actually hit.
await log.wait_for("sc_coordinator_wait_before_acquire_server: waiting for message", from_mark=mark, timeout=30)
# Drop the table while INSERT is paused.
await cql.run_async(f"DROP TABLE {table}")
# Wait for the raft group to be destroyed.
await log.wait_for("schedule_raft_group_deletion.*raft server.*is destroyed", from_mark=mark, timeout=30)
# Resume the INSERT with the bug present, the node will abort here.
await manager.api.message_injection(server.ip_addr, "sc_coordinator_wait_before_acquire_server")
# The INSERT should fail gracefully, not crash the node.
try:
await insert_fut
except Exception as e:
logger.info(f"INSERT failed with expected error: {e}")
# Verify the node is still alive.
await manager.api.get_host_id(server.ip_addr)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_timed_out_queries(manager: ManagerClient):