Merge 'raft/group0_state_machine: load current RPC compression dict on startup' from Michał Chojnowski

We are supposed to load the most recent RPC compression dictionary
on startup, but the relevant piece of logic was left out during
the source-available port. As a result, a restarted node does not use the
dictionary for RPC compression until the next dictionary update.

Fix that.

Fixes scylladb/scylladb#22738

This is more of a bugfix than an improvement, so it should be backported to 2025.1.

Closes scylladb/scylladb#22739

* github.com:scylladb/scylladb:
  test_rpc_compression.py: test the dictionaries are loaded on startup
  raft/group0_state_machine: load current RPC compression dict on startup
This commit is contained in:
Avi Kivity
2025-02-10 20:40:33 +02:00
3 changed files with 11 additions and 3 deletions

View File

@@ -57,7 +57,7 @@ group0_state_machine::group0_state_machine(raft_group0_client& client, migration
group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat, group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat,
bool topology_change_enabled) bool topology_change_enabled)
: _client(client), _mm(mm), _sp(sp), _ss(ss), _topology_change_enabled(topology_change_enabled) : _client(client), _mm(mm), _sp(sp), _ss(ss), _topology_change_enabled(topology_change_enabled)
, _state_id_handler(sp.local_db(), gossiper, server_accessor) , _state_id_handler(sp.local_db(), gossiper, server_accessor), _feature_service(feat)
, _topology_on_raft_support_listener(feat.supports_consistent_topology_changes.when_enabled([this] () noexcept { , _topology_on_raft_support_listener(feat.supports_consistent_topology_changes.when_enabled([this] () noexcept {
// Using features to decide whether to start fetching topology snapshots // Using features to decide whether to start fetching topology snapshots
// or not is technically not correct because we also use features to guard // or not is technically not correct because we also use features to guard
@@ -322,6 +322,9 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
// memory and thus needs to be protected with apply mutex // memory and thus needs to be protected with apply mutex
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(_abort_source); auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(_abort_source);
co_await _ss.topology_state_load(); co_await _ss.topology_state_load();
if (_feature_service.compression_dicts) {
co_await _ss.compression_dictionary_updated_callback();
}
_ss._topology_state_machine.event.broadcast(); _ss._topology_state_machine.event.broadcast();
} }

View File

@@ -106,6 +106,7 @@ class group0_state_machine : public raft_state_machine {
abort_source _abort_source; abort_source _abort_source;
bool _topology_change_enabled; bool _topology_change_enabled;
group0_state_id_handler _state_id_handler; group0_state_id_handler _state_id_handler;
gms::feature_service& _feature_service;
gms::feature::listener_registration _topology_on_raft_support_listener; gms::feature::listener_registration _topology_on_raft_support_listener;
modules_to_reload get_modules_to_reload(const std::vector<canonical_mutation>& mutations); modules_to_reload get_modules_to_reload(const std::vector<canonical_mutation>& mutations);

View File

@@ -208,11 +208,15 @@ async def test_external_dicts(manager: ManagerClient) -> None:
assert approximately_equal(compressed, expected_ratio * volume, 0.8) assert approximately_equal(compressed, expected_ratio * volume, 0.8)
await with_retries(functools.partial(test_once, "lz4", 0.5), timeout=600) await with_retries(functools.partial(test_once, "lz4", 0.5), timeout=600)
await live_update_config(manager, servers, "internode_compression_zstd_max_cpu_fraction", "1.0"), await live_update_config(manager, servers, "internode_compression_zstd_max_cpu_fraction", "1.0"),
await with_retries(functools.partial(test_once, "zstd", 0.25), timeout=600) await with_retries(functools.partial(test_once, "zstd", 0.25), timeout=600)
# Test that the dicts are loaded on startup.
await asyncio.gather(*[manager.server_stop_gracefully(s.server_id) for s in servers])
await asyncio.gather(*[manager.server_update_config(s.server_id, 'rpc_dict_training_when', 'never') for s in servers])
await asyncio.gather(*[manager.server_start(s.server_id) for s in servers])
await with_retries(functools.partial(test_once, "lz4", 0.5), timeout=10)
# Similar to test_external_dicts, but simpler. # Similar to test_external_dicts, but simpler.
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_external_dicts_sanity(manager: ManagerClient) -> None: async def test_external_dicts_sanity(manager: ManagerClient) -> None: