From 71957b4320dc5928218c711f5954281ae834427c Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 15:19:18 +0100 Subject: [PATCH 1/6] storage_service: separate logger for raft topology Allows selectively enabling higher logging levels for just raft-topology related things, without doing it for the entire storage_service (which includes things like gossiper callbacks). Also gets rid of the redundant "raft topology:" prefix which was also not included everywhere. --- service/storage_service.cc | 373 +++++++++--------- test/topology/test_automatic_cleanup.py | 10 +- .../test_coordinator_queue_management.py | 4 +- .../test_topology_failure_recovery.py | 10 +- .../topology_custom/test_remove_alive_node.py | 2 +- .../test_topology_failure_recovery.py | 2 +- test/topology_custom/test_topology_smp.py | 2 +- .../test_cdc_generation_clearing.py | 4 +- .../test_tablets.py | 2 + 9 files changed, 206 insertions(+), 203 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 364f9a74e6..aaa6256cc0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -102,6 +102,7 @@ namespace db { namespace service { static logging::logger slogger("storage_service"); +static logging::logger rtlogger("raft_topology"); static thread_local session_manager topology_session_manager; @@ -302,7 +303,7 @@ static future wait_for_ip(raft::server_id id, const raft_address_m id, std::chrono::duration_cast(timeout).count())))); } static thread_local logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft topology: cannot map {} to ip, retrying.", id); + rtlogger.log(log_level::warn, rate_limit, "cannot map {} to ip, retrying.", id); co_await sleep_abortable(std::chrono::milliseconds(5), as); } } @@ -360,7 +361,7 @@ static locator::node::state to_topology_node_state(node_state ns) { case node_state::rebuilding: return locator::node::state::normal; case node_state::none: return locator::node::state::none; } - on_internal_error(slogger, format("unhandled node state: {}", ns)); + on_internal_error(rtlogger, format("unhandled node state: {}", ns)); } // Synchronizes the local node state (token_metadata, system.peers/system.local tables, @@ -395,7 +396,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm locator::host_id host_id{id.uuid()}; auto ip = am.find(id); - slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}", + rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}", id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup); // Save tokens, not needed for raft topology management, but needed by legacy // Also ip -> id mapping is needed for address map recreation on reboot @@ -427,7 +428,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm locator::host_id host_id{id.uuid()}; auto ip = am.find(id); - slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", + rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, seastar::value_of([&] () -> sstring { return rs.ring ? ::format("{}", rs.ring->tokens) : sstring("null"); @@ -469,7 +470,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm auto existing_ip = am.find(replaced_id); if (!existing_ip) { // FIXME: What if not known? - on_fatal_internal_error(slogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id)); + on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id)); } assert(existing_ip); const auto replaced_host_id = locator::host_id(replaced_id.uuid()); @@ -491,7 +492,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm co_await process_normal_node(id, rs); break; default: - on_fatal_internal_error(slogger, ::format("Unexpected state {} for node {}", rs.state, id)); + on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id)); } }; @@ -534,7 +535,7 @@ future<> storage_service::topology_state_load() { co_return; } - slogger.debug("raft topology: reload raft topology state"); + rtlogger.debug("reload raft topology state"); // read topology state from disk and recreate token_metadata from it _topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(); @@ -615,7 +616,7 @@ future<> storage_service::topology_state_load() { } if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) { - slogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); + rtlogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); co_await _cdc_gens.local().handle_cdc_generation(*gen_id); } } @@ -1144,7 +1145,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardeddone(); } catch (...) { - slogger.error("raft topology: cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception()); + rtlogger.error("cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception()); throw; } }; @@ -1155,11 +1156,11 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedsecond.cleanup != cleanup_status::running) { - slogger.trace("raft topology: cleanup triggered, but not needed"); + rtlogger.trace("cleanup triggered, but not needed"); continue; } - slogger.info("raft topology: start cleanup"); + rtlogger.info("start cleanup"); auto keyspaces = _db.local().get_all_keyspaces(); @@ -1186,7 +1187,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedclient().start_operation(&_group0_as); @@ -1199,20 +1200,20 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedclient().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: cleanup flag clearing: concurrent operation is detected, retrying."); + rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying."); continue; } break; } - slogger.debug("raft topology: cleanup flag cleared"); + rtlogger.debug("cleanup flag cleared"); } catch (const seastar::abort_requested_exception &) { - slogger.info("raft topology: cleanup fiber aborted"); + rtlogger.info("cleanup fiber aborted"); break; } catch (raft::request_aborted&) { - slogger.info("raft topology: cleanup fiber aborted"); + rtlogger.info("cleanup fiber aborted"); break; } catch (...) { - slogger.error("raft topology: cleanup fiber got an error: {}", std::current_exception()); + rtlogger.error("cleanup fiber got an error: {}", std::current_exception()); err = true; } if (err) { @@ -1293,11 +1294,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!to_remove.empty()) { // Remove from group 0 nodes that left. They may failed to do so by themselves try { - slogger.trace("raft topology: topology coordinator fiber removing {}" + rtlogger.trace("topology coordinator fiber removing {}" " from raft since they are in `left` state", to_remove); co_await _group0.group0_server().modify_config({}, to_remove, &_as); } catch (const raft::commit_status_unknown&) { - slogger.trace("raft topology: topology coordinator fiber got unknown status" + rtlogger.trace("topology coordinator fiber got unknown status" " while removing {} from raft", to_remove); } } @@ -1387,7 +1388,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!next_req) { // We did not find a request that has enough live node to proceed // Cancel all requests to let admin know that no operation can succeed - slogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes); + rtlogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes); return cancel_requests{std::move(guard), std::move(dead_nodes)}; } @@ -1405,8 +1406,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto& topo = _topo_sm._topology; if (topo.transition_nodes.empty()) { - on_internal_error(slogger, ::format( - "raft topology: could not find node to work on" + on_internal_error(rtlogger, ::format( + "could not find node to work on" " even though the state requires it (state: {})", topo.tstate)); } @@ -1454,12 +1455,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> update_topology_state( group0_guard guard, std::vector&& updates, const sstring& reason) { try { - slogger.trace("raft topology: do update {} reason {}", updates, reason); + rtlogger.trace("do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: race while changing state: {}. Retrying", reason); + rtlogger.info("race while changing state: {}. Retrying", reason); throw; } }; @@ -1499,13 +1500,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) { auto ip = _address_map.find(id); if (!ip) { - slogger.warn("raft topology: cannot send command {} with term {} and index {} " + rtlogger.warn("cannot send command {} with term {} and index {} " "to {} because mapping to ip is not available", cmd.cmd, _term, cmd_index, id); co_await coroutine::exception(std::make_exception_ptr( std::runtime_error(::format("no ip address mapping for {}", id)))); } - slogger.trace("raft topology: send {} command with term {} and index {} to {}/{}", + rtlogger.trace("send {} command with term {} and index {} to {}/{}", cmd.cmd, _term, cmd_index, id, *ip); auto result = _db.get_token_metadata().get_topology().is_me(*ip) ? co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) : @@ -1577,9 +1578,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }; future<> remove_from_group0(const raft::server_id& id) { - slogger.info("raft topology: removing node {} from group 0 configuration...", id); + rtlogger.info("removing node {} from group 0 configuration...", id); co_await _group0.remove_from_raft_config(id); - slogger.info("raft topology: node {} removed from group 0 configuration", id); + rtlogger.info("node {} removed from group 0 configuration", id); } future<> step_down_as_nonvoter() { @@ -1617,15 +1618,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!ep) { // get_sharding_info is only called for bootstrap tokens // or for tokens present in token_metadata - on_internal_error(slogger, ::format( - "raft topology: make_new_cdc_generation_data: get_sharding_info:" + on_internal_error(rtlogger, ::format( + "make_new_cdc_generation_data: get_sharding_info:" " can't find endpoint for token {}", end)); } auto ptr = _topo_sm._topology.find(raft::server_id{ep->uuid()}); if (!ptr) { - on_internal_error(slogger, ::format( - "raft topology: make_new_cdc_generation_data: get_sharding_info:" + on_internal_error(rtlogger, ::format( + "make_new_cdc_generation_data: get_sharding_info:" " couldn't find node {} in topology, owner of token {}", *ep, end)); } @@ -1673,7 +1674,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, binfo); if (gen_mutations.empty()) { - on_internal_error(slogger, "cdc_generation_data: gen_mutations is empty"); + on_internal_error(rtlogger, "cdc_generation_data: gen_mutations is empty"); } std::vector updates{gen_mutations.begin(), gen_mutations.end()}; @@ -1685,7 +1686,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto const reason = format( "insert CDC generation data (UUID: {}), part", gen_uuid); - slogger.trace("raft topology: do update {} reason {}", m, reason); + rtlogger.trace("do update {} reason {}", m, reason); write_mutations change{{std::move(m)}}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason); return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as); @@ -1780,13 +1781,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // // It also continually cleans the obsolete CDC generation data. future<> cdc_generation_publisher_fiber() { - slogger.trace("raft topology: start CDC generation publisher fiber"); + rtlogger.trace("start CDC generation publisher fiber"); while (!_as.abort_requested()) { co_await utils::get_local_injector().inject_with_handler("cdc_generation_publisher_fiber", [] (auto& handler) -> future<> { - slogger.info("raft toplogy: CDC generation publisher fiber sleeps after injection"); + rtlogger.info("CDC generation publisher fiber sleeps after injection"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - slogger.info("raft toplogy: CDC generation publisher fiber finishes sleeping after injection"); + rtlogger.info("CDC generation publisher fiber finishes sleeping after injection"); }); bool sleep = false; @@ -1807,28 +1808,28 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (_topo_sm._topology.unpublished_cdc_generations.empty()) { // No CDC generations to publish. Wait until one appears or the topology coordinator aborts. - slogger.trace("raft topology: CDC generation publisher fiber has nothing to do. Sleeping."); + rtlogger.trace("CDC generation publisher fiber has nothing to do. Sleeping."); co_await _topo_sm.event.when([&] () { return !_topo_sm._topology.unpublished_cdc_generations.empty() || _as.abort_requested(); }); - slogger.trace("raft topology: CDC generation publisher fiber wakes up"); + rtlogger.trace("CDC generation publisher fiber wakes up"); } } catch (raft::request_aborted&) { - slogger.debug("raft topology: CDC generation publisher fiber aborted"); + rtlogger.debug("CDC generation publisher fiber aborted"); } catch (seastar::abort_requested_exception) { - slogger.debug("raft topology: CDC generation publisher fiber aborted"); + rtlogger.debug("CDC generation publisher fiber aborted"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { - slogger.debug("raft topology: CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); + rtlogger.debug("CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); } catch (...) { - slogger.error("raft topology: CDC generation publisher fiber got error {}", std::current_exception()); + rtlogger.error("CDC generation publisher fiber got error {}", std::current_exception()); sleep = true; } if (sleep) { try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); } catch (...) { - slogger.debug("raft topology: CDC generation publisher: sleep failed: {}", std::current_exception()); + rtlogger.debug("CDC generation publisher: sleep failed: {}", std::current_exception()); } } co_await coroutine::maybe_yield(); @@ -1840,7 +1841,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> handle_global_request(group0_guard guard) { switch (_topo_sm._topology.global_request.value()) { case global_topology_request::new_cdc_generation: { - slogger.info("raft topology: new CDC generation requested"); + rtlogger.info("new CDC generation requested"); auto tmptr = get_token_metadata_ptr(); auto [gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt); @@ -1871,7 +1872,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // marked as supported by all normal nodes and it is not empty future<> enable_features(group0_guard guard, std::set features_to_enable) { if (!_topo_sm._topology.transition_nodes.empty()) { - on_internal_error(slogger, + on_internal_error(rtlogger, "topology coordinator attempted to enable features even though there is" " a topology operations in progress"); } @@ -1916,7 +1917,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto reason = ::format("enabling features: {}", features_to_enable); co_await update_topology_state(std::move(guard), {builder.build()}, reason); - slogger.info("raft topology: enabled features: {}", features_to_enable); + rtlogger.info("enabled features: {}", features_to_enable); } future global_token_metadata_barrier(group0_guard&& guard, std::unordered_set exclude_nodes = {}) { @@ -1924,7 +1925,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes); } catch (...) { - slogger.error("raft topology: drain rpc failed, proceed to fence old writes: {}", std::current_exception()); + rtlogger.error("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); drain_failed = true; } if (drain_failed) { @@ -1984,7 +1985,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!holder || holder->failed()) { holder = futurize_invoke(action) .finally([this, g = _async_gate.hold(), gid, name] () noexcept { - slogger.trace("raft topology: {} for tablet {} resolved.", name, gid); + rtlogger.trace("{} for tablet {} resolved.", name, gid); _tablets_ready = true; _topo_sm.event.broadcast(); }); @@ -1992,7 +1993,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } if (!holder->available()) { - slogger.trace("raft topology: Tablet {} still doing {}", gid, name); + rtlogger.trace("Tablet {} still doing {}", gid, name); return false; } @@ -2020,7 +2021,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table); auto last_token = tmap.get_last_token(mig.tablet.tablet); if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { - slogger.warn("Tablet already in transition, ignoring migration: {}", mig); + rtlogger.warn("Tablet already in transition, ignoring migration: {}", mig); return; } out.emplace_back( @@ -2047,7 +2048,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // If progress cannot be made, e.g. because all transitions are streaming, we block // and wait for notification. - slogger.trace("raft topology: handle_tablet_migration()"); + rtlogger.trace("handle_tablet_migration()"); std::vector updates; bool needs_barrier = false; bool has_transitions = false; @@ -2074,7 +2075,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }; auto transition_to = [&] (locator::tablet_transition_stage stage) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage); + rtlogger.trace("Will set tablet {} stage to {}", gid, stage); updates.emplace_back(get_mutation_builder() .set_stage(last_token, stage) .build()); @@ -2096,7 +2097,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { switch (trinfo.stage) { case locator::tablet_transition_stage::allow_write_both_read_old: if (do_barrier()) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); + rtlogger.trace("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_old) // Create session a bit earlier to avoid adding barrier @@ -2117,12 +2118,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { [] { throw std::runtime_error("stream_tablet failed due to error injection"); }); } if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] { - slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); + rtlogger.info("Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); auto dst = trinfo.pending_replica.host; return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid); })) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); + rtlogger.trace("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_new) .del_session(last_token) @@ -2138,7 +2139,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case locator::tablet_transition_stage::cleanup: if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] { locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo); - slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst); + rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst); return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging, netw::msg_addr(id2ip(dst.host)), _as, raft::server_id(dst.host.uuid()), gid); })) { @@ -2172,7 +2173,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (ts != guard.write_timestamp()) { // We rely on the fact that should_preempt_balancing() does not release the guard // so that tablet metadata reading and updates are atomic. - on_internal_error(slogger, "should_preempt_balancing() retook the guard"); + on_internal_error(rtlogger, "should_preempt_balancing() retook the guard"); } } if (!preempt) { @@ -2215,7 +2216,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Streaming may have finished after we checked. To avoid missed notification, we need // to check atomically with event.wait() if (!_tablets_ready) { - slogger.trace("raft topology: Going to sleep with active tablet transitions"); + rtlogger.trace("Going to sleep with active tablet transitions"); release_guard(std::move(guard)); co_await await_event(); } @@ -2292,7 +2293,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { co_await wait_for_ip(id, _address_map, _as); } catch (...) { - slogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception()); + rtlogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception()); } } break; @@ -2318,7 +2319,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }, }); } catch (...) { - slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. " + rtlogger.warn("attempt to send rejection response to {} failed: {}. " "The node may hang. It's safe to shut it down manually now.", id, std::current_exception()); } @@ -2383,7 +2384,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto reason = ::format("bootstrap: failed to accept {}", node.id); co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason); - slogger.info("raft topology: node {} moved to left state", node.id); + rtlogger.info("node {} moved to left state", node.id); break; } @@ -2431,8 +2432,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } break; default: - on_internal_error(slogger, - format("raft topology: topology is in join_group0 state, but the node" + on_internal_error(rtlogger, + format("topology is in join_group0 state, but the node" " being worked on ({}) is in unexpected state '{}'; should be" " either 'bootstrapping' or 'replacing'", node.id, node.rs->state)); } @@ -2451,7 +2452,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::commit_cdc_generation, " + rtlogger.error("transition_state::commit_cdc_generation, " "raft_topology_cmd::command::barrier failed, error {}", std::current_exception()); _rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception()); break; @@ -2465,8 +2466,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay); auto cdc_gen_uuid = _topo_sm._topology.new_cdc_generation_data_uuid; if (!cdc_gen_uuid) { - on_internal_error(slogger, - "raft topology: new CDC generation data UUID missing in `commit_cdc_generation` state"); + on_internal_error(rtlogger, + "new CDC generation data UUID missing in `commit_cdc_generation` state"); } cdc::generation_id_v2 cdc_gen_id { @@ -2479,8 +2480,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // This could happen if the topology coordinator's clock is broken. auto curr_gen_id = _topo_sm._topology.current_cdc_generation_id; if (curr_gen_id && curr_gen_id->ts >= cdc_gen_ts) { - on_internal_error(slogger, ::format( - "raft topology: new CDC generation has smaller timestamp than the previous one." + on_internal_error(rtlogger, ::format( + "new CDC generation has smaller timestamp than the previous one." " Old generation ID: {}, new generation ID: {}", *curr_gen_id, cdc_gen_id)); } } @@ -2530,7 +2531,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: tablets draining failed with {}. Aborting the topology operation", std::current_exception()); + rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception()); _rollback = fmt::format("Failed to drain tablets: {}", std::current_exception()); } break; @@ -2545,7 +2546,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::write_both_read_old, " + rtlogger.error("transition_state::write_both_read_old, " "global_token_metadata_barrier failed, error {}", std::current_exception()); _rollback = fmt::format("global_token_metadata_barrier failed in write_both_read_old state {}", std::current_exception()); @@ -2572,7 +2573,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (node.rs->state == node_state::decommissioning && raft::configuration::voter_count(_group0.group0_server().get_configuration().current) % 2 == 0) { if (node.id == _raft.id()) { - slogger.info("raft topology: coordinator is decommissioning and becomes a non-voter; " + rtlogger.info("coordinator is decommissioning and becomes a non-voter; " "giving up leadership"); co_await step_down_as_nonvoter(); } else { @@ -2600,7 +2601,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (term_changed_error&) { throw; } catch (...) { - slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception" + rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception" " (node state is {}): {}", node.rs->state, std::current_exception()); _rollback = fmt::format("Failed stream ranges: {}", std::current_exception()); break; @@ -2627,7 +2628,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::write_both_read_new, " + rtlogger.error("transition_state::write_both_read_new, " "global_token_metadata_barrier failed, error {}", std::current_exception()); barrier_failed = true; @@ -2701,7 +2702,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } break; default: - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Ring state on node {} is write_both_read_new while the node is in state {}", node.id, node.rs->state)); } @@ -2719,12 +2720,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Used to start new topology transitions using node requests or perform node operations // that don't change the topology (like rebuild). future<> handle_node_transition(node_to_work_on&& node) { - slogger.info("raft topology: coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state); + rtlogger.info("coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state); switch (node.rs->state) { case node_state::none: { if (_topo_sm._topology.normal_nodes.empty()) { - slogger.info("raft topology: skipping join node handshake for the first node in the cluster"); + rtlogger.info("skipping join node handshake for the first node in the cluster"); } else { auto validation_result = validate_joining_node(node); @@ -2757,7 +2758,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { throw; } catch(...) { wait_for_ip_error = std::current_exception(); - slogger.warn("raft_topology_cmd::command::wait_for_ip failed, error {}", + rtlogger.warn("raft_topology_cmd::command::wait_for_ip failed, error {}", wait_for_ip_error); } if (wait_for_ip_error) { @@ -2784,14 +2785,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason); - slogger.info("raft topology: rejected node moved to left state {}", node.id); + rtlogger.info("rejected node moved to left state {}", node.id); try { co_await respond_to_joining_node(node.id, join_node_response_params{ .response = std::move(validation_result), }); } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send rejection response to {} failed. " + rtlogger.warn("attempt to send rejection response to {} failed. " "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -2842,7 +2843,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtbuilder.done("the node is alive"); co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "reject removenode"); - slogger.warn("raft topology: rejected removenode operation for node {} " + rtlogger.warn("rejected removenode operation for node {} " "because it is alive", node.id); break; } @@ -2894,7 +2895,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (node.id == _raft.id()) { // Someone else needs to coordinate the rest of the decommission process, // because the decommissioning node is going to shut down in the middle of this state. - slogger.info("raft topology: coordinator is decommissioning; giving up leadership"); + rtlogger.info("coordinator is decommissioning; giving up leadership"); co_await step_down_as_nonvoter(); // Note: if we restart after this point and become a voter @@ -2912,7 +2913,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: node_state::left_token_ring (node: {}), " + rtlogger.error("node_state::left_token_ring (node: {}), " "global_token_metadata_barrier failed, error {}", node.id, std::current_exception()); barrier_failed = true; @@ -2946,7 +2947,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier); } catch (...) { - slogger.warn("raft topology: failed to tell node {} to shut down - it may hang." + rtlogger.warn("failed to tell node {} to shut down - it may hang." " It's safe to shut it down manually now. (Exception: {})", node.id, std::current_exception()); shutdown_failed = true; @@ -2975,7 +2976,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (term_changed_error&) { throw; } catch(...) { - slogger.warn("raft topology: failed to run barrier_and_drain during rollback {}", std::current_exception()); + rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception()); barrier_failed = true; } @@ -2995,7 +2996,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto str = fmt::format("complete rollback of {} to state normal", node.id); - slogger.info("{}", str); + rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str); } break; @@ -3004,12 +3005,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case node_state::removing: case node_state::replacing: // Should not get here - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Found node {} in state {} but there is no ongoing topology transition", node.id, node.rs->state)); case node_state::left: // Should not get here - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Topology coordinator is called for node {} in state 'left'", node.id)); break; } @@ -3037,7 +3038,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { const auto& supported_features = node.rs->supported_features; std::ranges::set_difference(node.topology->enabled_features, supported_features, std::back_inserter(unsupported_features)); if (!unsupported_features.empty()) { - slogger.warn("raft topology: node {} does not understand some features: {}", node.id, unsupported_features); + rtlogger.warn("node {} does not understand some features: {}", node.id, unsupported_features); return join_node_response_params::rejected{ .reason = format("Feature check failed. The node does not support some features that are enabled by the cluster: {}", unsupported_features), @@ -3074,7 +3075,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }); responded = true; } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send acceptance response to {} failed. " + rtlogger.warn("attempt to send acceptance response to {} failed. " "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -3108,7 +3109,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { topology_mutation_builder builder(node.guard.write_timestamp()); builder.with_node(id).set("cleanup_status", cleanup_status::needed); muts.emplace_back(builder.build()); - slogger.trace("raft topology: mark node {} as needed cleanup", id); + rtlogger.trace("mark node {} as needed cleanup", id); } } return muts; @@ -3129,7 +3130,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(id).set("cleanup_status", cleanup_status::running); muts.emplace_back(builder.build()); - slogger.trace("raft topology: mark node {} as cleanup running", id); + rtlogger.trace("mark node {} as cleanup running", id); } } if (!muts.empty()) { @@ -3176,12 +3177,12 @@ public: }; future topology_coordinator::maybe_start_tablet_migration(group0_guard guard) { - slogger.debug("raft topology: Evaluating tablet balance"); + rtlogger.debug("Evaluating tablet balance"); auto tm = get_token_metadata_ptr(); auto plan = co_await _tablet_allocator.balance_tablets(tm); if (plan.empty()) { - slogger.debug("raft topology: Tablets are balanced"); + rtlogger.debug("Tablets are balanced"); co_return false; } @@ -3224,10 +3225,10 @@ future<> topology_coordinator::fence_previous_coordinator() { continue; } catch (raft::request_aborted&) { // Abort was requested. Break the loop - slogger.debug("raft topology: request to fence previous coordinator was aborted"); + rtlogger.debug("request to fence previous coordinator was aborted"); break; } catch (...) { - slogger.error("raft topology: failed to fence previous coordinator {}", std::current_exception()); + rtlogger.error("failed to fence previous coordinator {}", std::current_exception()); } try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); @@ -3235,13 +3236,13 @@ future<> topology_coordinator::fence_previous_coordinator() { // Abort was requested. Break the loop break; } catch (...) { - slogger.debug("raft topology: sleep failed while fencing previous coordinator: {}", std::current_exception()); + rtlogger.debug("sleep failed while fencing previous coordinator: {}", std::current_exception()); } } } future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard) { - slogger.info("raft topology: start rolling back topology change"); + rtlogger.info("start rolling back topology change"); // Look for a node which operation should be aborted // (there should be one since we are in the rollback) node_to_work_on node = get_node_to_work_on(std::move(guard)); @@ -3268,7 +3269,7 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard state = node_state::rollback_to_normal; break; default: - on_internal_error(slogger, fmt::format("raft topology: tried to rollback in unsupported state {}", node.rs->state)); + on_internal_error(rtlogger, fmt::format("tried to rollback in unsupported state {}", node.rs->state)); } topology_mutation_builder builder(node.guard.write_timestamp()); @@ -3288,12 +3289,12 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state); - slogger.info("{}", str); + rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), std::move(muts), str); } future<> topology_coordinator::run() { - slogger.info("raft topology: start topology coordinator fiber"); + rtlogger.info("start topology coordinator fiber"); auto abort = _as.subscribe([this] () noexcept { _topo_sm.event.broadcast(); @@ -3319,29 +3320,29 @@ future<> topology_coordinator::run() { bool had_work = co_await handle_topology_transition(std::move(guard)); if (!had_work) { // Nothing to work on. Wait for topology change event. - slogger.trace("raft topology: topology coordinator fiber has nothing to do. Sleeping."); + rtlogger.trace("topology coordinator fiber has nothing to do. Sleeping."); co_await await_event(); - slogger.trace("raft topology: topology coordinator fiber got an event"); + rtlogger.trace("topology coordinator fiber got an event"); } } catch (raft::request_aborted&) { - slogger.debug("raft topology: topology change coordinator fiber aborted"); + rtlogger.debug("topology change coordinator fiber aborted"); } catch (seastar::abort_requested_exception&) { - slogger.debug("raft topology: topology change coordinator fiber aborted"); + rtlogger.debug("topology change coordinator fiber aborted"); } catch (raft::commit_status_unknown&) { - slogger.warn("raft topology: topology change coordinator fiber got commit_status_unknown"); + rtlogger.warn("topology change coordinator fiber got commit_status_unknown"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { // Term changed. We may no longer be a leader - slogger.debug("raft topology: topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term()); + rtlogger.debug("topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term()); } catch (...) { - slogger.error("raft topology: topology change coordinator fiber got error {}", std::current_exception()); + rtlogger.error("topology change coordinator fiber got error {}", std::current_exception()); sleep = true; } if (sleep) { try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); } catch (...) { - slogger.debug("raft topology: sleep failed: {}", std::current_exception()); + rtlogger.debug("sleep failed: {}", std::current_exception()); } } co_await coroutine::maybe_yield(); @@ -3367,7 +3368,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded storage_service::raft_state_monitor_fiber(raft::server& raft, shardedrequest_abort(); // abort current coordinator if running @@ -3483,13 +3484,13 @@ public: {} future<> pre_server_start(const group0_info& g0_info) override { - slogger.info("raft topology: join: sending the join request to {}", g0_info.ip_addr); + rtlogger.info("join: sending the join request to {}", g0_info.ip_addr); auto result = co_await ser::join_node_rpc_verbs::send_join_node_request( &_ss._messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, _req); std::visit(overloaded_functor { [this] (const join_node_request_result::ok&) { - slogger.info("raft topology: join: request to join placed, waiting" + rtlogger.info("join: request to join placed, waiting" " for the response from the topology coordinator"); _ss._join_node_request_done.set_value(); @@ -3512,7 +3513,7 @@ public: // deliver the rejection, it won't complete. In such a case, the // operator is responsible for shutting down the joining node. co_await _ss._join_node_response_done.get_shared_future(as); - slogger.info("raft topology: join: success"); + rtlogger.info("join: success"); co_return true; } }; @@ -3527,7 +3528,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster")); } - slogger.info("raft topology: adding myself as the first node to the topology"); + rtlogger.info("adding myself as the first node to the topology"); auto guard = co_await _group0->client().start_operation(&_group0_as); auto insert_join_request_mutations = build_mutation_from_join_params(params, guard); @@ -3545,7 +3546,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: bootstrap: concurrent operation is detected, retrying."); + rtlogger.info("bootstrap: concurrent operation is detected, retrying."); } } } @@ -3588,7 +3589,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft } while (true) { - slogger.info("raft topology: refreshing topology to check if it's synchronized with local metadata"); + rtlogger.info("refreshing topology to check if it's synchronized with local metadata"); auto guard = co_await _group0->client().start_operation(&_group0_as); @@ -3610,7 +3611,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features(); _feature_service.check_features(enabled_features, unsafe_to_disable_features); - slogger.info("raft topology: updating topology with local metadata"); + rtlogger.info("updating topology with local metadata"); co_await _sys_ks.local().set_must_synchronize_topology(true); @@ -3627,7 +3628,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: update topology with local metadata:" + rtlogger.info("update topology with local metadata:" " concurrent operation is detected, retrying."); } } @@ -3901,7 +3902,7 @@ future<> storage_service::join_token_ring(shardedclient().get_group0_upgrade_state()).second; if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) { - on_internal_error(slogger, "raft topology: cluster not upgraded to use group 0 after setup_group0"); + on_internal_error(rtlogger, "cluster not upgraded to use group 0 after setup_group0"); } co_return &_group0->group0_server(); } @@ -3916,7 +3917,7 @@ future<> storage_service::join_token_ring(sharded storage_service::raft_decommission() { throw std::runtime_error("Cannot decommission last node in the cluster"); } - slogger.info("raft topology: request decommission for: {}", raft_server.id()); + rtlogger.info("request decommission for: {}", raft_server.id()); topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(raft_server.id()) .set("topology_request", topology_request::leave) @@ -5506,7 +5507,7 @@ future<> storage_service::raft_decommission() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: decommission: concurrent operation is detected, retrying."); + rtlogger.info("decommission: concurrent operation is detected, retrying."); continue; } break; @@ -5519,7 +5520,7 @@ future<> storage_service::raft_decommission() { co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }}); } else { auto err = fmt::format("Decommission failed. See earlier errors ({})", error); - slogger.error("{}", err); + rtlogger.error("{}", err); throw std::runtime_error(err); } } @@ -5822,7 +5823,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list storage_service::raft_removenode(locator::host_id host_id, std::list storage_service::raft_removenode(locator::host_id host_id, std::listclient().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: removenode: concurrent operation is detected, retrying."); + rtlogger.info("removenode: concurrent operation is detected, retrying."); continue; } break; } - slogger.info("raft topology: removenode: wait for completion"); + rtlogger.info("removenode: wait for completion"); // Wait until request completes auto error = co_await wait_for_topology_request_completion(request_id); @@ -5868,11 +5869,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listremove_from_raft_config(id); } catch (raft::not_a_member&) { - slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator"); + rtlogger.info("removenode: already removed from the raft config by the topology coordinator"); } } else { auto err = fmt::format("Removenode failed. See earlier errors ({})", error); - slogger.error("{}", err); + rtlogger.error("{}", err); throw std::runtime_error(err); } } @@ -6441,7 +6442,7 @@ future<> storage_service::do_cluster_cleanup() { throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state)); } - slogger.info("raft topology: cluster cleanup requested"); + rtlogger.info("cluster cleanup requested"); topology_mutation_builder builder(guard.write_timestamp()); builder.set_global_topology_request(global_topology_request::cleanup); topology_change change{{builder.build()}}; @@ -6450,7 +6451,7 @@ future<> storage_service::do_cluster_cleanup() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: cleanup: concurrent operation is detected, retrying."); + rtlogger.info("cleanup: concurrent operation is detected, retrying."); continue; } break; @@ -6462,7 +6463,7 @@ future<> storage_service::do_cluster_cleanup() { return n.second.cleanup == cleanup_status::clean; }); }); - slogger.info("raft topology: cluster cleanup done"); + rtlogger.info("cluster cleanup done"); } future storage_service::wait_for_topology_request_completion(utils::UUID id) { @@ -6499,7 +6500,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { throw std::runtime_error("Cannot rebuild a single node"); } - slogger.info("raft topology: request rebuild for: {}", raft_server.id()); + rtlogger.info("request rebuild for: {}", raft_server.id()); topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(raft_server.id()) .set("topology_request", topology_request::rebuild) @@ -6516,7 +6517,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: rebuild: concurrent operation is detected, retrying."); + rtlogger.info("rebuild: concurrent operation is detected, retrying."); continue; } break; @@ -6533,7 +6534,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { std::optional curr_gen; while (true) { - slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology"); + rtlogger.info("request check_and_repair_cdc_streams, refreshing topology"); auto guard = co_await _group0->client().start_operation(&_group0_as); auto curr_req = _topology_state_machine._topology.global_request; if (curr_req && *curr_req != global_topology_request::new_cdc_generation) { @@ -6562,7 +6563,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying."); + rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying."); continue; } break; @@ -7057,7 +7058,7 @@ future<> storage_service::snitch_reconfigured() { future storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd) { raft_topology_cmd_result result; - slogger.trace("raft topology: topology cmd rpc {} is called", cmd.cmd); + rtlogger.trace("topology cmd rpc {} is called", cmd.cmd); // The retrier does: // If no operation was previously started - start it now @@ -7067,16 +7068,16 @@ future storage_service::raft_topology_cmd_handler(raft auto retrier = [] (std::optional>& f, auto&& func) -> future<> { if (!f || f->failed()) { if (f) { - slogger.info("raft topology: retry streaming after previous attempt failed with {}", f->get_future().get_exception()); + rtlogger.info("retry streaming after previous attempt failed with {}", f->get_future().get_exception()); } else { - slogger.info("raft topology: start streaming"); + rtlogger.info("start streaming"); } f = func(); } else { - slogger.debug("raft topology: already streaming"); + rtlogger.debug("already streaming"); } co_await f.value().get_future(); - slogger.info("raft topology: streaming completed"); + rtlogger.info("streaming completed"); }; try { @@ -7133,7 +7134,7 @@ future storage_service::raft_topology_cmd_handler(raft ex = std::current_exception(); } if (ex) { - slogger.error("raft topology: feature check during barrier failed: {}", ex); + rtlogger.error("feature check during barrier failed: {}", ex); co_await drain(); break; } @@ -7145,7 +7146,7 @@ future storage_service::raft_topology_cmd_handler(raft case raft_topology_cmd::command::barrier_and_drain: { co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { const auto current_version = ss._shared_token_metadata.get()->get_version(); - slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", + rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", version, current_version); // This shouldn't happen under normal operation, it's only plausible @@ -7166,7 +7167,7 @@ future storage_service::raft_topology_cmd_handler(raft co_await ss._shared_token_metadata.stale_versions_in_use(); co_await get_topology_session_manager().drain_closing_sessions(); - slogger.debug("raft_topology_cmd::barrier_and_drain done"); + rtlogger.debug("raft_topology_cmd::barrier_and_drain done"); }); result.status = raft_topology_cmd_result::command_status::success; } @@ -7176,7 +7177,7 @@ future storage_service::raft_topology_cmd_handler(raft auto tstate = _topology_state_machine._topology.tstate; if (!rs.ring || (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) { - slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); + rtlogger.warn("got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); break; } @@ -7210,7 +7211,7 @@ future storage_service::raft_topology_cmd_handler(raft } else { co_await retrier(_bootstrap_result, coroutine::lambda([&] () ->future<> { if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) { - on_internal_error(slogger, ::format("Cannot find request_param for node id {}", raft_server.id())); + on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id())); } if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) { // FIXME: we should not need to translate ids to IPs here. See #6403. @@ -7218,7 +7219,7 @@ future storage_service::raft_topology_cmd_handler(raft for (const auto& id : std::get(_topology_state_machine._topology.req_param[raft_server.id()]).ignored_ids) { auto ip = _group0->address_map().find(id); if (!ip) { - on_fatal_internal_error(slogger, ::format("Cannot find a mapping from node id {} to its ip", id)); + on_fatal_internal_error(rtlogger, ::format("Cannot find a mapping from node id {} to its ip", id)); } ignored_ips.insert(*ip); } @@ -7250,11 +7251,11 @@ future storage_service::raft_topology_cmd_handler(raft // Find the node that is been removed auto it = boost::find_if(_topology_state_machine._topology.transition_nodes, [] (auto& e) { return e.second.state == node_state::removing; }); if (it == _topology_state_machine._topology.transition_nodes.end()) { - slogger.warn("raft topology: got stream_ranges request while my state is normal but cannot find a node that is been removed"); + rtlogger.warn("got stream_ranges request while my state is normal but cannot find a node that is been removed"); break; } auto id = it->first; - slogger.debug("raft topology: streaming to remove node {}", id); + rtlogger.debug("streaming to remove node {}", id); const auto& am = _group0->address_map(); auto ip = am.find(id); // map node id to ip assert (ip); // what to do if address is unknown? @@ -7267,14 +7268,14 @@ future storage_service::raft_topology_cmd_handler(raft }); if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { if (!_topology_state_machine._topology.req_param.contains(id)) { - on_internal_error(slogger, ::format("Cannot find request_param for node id {}", id)); + on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id)); } // FIXME: we should not need to translate ids to IPs here. See #6403. std::list ignored_ips; for (const auto& ignored_id : std::get(_topology_state_machine._topology.req_param[id]).ignored_ids) { auto ip = _group0->address_map().find(ignored_id); if (!ip) { - on_fatal_internal_error(slogger, ::format("Cannot find a mapping from node id {} to its ip", ignored_id)); + on_fatal_internal_error(rtlogger, ::format("Cannot find a mapping from node id {} to its ip", ignored_id)); } ignored_ips.push_back(*ip); } @@ -7289,7 +7290,7 @@ future storage_service::raft_topology_cmd_handler(raft break; case node_state::rebuilding: { auto source_dc = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc; - slogger.info("raft topology: rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); co_await retrier(_rebuild_result, [&] () -> future<> { auto tmptr = get_token_metadata_ptr(); if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { @@ -7307,11 +7308,11 @@ future storage_service::raft_topology_cmd_handler(raft } try { co_await streamer->stream_async(); - slogger.info("raft topology: streaming for rebuild successful"); + rtlogger.info("streaming for rebuild successful"); } catch (...) { auto ep = std::current_exception(); // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - slogger.warn("raft topology: error while rebuilding node: {}", ep); + rtlogger.warn("error while rebuilding node: {}", ep); std::rethrow_exception(std::move(ep)); } } @@ -7325,7 +7326,7 @@ future storage_service::raft_topology_cmd_handler(raft case node_state::none: case node_state::removing: case node_state::rollback_to_normal: - on_fatal_internal_error(slogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", + on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", raft_server.id(), rs.state)); break; } @@ -7340,17 +7341,17 @@ future storage_service::raft_topology_cmd_handler(raft ids.push_back(id); } } - slogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids); + rtlogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids); for (const auto& id: ids) { co_await wait_for_ip(id, _group0->address_map(), _abort_source); } - slogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids); + rtlogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids); result.status = raft_topology_cmd_result::command_status::success; break; } } } catch (...) { - slogger.error("raft topology: raft_topology_cmd failed with: {}", std::current_exception()); + rtlogger.error("raft_topology_cmd failed with: {}", std::current_exception()); } co_return result; } @@ -7427,7 +7428,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, co_await raft_server.read_barrier(&_group0_as); if (_tablet_ops.contains(tablet)) { - slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); + rtlogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); co_await _tablet_ops[tablet].done.get_future(); co_return; } @@ -7450,10 +7451,10 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, try { co_await op(guard); p.set_value(); - slogger.debug("{} for tablet migration of {} successful", op_name, tablet); + rtlogger.debug("{} for tablet migration of {} successful", op_name, tablet); } catch (...) { p.set_exception(std::current_exception()); - slogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception()); + rtlogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception()); throw; } } @@ -7534,7 +7535,7 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { } auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id()); if (!shard_opt) { - on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet)); + on_internal_error(rtlogger, format("Tablet {} has no shard on this node", tablet)); } shard = *shard_opt; } @@ -7559,7 +7560,7 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: auto guard = co_await _group0->client().start_operation(&_abort_source); while (_topology_state_machine._topology.is_busy()) { - slogger.debug("move_tablet(): topology state machine is busy"); + rtlogger.debug("move_tablet(): topology state machine is busy"); release_guard(std::move(guard)); co_await _topology_state_machine.event.wait(); guard = co_await _group0->client().start_operation(&_abort_source); @@ -7603,15 +7604,15 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: .build())); sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst); - slogger.info("raft topology: {}", reason); - slogger.trace("raft topology: do update {} reason {}", updates, reason); + rtlogger.info("{}", reason); + rtlogger.trace("do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { - slogger.debug("move_tablet(): concurrent modification, retrying"); + rtlogger.debug("move_tablet(): concurrent modification, retrying"); } } @@ -7636,7 +7637,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { group0_guard guard = co_await _group0->client().start_operation(&_abort_source); while (_topology_state_machine._topology.is_busy()) { - slogger.debug("set_tablet_balancing_enabled(): topology is busy"); + rtlogger.debug("set_tablet_balancing_enabled(): topology is busy"); release_guard(std::move(guard)); co_await _topology_state_machine.event.wait(); guard = co_await _group0->client().start_operation(&_abort_source); @@ -7648,21 +7649,21 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { .build())); sstring reason = format("Setting tablet balancing to {}", enabled); - slogger.info("raft topology: {}", reason); + rtlogger.info("{}", reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { - slogger.debug("set_tablet_balancing_enabled(): concurrent modification"); + rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification"); } } } future storage_service::join_node_request_handler(join_node_request_params params) { join_node_request_result result; - slogger.info("raft topology: received request to join from host_id: {}", params.host_id); + rtlogger.info("received request to join from host_id: {}", params.host_id); if (params.cluster_name != _db.local().get_config().cluster_name()) { result.result = join_node_request_result::rejected{ @@ -7721,7 +7722,7 @@ future storage_service::join_node_request_handler(join const auto timeout = std::chrono::seconds(10); - slogger.warn("raft topology: the node {} which was requested to be" + rtlogger.warn("the node {} which was requested to be" " replaced has the same ID as the current group 0 leader ({});" " this looks like an attempt to join a node with the same IP" " as a leader which might have just crashed; waiting for" @@ -7743,7 +7744,7 @@ future storage_service::join_node_request_handler(join co_await sleep_abortable(std::chrono::milliseconds(100), as); } } catch (abort_requested_exception&) { - slogger.warn("raft topology: the node {} tries to replace the" + rtlogger.warn("the node {} tries to replace the" " current leader {} but the leader didn't change within" " {}s. Rejecting the node", params.host_id, @@ -7771,7 +7772,7 @@ future storage_service::join_node_request_handler(join if (const auto *p = _topology_state_machine._topology.find(params.host_id)) { const auto& rs = p->second; if (rs.state == node_state::left) { - slogger.warn("raft topology: the node {} attempted to join", + rtlogger.warn("the node {} attempted to join", " but it was removed from the cluster. Rejecting" " the node", params.host_id); @@ -7779,7 +7780,7 @@ future storage_service::join_node_request_handler(join .reason = "The node has already been removed from the cluster", }; } else { - slogger.warn("raft topology: the node {} attempted to join", + rtlogger.warn("the node {} attempted to join", " again after an unfinished attempt but it is no longer" " allowed to do so. Rejecting the node", params.host_id); @@ -7800,11 +7801,11 @@ future storage_service::join_node_request_handler(join co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); break; } catch (group0_concurrent_modification&) { - slogger.info("raft topology: join_node_request: concurrent operation is detected, retrying."); + rtlogger.info("join_node_request: concurrent operation is detected, retrying."); } } - slogger.info("raft topology: placed join request for {}", params.host_id); + rtlogger.info("placed join request for {}", params.host_id); // Success result.result = join_node_request_result::ok {}; @@ -7828,7 +7829,7 @@ future storage_service::join_node_response_handler(jo if (_join_node_response_done.available()) { // We already handled this RPC. No need to retry it. - slogger.info("raft topology: the node got join_node_response RPC for the second time, ignoring"); + rtlogger.info("the node got join_node_response RPC for the second time, ignoring"); if (std::holds_alternative(params.response) && _join_node_response_done.failed()) { @@ -7903,7 +7904,7 @@ future storage_service::join_node_response_handler(jo } static logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", + rtlogger.log(log_level::warn, rate_limit, "cannot map nodes {} to ips, retrying.", untranslated_ids); co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); @@ -7912,11 +7913,11 @@ future storage_service::join_node_response_handler(jo } } - slogger.info("raft topology: coordinator accepted request to join, " + rtlogger.info("coordinator accepted request to join, " "waiting for nodes {} to be alive before responding and continuing", sync_nodes); co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); - slogger.info("raft topology: nodes {} are alive", sync_nodes); + rtlogger.info("nodes {} are alive", sync_nodes); // Unblock waiting join_node_rpc_handshaker::post_server_start, // which will start the raft server and continue @@ -7934,7 +7935,7 @@ future storage_service::join_node_response_handler(jo }, params.response); } catch (...) { auto eptr = std::current_exception(); - slogger.warn("raft topology: error while handling the join response from the topology coordinator. " + rtlogger.warn("error while handling the join response from the topology coordinator. " "The node will not join the cluster. Error: {}", eptr); _join_node_response_done.set_exception(std::move(eptr)); diff --git a/test/topology/test_automatic_cleanup.py b/test/topology/test_automatic_cleanup.py index db5ada359e..7ae5e5a91a 100644 --- a/test/topology/test_automatic_cleanup.py +++ b/test/topology/test_automatic_cleanup.py @@ -26,21 +26,21 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient): marks = [await log.mark() for log in logs] await manager.server_add() await manager.server_add() - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 servers = await manager.running_servers() logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.decommission_node(servers[4].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 4 servers = await manager.running_servers() logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.decommission_node(servers[3].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 await manager.server_add() @@ -48,11 +48,11 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient): logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.api.client.post("/storage_service/cleanup_all", servers[0].ip_addr) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 3 marks = [await log.mark() for log in logs] await manager.decommission_node(servers[3].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 diff --git a/test/topology/test_coordinator_queue_management.py b/test/topology/test_coordinator_queue_management.py index 6fc1c4ca96..1c451b2e42 100644 --- a/test/topology/test_coordinator_queue_management.py +++ b/test/topology/test_coordinator_queue_management.py @@ -40,7 +40,7 @@ async def test_coordinator_queue_management(manager: ManagerClient): done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED) for t in pending: t.cancel() - [await l.wait_for("raft topology: removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])] + [await l.wait_for("raft_topology - removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])] [await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]] @@ -61,7 +61,7 @@ async def test_coordinator_queue_management(manager: ManagerClient): done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED) for t in pending: t.cancel() - logs[1].wait_for("raft topology: decommission: wait for completion", marks[1]) + logs[1].wait_for("raft_topology - decommission: wait for completion", marks[1]) [await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]] diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py index dc1d3cf6d6..17c5cb4188 100644 --- a/test/topology/test_topology_failure_recovery.py +++ b/test/topology/test_topology_failure_recovery.py @@ -27,7 +27,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors") servers = await manager.running_servers() assert len(servers) == 3 - matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # remove failure marks = [await log.mark() for log in logs] @@ -36,7 +36,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_stop_gracefully(servers[3].server_id) await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True) await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors") - matches = [await log.grep("storage_service - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await manager.server_start(servers[3].server_id) await manager.servers_see_each_other(servers) @@ -49,7 +49,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # bootstrap failure in raft barrier marks = [await log.mark() for log in logs] @@ -59,7 +59,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # replace failure marks = [await log.mark() for log in logs] @@ -72,5 +72,5 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 diff --git a/test/topology_custom/test_remove_alive_node.py b/test/topology_custom/test_remove_alive_node.py index 84e4759af3..7660fca7cd 100644 --- a/test/topology_custom/test_remove_alive_node.py +++ b/test/topology_custom/test_remove_alive_node.py @@ -44,4 +44,4 @@ async def test_removing_alive_node_fails(manager: ManagerClient) -> None: # topology_coordinator::handle_node_transition). logging.info(f"Removing {srv3} initiated by {srv2}") await manager.remove_node(srv2.server_id, srv3.server_id, [], "Removenode failed. See earlier errors", False) - await log_file1.wait_for("raft topology: rejected removenode operation for node", timeout=60) + await log_file1.wait_for("raft_topology - rejected removenode operation for node", timeout=60) diff --git a/test/topology_custom/test_topology_failure_recovery.py b/test/topology_custom/test_topology_failure_recovery.py index c8956ae9bf..b73766a3b8 100644 --- a/test/topology_custom/test_topology_failure_recovery.py +++ b/test/topology_custom/test_topology_failure_recovery.py @@ -38,7 +38,7 @@ async def test_tablet_drain_failure_during_decommission(manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors") - matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await cql.run_async("DROP KEYSPACE test;") diff --git a/test/topology_custom/test_topology_smp.py b/test/topology_custom/test_topology_smp.py index 2f90043864..1251c90054 100644 --- a/test/topology_custom/test_topology_smp.py +++ b/test/topology_custom/test_topology_smp.py @@ -50,7 +50,7 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage log_args = [ '--default-log-level', 'debug', '--logger-log-level', 'raft_group0=trace:group0_client=trace:storage_service=trace' - ':raft=trace:raft_group_registry=trace' + ':raft=trace:raft_group_registry=trace:raft_topology=trace' ] logger.info(f'Adding --smp=3 server') diff --git a/test/topology_experimental_raft/test_cdc_generation_clearing.py b/test/topology_experimental_raft/test_cdc_generation_clearing.py index cb47d22f53..7174e35e65 100644 --- a/test/topology_experimental_raft/test_cdc_generation_clearing.py +++ b/test/topology_experimental_raft/test_cdc_generation_clearing.py @@ -26,7 +26,7 @@ async def test_current_cdc_generation_is_not_removed(manager: ManagerClient): # We enable the injection to ensure that a too-late timestamp does not prevent removing the CDC generation. logger.info("Bootstrapping first node") server = await manager.server_add( - cmdline=['--logger-log-level', 'storage_service=trace'], + cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'], config={'error_injections_at_startup': ['clean_obsolete_cdc_generations_ignore_ts']} ) @@ -53,7 +53,7 @@ async def test_dependency_on_timestamps(manager: ManagerClient): and the topology coordinator's clock is too small. Then, test that the CDC generation publisher removes the clean-up candidate (together with older generations) if its timestamp is old enough.""" logger.info("Bootstrapping first node") - servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace'])] + servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'])] log_file1 = await manager.server_open_log(servers[0].server_id) mark: Optional[int] = None diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 5259773455..292febeef9 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -79,6 +79,7 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m '--logger-log-level', 'query_processor=trace', '--logger-log-level', 'gossip=trace', '--logger-log-level', 'storage_service=trace', + '--logger-log-level', 'raft_topology=trace', '--logger-log-level', 'messaging_service=trace', '--logger-log-level', 'rpc=trace', ] @@ -237,6 +238,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient): logger.info("Bootstrapping cluster") cmdline = [ '--logger-log-level', 'storage_service=trace', + '--logger-log-level', 'raft_topology=trace', ] servers = [await manager.server_add(cmdline=cmdline)] From ae25f703c45972e0b96876f12e22c91bf099aa2a Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 15:38:12 +0100 Subject: [PATCH 2/6] raft topology: INFO log when executing global commands and updating topology state Those are rare control plane events, but useful for debugging e.g. if topology coordinator gets stuck at some point. --- service/storage_service.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index aaa6256cc0..e5303c5c6a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1455,7 +1455,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> update_topology_state( group0_guard guard, std::vector&& updates, const sstring& reason) { try { - rtlogger.trace("do update {} reason {}", updates, reason); + rtlogger.info("updating topology state: {}", reason); + rtlogger.trace("update_topology_state mutations: {}", updates); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as); @@ -1544,6 +1545,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { group0_guard guard, const raft_topology_cmd& cmd, const std::unordered_set& exclude_nodes, drop_guard_and_retake drop_and_retake = drop_guard_and_retake::yes) { + rtlogger.info("executing global topology command {}, excluded nodes: {}", cmd.cmd, exclude_nodes); auto nodes = _topo_sm._topology.normal_nodes | boost::adaptors::filtered([&exclude_nodes] (const std::pair& n) { return !exclude_nodes.contains(n.first); From aeb53ea31d8a90041dc56699c251dd4f0314dc65 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 15:42:48 +0100 Subject: [PATCH 3/6] raft topology: don't include null ID in exclude_nodes Observed with newly added logs: ``` raft topology - executing global topology command barrier_and_drain, excluded nodes: {00000000-0000-0000-0000-000000000000} ``` --- service/storage_service.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index e5303c5c6a..30786e20ea 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1563,7 +1563,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { std::unordered_set get_excluded_nodes(raft::server_id id, const std::optional& req, const std::optional& req_param) { auto exclude_nodes = parse_ignore_nodes(req_param); - exclude_nodes.insert(parse_replaced_node(req_param)); + if (auto replaced_node = parse_replaced_node(req_param)) { + exclude_nodes.insert(replaced_node); + } if (req && *req == topology_request::remove) { exclude_nodes.insert(id); } From 92e6604127c1bfe71cca23a7edb4d554045a7515 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 16:01:48 +0100 Subject: [PATCH 4/6] raft topology: log when entering transition states Those are rare control plane events, but might be useful when debugging problems with topology coordinator (e.g. where it got stuck). --- service/storage_service.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 30786e20ea..e3c9c888a1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2372,6 +2372,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_return false; } + rtlogger.info("entered `{}` transition state", *tstate); switch (*tstate) { case topology::transition_state::join_group0: { auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard))); From 52e67ca121946b8c1169ff13b1726927b05a64a3 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 16:21:10 +0100 Subject: [PATCH 5/6] raft topology: increase level of some TRACE messages Increased them to DEBUG level, and in one case to WARN (inside an exception handler). The selected messages are still relatively rare (per-node per-transition control plane events, plus events such as fibers sleeping and waking up) although more low level. They are also small messages. Messages that are large such as those which print all tokens of nodes or large mutations are left on TRACE level. The plan is to enable DEBUG level logging in test.py tests for raft_topology, while not spamming the logs completely such as by printing large mutations. --- service/storage_service.cc | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index e3c9c888a1..cbd95e7644 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1294,11 +1294,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!to_remove.empty()) { // Remove from group 0 nodes that left. They may failed to do so by themselves try { - rtlogger.trace("topology coordinator fiber removing {}" + rtlogger.debug("topology coordinator fiber removing {}" " from raft since they are in `left` state", to_remove); co_await _group0.group0_server().modify_config({}, to_remove, &_as); } catch (const raft::commit_status_unknown&) { - rtlogger.trace("topology coordinator fiber got unknown status" + rtlogger.warn("topology coordinator fiber got commit_status_unknown status" " while removing {} from raft", to_remove); } } @@ -1507,7 +1507,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await coroutine::exception(std::make_exception_ptr( std::runtime_error(::format("no ip address mapping for {}", id)))); } - rtlogger.trace("send {} command with term {} and index {} to {}/{}", + rtlogger.debug("send {} command with term {} and index {} to {}/{}", cmd.cmd, _term, cmd_index, id, *ip); auto result = _db.get_token_metadata().get_topology().is_me(*ip) ? co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) : @@ -1785,7 +1785,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // // It also continually cleans the obsolete CDC generation data. future<> cdc_generation_publisher_fiber() { - rtlogger.trace("start CDC generation publisher fiber"); + rtlogger.debug("start CDC generation publisher fiber"); while (!_as.abort_requested()) { co_await utils::get_local_injector().inject_with_handler("cdc_generation_publisher_fiber", [] (auto& handler) -> future<> { @@ -1812,11 +1812,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (_topo_sm._topology.unpublished_cdc_generations.empty()) { // No CDC generations to publish. Wait until one appears or the topology coordinator aborts. - rtlogger.trace("CDC generation publisher fiber has nothing to do. Sleeping."); + rtlogger.debug("CDC generation publisher fiber has nothing to do. Sleeping."); co_await _topo_sm.event.when([&] () { return !_topo_sm._topology.unpublished_cdc_generations.empty() || _as.abort_requested(); }); - rtlogger.trace("CDC generation publisher fiber wakes up"); + rtlogger.debug("CDC generation publisher fiber wakes up"); } } catch (raft::request_aborted&) { rtlogger.debug("CDC generation publisher fiber aborted"); @@ -1989,7 +1989,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!holder || holder->failed()) { holder = futurize_invoke(action) .finally([this, g = _async_gate.hold(), gid, name] () noexcept { - rtlogger.trace("{} for tablet {} resolved.", name, gid); + rtlogger.debug("{} for tablet {} resolved.", name, gid); _tablets_ready = true; _topo_sm.event.broadcast(); }); @@ -1997,7 +1997,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } if (!holder->available()) { - rtlogger.trace("Tablet {} still doing {}", gid, name); + rtlogger.debug("Tablet {} still doing {}", gid, name); return false; } @@ -2052,7 +2052,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // If progress cannot be made, e.g. because all transitions are streaming, we block // and wait for notification. - rtlogger.trace("handle_tablet_migration()"); + rtlogger.debug("handle_tablet_migration()"); std::vector updates; bool needs_barrier = false; bool has_transitions = false; @@ -2079,7 +2079,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }; auto transition_to = [&] (locator::tablet_transition_stage stage) { - rtlogger.trace("Will set tablet {} stage to {}", gid, stage); + rtlogger.debug("Will set tablet {} stage to {}", gid, stage); updates.emplace_back(get_mutation_builder() .set_stage(last_token, stage) .build()); @@ -2101,7 +2101,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { switch (trinfo.stage) { case locator::tablet_transition_stage::allow_write_both_read_old: if (do_barrier()) { - rtlogger.trace("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); + rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_old) // Create session a bit earlier to avoid adding barrier @@ -2127,7 +2127,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid); })) { - rtlogger.trace("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); + rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_new) .del_session(last_token) @@ -2220,7 +2220,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Streaming may have finished after we checked. To avoid missed notification, we need // to check atomically with event.wait() if (!_tablets_ready) { - rtlogger.trace("Going to sleep with active tablet transitions"); + rtlogger.debug("Going to sleep with active tablet transitions"); release_guard(std::move(guard)); co_await await_event(); } @@ -3325,9 +3325,9 @@ future<> topology_coordinator::run() { bool had_work = co_await handle_topology_transition(std::move(guard)); if (!had_work) { // Nothing to work on. Wait for topology change event. - rtlogger.trace("topology coordinator fiber has nothing to do. Sleeping."); + rtlogger.debug("topology coordinator fiber has nothing to do. Sleeping."); co_await await_event(); - rtlogger.trace("topology coordinator fiber got an event"); + rtlogger.debug("topology coordinator fiber got an event"); } } catch (raft::request_aborted&) { rtlogger.debug("topology change coordinator fiber aborted"); @@ -7063,7 +7063,7 @@ future<> storage_service::snitch_reconfigured() { future storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd) { raft_topology_cmd_result result; - rtlogger.trace("topology cmd rpc {} is called", cmd.cmd); + rtlogger.debug("topology cmd rpc {} is called", cmd.cmd); // The retrier does: // If no operation was previously started - start it now From e4918c0d31e67522baef9a65adc60af6858764b1 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Jan 2024 16:43:16 +0100 Subject: [PATCH 6/6] test/pylib: scylla_cluster: enable raft_topology=debug level by default To help debugging test.py failures in CI. --- test/pylib/scylla_cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index b6f1177583..953de5b69f 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -129,7 +129,8 @@ SCYLLA_CMDLINE_OPTIONS = [ '--abort-on-lsa-bad-alloc', '1', '--abort-on-seastar-bad-alloc', '--abort-on-internal-error', '1', - '--abort-on-ebadf', '1' + '--abort-on-ebadf', '1', + '--logger-log-level', 'raft_topology=debug', ] # [--smp, 1], [--smp, 2] -> [--smp, 2]