replica: Fix use-after-free with concurrent schema change and sstable set update

When schema is changed, sstable set is updated according to the compaction
strategy of the new schema (no changes to set are actually made, just
the underlying set type is updated), but the problem is that it happens
without a lock, causing a use-after-free when running concurrently to
another set update.

Example:

1) A: sstable set is being updated on compaction completion
2) B: schema change updates the set (it's non deferring, so it
happens in one go) and frees the set used by A.
3) when A resumes, system will likely crash since the set is freed
already.

ASAN screams about it:
SUMMARY: AddressSanitizer: heap-use-after-free sstables/sstable_set.cc ...

Fix is about deferring update of the set on schema change to compaction,
which is triggered after new schema is set. Only strategy state and
backlog tracker are updated immediately, which is fine since strategy
doesn't depend on any particular implementation of sstable set, since
patch "sstables: Implement sstable_set_impl::all_sstable_runs()".

Fixes #22040.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 434c2c4649)
This commit is contained in:
Raphael S. Carvalho
2025-04-09 15:06:23 -03:00
committed by GitHub Action
parent ddf9d047db
commit 82ca17e70d
2 changed files with 49 additions and 10 deletions

View File

@@ -1892,6 +1892,8 @@ table::sstable_list_builder::build_new_list(const sstables::sstable_set& current
const std::vector<sstables::shared_sstable>& old_sstables) {
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
// add sstables from the current list into the new list except the ones that are in the old list
std::vector<sstables::shared_sstable> removed_sstables;
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
@@ -2178,14 +2180,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
struct compaction_group_sstable_set_updater {
struct compaction_group_strategy_updater {
table& t;
compaction_group& cg;
compaction_backlog_tracker new_bt;
compaction::compaction_strategy_state new_cs_state;
lw_shared_ptr<sstables::sstable_set> new_sstables;
compaction_group_sstable_set_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
compaction_group_strategy_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
: t(t)
, cg(cg)
, new_bt(new_cs.make_backlog_tracker())
@@ -2196,26 +2197,26 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(cg.as_table_state()));
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables->insert(s);
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables_for_backlog_tracker.push_back(s);
});
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
}
void execute() noexcept {
// Update strategy state and backlog tracker according to new strategy. SSTable set update
// is delayed until new compaction, which is triggered on strategy change. SSTable set
// cannot be updated here since it must happen under the set update lock.
t._compaction_manager.register_backlog_tracker(cg.as_table_state(), std::move(new_bt));
cg.set_main_sstables(std::move(new_sstables));
cg.set_compaction_strategy_state(std::move(new_cs_state));
}
};
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
for_each_compaction_group([&] (compaction_group& cg) {
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
compaction_group_strategy_updater updater(*this, cg, new_cs);
updater.prepare(new_cs);
cg_sstable_set_updaters.push_back(std::move(updater));
});
@@ -2224,7 +2225,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
for (auto& updater : cg_sstable_set_updaters) {
updater.execute();
}
refresh_compound_sstable_set();
}
size_t table::sstables_count() const {

View File

@@ -12,6 +12,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, TabletReplicas
from test.cluster.conftest import skip_mode
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex
import pytest
import asyncio
@@ -1913,3 +1914,41 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
assert rows[0].count == 0, "Table should be empty after truncation"
# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
cmdline = ['--smp=2']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
table = f"{ks}.test"
await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")
stop_compaction = False
async def background_compaction():
while stop_compaction == False:
await manager.api.keyspace_compaction(servers[0].ip_addr, ks)
compaction_task = asyncio.create_task(background_compaction())
for i in range(5):
dotestCreateAndDropIndex(cql, table, "CamelCase", False)
dotestCreateAndDropIndex(cql, table, "CamelCase2", True)
stop_compaction = True
await compaction_task
async def force_minor_compaction():
for i in range(4):
cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
await force_minor_compaction()
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
await force_minor_compaction()