Merge '[Backport 2025.1] replica: Fix split compaction when tablet boundaries change' from Scylladb[bot]
Consider the following: 1) balancer emits split decision 2) split compaction starts 3) split decision is revoked 4) emits merge decision 5) completes merge, before compaction in step 2 finishes After last step, split compaction initiated in step 2 can fail because it works with the global tablet map, rather than the map when the compaction started. With the global state changing under its feet, on merge, the mutation splitting writer will think it's going backwards since sibling tablets are merged. This problem was also seen when running load-and-stream, where split initiated by the sstable writer failed, split completed, and the unsplit sstable is left in the table dir, causing problems in the restart. To fix this, let's make split compaction always work with the state when it started, not a global state. Fixes #24153. All 2025.* versions are vulnerable, so fix must be backported to them. - (cherry picked from commit0c1587473c) - (cherry picked from commit68f23d54d8) Parent PR: #25690 Closes scylladb/scylladb#25933 * github.com:scylladb/scylladb: replica: Fix split compaction when tablet boundaries change replica: Futurize split_compaction_options() test: fix flakiness of test_missing_data
This commit is contained in:
@@ -281,6 +281,15 @@ const tablet_map& tablet_metadata::get_tablet_map(table_id id) const {
|
||||
}
|
||||
}
|
||||
|
||||
future<tablet_metadata::tablet_map_ptr> tablet_metadata::get_tablet_map_ptr(table_id id) const {
|
||||
try {
|
||||
// lightweight since it only copies the shared ptr, not the map itself.
|
||||
co_return co_await _tablets.at(id).copy();
|
||||
} catch (const std::out_of_range&) {
|
||||
throw_with_backtrace<no_such_tablet_map>(id);
|
||||
}
|
||||
}
|
||||
|
||||
bool tablet_metadata::has_tablet_map(table_id id) const {
|
||||
return _tablets.contains(id);
|
||||
}
|
||||
|
||||
@@ -611,6 +611,8 @@ private:
|
||||
public:
|
||||
bool balancing_enabled() const { return _balancing_enabled; }
|
||||
const tablet_map& get_tablet_map(table_id id) const;
|
||||
// Gets shared ownership of tablet map
|
||||
future<tablet_map_ptr> get_tablet_map_ptr(table_id id) const;
|
||||
bool has_tablet_map(table_id id) const;
|
||||
const table_to_tablet_map& all_tables() const { return _tablets; }
|
||||
size_t external_memory_usage() const;
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
|
||||
#include "mutation_writer/feed_writers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
namespace mutation_writer {
|
||||
|
||||
@@ -44,6 +45,7 @@ private:
|
||||
auto wr = std::exchange(_current_writer, std::nullopt);
|
||||
co_await wr->close();
|
||||
allocate_new_writer_if_needed();
|
||||
co_await utils::get_local_injector().inject("splitting_mutation_writer_switch_wait", utils::wait_for_message(std::chrono::seconds(60)));
|
||||
}
|
||||
|
||||
// Called frequently, hence yields (and allocates)
|
||||
|
||||
@@ -771,7 +771,7 @@ private:
|
||||
return tablet_map().tablet_count();
|
||||
}
|
||||
|
||||
sstables::compaction_type_options::split split_compaction_options() const noexcept;
|
||||
future<sstables::compaction_type_options::split> split_compaction_options() const noexcept;
|
||||
|
||||
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
|
||||
// each tablet split into two, so this replica will remap all of its compaction groups
|
||||
@@ -1066,16 +1066,21 @@ bool table::all_storage_groups_split() {
|
||||
return _sg_manager->all_storage_groups_split();
|
||||
}
|
||||
|
||||
sstables::compaction_type_options::split tablet_storage_group_manager::split_compaction_options() const noexcept {
|
||||
return {[this](dht::token t) {
|
||||
future<sstables::compaction_type_options::split> tablet_storage_group_manager::split_compaction_options() const noexcept {
|
||||
// Split must work with a snapshot of tablet map, since it expects stability
|
||||
// throughout its execution.
|
||||
auto erm = _t.get_effective_replication_map();
|
||||
auto tablet_map_ptr = co_await erm->get_token_metadata().tablets().get_tablet_map_ptr(schema()->id());
|
||||
|
||||
co_return sstables::compaction_type_options::split([tablet_map_ptr = make_lw_shared(std::move(tablet_map_ptr))] (dht::token t) {
|
||||
// Classifies the input stream into either left or right side.
|
||||
auto [_, side] = storage_group_of(t);
|
||||
auto [_, side] = (*tablet_map_ptr)->get_tablet_id_and_range_side(t);
|
||||
return mutation_writer::token_group_id(side);
|
||||
}};
|
||||
});
|
||||
}
|
||||
|
||||
future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
|
||||
sstables::compaction_type_options::split opt = split_compaction_options();
|
||||
sstables::compaction_type_options::split opt = co_await split_compaction_options();
|
||||
|
||||
co_await utils::get_local_injector().inject("split_storage_groups_wait", [] (auto& handler) -> future<> {
|
||||
dblog.info("split_storage_groups_wait: waiting");
|
||||
@@ -1095,17 +1100,17 @@ future<> table::split_all_storage_groups(tasks::task_info tablet_split_task_info
|
||||
|
||||
future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t idx) {
|
||||
if (!tablet_map().needs_split()) {
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
tasks::task_info tablet_split_task_info{tasks::task_id{tablet_map().resize_task_info().tablet_task_id.uuid()}, 0};
|
||||
|
||||
auto& sg = _storage_groups[idx];
|
||||
auto sg = _storage_groups[idx];
|
||||
if (!sg) {
|
||||
on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard",
|
||||
idx, schema()->ks_name(), schema()->cf_name()));
|
||||
}
|
||||
|
||||
return sg->split(split_compaction_options(), tablet_split_task_info);
|
||||
co_return co_await sg->split(co_await split_compaction_options(), tablet_split_task_info);
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
@@ -1116,7 +1121,7 @@ tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable
|
||||
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
auto holder = cg.async_gate().hold();
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), split_compaction_options());
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), co_await split_compaction_options());
|
||||
}
|
||||
|
||||
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
||||
|
||||
@@ -1835,3 +1835,77 @@ async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
|
||||
await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)
|
||||
|
||||
await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot")
|
||||
|
||||
async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: str):
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, deadline = time.time() + 60)
|
||||
await asyncio.gather(*[cql.run_async("UPDATE system.config SET value=%s WHERE name=%s", [value, key], host=host) for host in hosts])
|
||||
|
||||
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/24153
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
|
||||
}
|
||||
cmdline = [
|
||||
'--target-tablet-size-in-bytes', '10000',
|
||||
'--logger-log-level', 'load_balancer=debug',
|
||||
'--logger-log-level', 'debug_error_injection=debug',
|
||||
'--smp', '1', # single cpu is needed to prevent intra-node migration which interacts badly with injection splitting_mutation_writer_switch_wait.
|
||||
]
|
||||
server = await manager.server_add(cmdline=cmdline, config=cfg)
|
||||
|
||||
logger.info(f'server_id = {server.server_id}')
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 2}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
|
||||
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
|
||||
|
||||
# insert data and force split
|
||||
total_keys = 100
|
||||
pks = range(total_keys)
|
||||
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
|
||||
for pk in pks:
|
||||
value = random.randbytes(2000)
|
||||
cql.execute(insert, [pk, value])
|
||||
|
||||
# flush the table
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
log_mark = await log.mark()
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
|
||||
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)
|
||||
|
||||
last_tablet_count = await get_tablet_count(manager, server, ks, 'test')
|
||||
|
||||
# trigger merge by increasing target avg tablet significantly
|
||||
await live_update_config(manager, [server], "target_tablet_size_in_bytes", "500000")
|
||||
|
||||
# wait for merge to complete
|
||||
actual_tablet_count = last_tablet_count
|
||||
started = time.time()
|
||||
while actual_tablet_count >= last_tablet_count:
|
||||
actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
|
||||
|
||||
assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
|
||||
await asyncio.sleep(.1)
|
||||
|
||||
logger.info(f'Merged test table; new number of tablets: {actual_tablet_count}')
|
||||
|
||||
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
|
||||
await asyncio.sleep(.1)
|
||||
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
|
||||
|
||||
Reference in New Issue
Block a user