Merge 'Demote log level on split failure during shutdown' from Raphael Raph Carvalho
Since commit 509f2af8db, gate_closed_exception can be triggered for ongoing split during shutdown. The commit is correct, but it causes split failure on shutdown to log an error, which causes CI instability. Previously, aborted_exception would be triggered instead which is logged as warning. Let's do the same.
Fixes https://scylladb.atlassian.net/browse/SCYLLADB-951.
Fixes https://github.com/scylladb/scylladb/issues/24850.
Only 2026.1 is affected.
Closes scylladb/scylladb#29032
* github.com:scylladb/scylladb:
replica: Demote log level on split failure during shutdown
service: Demote log level on split failure during shutdown
This commit is contained in:
@@ -1268,9 +1268,15 @@ future<> compaction_manager::start(const db::config& cfg, utils::disk_space_moni
|
||||
if (dsm && (this_shard_id() == 0)) {
|
||||
_out_of_space_subscription = dsm->subscribe(cfg.critical_disk_utilization_level, [this] (auto threshold_reached) {
|
||||
if (threshold_reached) {
|
||||
return container().invoke_on_all([] (compaction_manager& cm) { return cm.drain(); });
|
||||
return container().invoke_on_all([] (compaction_manager& cm) {
|
||||
cm._in_critical_disk_utilization_mode = true;
|
||||
return cm.drain();
|
||||
});
|
||||
}
|
||||
return container().invoke_on_all([] (compaction_manager& cm) { cm.enable(); });
|
||||
return container().invoke_on_all([] (compaction_manager& cm) {
|
||||
cm._in_critical_disk_utilization_mode = false;
|
||||
cm.enable();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2348,6 +2354,16 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
|
||||
return perform_task_on_all_files<split_compaction_task_executor>("split", info, t, std::move(options), std::move(owned_ranges_ptr), std::move(get_sstables), throw_if_stopping::no);
|
||||
}
|
||||
|
||||
std::exception_ptr compaction_manager::make_disabled_exception(compaction::compaction_group_view& cg) {
|
||||
std::exception_ptr ex;
|
||||
if (_in_critical_disk_utilization_mode) {
|
||||
ex = std::make_exception_ptr(std::runtime_error("critical disk utilization"));
|
||||
} else {
|
||||
ex = std::make_exception_ptr(compaction_stopped_exception(cg.schema()->ks_name(), cg.schema()->cf_name(), "compaction disabled"));
|
||||
}
|
||||
return ex;
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
|
||||
@@ -2357,8 +2373,7 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
|
||||
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
|
||||
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
|
||||
if (is_disabled()) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
|
||||
"reason might be out of space prevention", sst->get_filename()))));
|
||||
co_return coroutine::exception(make_disabled_exception(t));
|
||||
}
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
|
||||
@@ -115,6 +115,8 @@ private:
|
||||
uint32_t _disabled_state_count = 0;
|
||||
|
||||
bool is_disabled() const { return _state != state::running || _disabled_state_count > 0; }
|
||||
// precondition: is_disabled() is true.
|
||||
std::exception_ptr make_disabled_exception(compaction::compaction_group_view& cg);
|
||||
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
@@ -170,6 +172,7 @@ private:
|
||||
shared_tombstone_gc_state _shared_tombstone_gc_state;
|
||||
|
||||
utils::disk_space_monitor::subscription _out_of_space_subscription;
|
||||
bool _in_critical_disk_utilization_mode = false;
|
||||
private:
|
||||
// Requires task->_compaction_state.gate to be held and task to be registered in _tasks.
|
||||
future<compaction_stats_opt> perform_task(shared_ptr<compaction::compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
|
||||
|
||||
@@ -1465,6 +1465,7 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
sstables::offstrategy offstrategy) {
|
||||
std::vector<sstables::shared_sstable> ret, ssts;
|
||||
std::exception_ptr ex;
|
||||
log_level failure_log_level = log_level::error;
|
||||
try {
|
||||
bool trigger_compaction = offstrategy == sstables::offstrategy::no;
|
||||
auto& cg = compaction_group_for_sstable(new_sst);
|
||||
@@ -1486,6 +1487,9 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
co_await do_add_sstable_and_update_cache(cg, sst, offstrategy, trigger_compaction);
|
||||
sst = nullptr;
|
||||
}
|
||||
} catch (compaction::compaction_stopped_exception&) {
|
||||
failure_log_level = log_level::warn;
|
||||
ex = std::current_exception();
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
@@ -1493,13 +1497,13 @@ table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
if (ex) {
|
||||
// on failed split, input sstable is unlinked here.
|
||||
if (new_sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
|
||||
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
|
||||
co_await new_sst->unlink();
|
||||
}
|
||||
// on failure after successful split, sstables not attached yet will be unlinked
|
||||
co_await coroutine::parallel_for_each(ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
|
||||
co_await coroutine::parallel_for_each(ssts, [&ex, failure_log_level] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
co_await sst->unlink();
|
||||
}
|
||||
});
|
||||
@@ -1513,6 +1517,7 @@ table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> n
|
||||
std::function<future<>(sstables::shared_sstable)> on_add) {
|
||||
std::exception_ptr ex;
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
log_level failure_log_level = log_level::error;
|
||||
|
||||
// We rely on add_new_sstable_and_update_cache() to unlink the sstable fed into it,
|
||||
// so the exception handling below will only have to unlink sstables not processed yet.
|
||||
@@ -1522,14 +1527,17 @@ table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> n
|
||||
std::ranges::move(ssts, std::back_inserter(ret));
|
||||
|
||||
}
|
||||
} catch (compaction::compaction_stopped_exception&) {
|
||||
failure_log_level = log_level::warn;
|
||||
ex = std::current_exception();
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
if (ex) {
|
||||
co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
|
||||
co_await coroutine::parallel_for_each(new_ssts, [&ex, failure_log_level] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
tlogger.log(failure_log_level, "Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
co_await sst->unlink();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -3026,6 +3026,8 @@ future<> storage_service::drain() {
|
||||
}
|
||||
|
||||
future<> storage_service::do_drain() {
|
||||
co_await utils::get_local_injector().inject("storage_service_drain_wait", utils::wait_for_message(60s));
|
||||
|
||||
// Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found.
|
||||
co_await stop_transport();
|
||||
|
||||
@@ -4016,6 +4018,9 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
|
||||
} catch (raft::request_aborted& ex) {
|
||||
slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
|
||||
break;
|
||||
} catch (seastar::gate_closed_exception& ex) {
|
||||
slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
|
||||
break;
|
||||
} catch (...) {
|
||||
slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds",
|
||||
table, std::current_exception(), split_retry.sleep_time());
|
||||
|
||||
@@ -542,7 +542,7 @@ async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes
|
||||
|
||||
# Expect repair to fail when splitting new sstables
|
||||
await log.wait_for("Repair for tablet migration of .* failed", from_mark=mark)
|
||||
await log.wait_for("Cannot split .* because manager has compaction disabled", from_mark=mark)
|
||||
await log.wait_for("Failed to load SSTable.*\(critical disk utilization\)", from_mark=mark)
|
||||
|
||||
assert await log.grep(f"compaction.*Split {cf}", from_mark=mark) == []
|
||||
|
||||
|
||||
@@ -2226,4 +2226,75 @@ async def test_split_and_intranode_synchronization(manager: ManagerClient):
|
||||
tablet_count = await get_tablet_count(manager, server, ks, 'test')
|
||||
return tablet_count >= expected_tablet_count or None
|
||||
# Give enough time for split to happen in debug mode
|
||||
await wait_for(finished_splitting, time.time() + 120)
|
||||
await wait_for(finished_splitting, time.time() + 120)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_split_stopped_on_shutdown(manager: ManagerClient):
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'debug_error_injection=debug',
|
||||
'--smp', '1',
|
||||
]
|
||||
server = await manager.server_add(cmdline=cmdline, config=cfg)
|
||||
|
||||
logger.info(f'server_id = {server.server_id}')
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
initial_tablets = 2
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
|
||||
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
|
||||
|
||||
# insert data
|
||||
pks = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
|
||||
|
||||
# flush the table
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
|
||||
# force split on the test table
|
||||
expected_tablet_count = 4
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
|
||||
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
log_mark = await log.mark()
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
|
||||
await manager.api.enable_injection(server.ip_addr, "storage_service_drain_wait", one_shot=True)
|
||||
await manager.enable_tablet_balancing()
|
||||
|
||||
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
|
||||
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
|
||||
|
||||
log_mark = await log.mark()
|
||||
|
||||
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
|
||||
|
||||
await log.wait_for('Stopping.*ongoing compactions')
|
||||
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
|
||||
|
||||
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
|
||||
await log.wait_for('Failed to complete splitting of table', from_mark=log_mark)
|
||||
|
||||
await manager.api.message_injection(server.ip_addr, "storage_service_drain_wait")
|
||||
|
||||
await shutdown_task
|
||||
|
||||
errors = await log.grep_for_errors(from_mark=log_mark)
|
||||
assert errors == []
|
||||
|
||||
await manager.server_start(server.server_id)
|
||||
await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
|
||||
|
||||
await log.wait_for('Detected tablet split for table', from_mark=log_mark)
|
||||
tablet_count = await get_tablet_count(manager, server, ks, 'test')
|
||||
assert tablet_count >= expected_tablet_count
|
||||
|
||||
Reference in New Issue
Block a user