diff --git a/service/storage_service.cc b/service/storage_service.cc index b2a70c29f4..1050834f9a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -102,6 +102,7 @@ namespace db { namespace service { static logging::logger slogger("storage_service"); +static logging::logger rtlogger("raft_topology"); static thread_local session_manager topology_session_manager; @@ -302,7 +303,7 @@ static future wait_for_ip(raft::server_id id, const raft_address_m id, std::chrono::duration_cast(timeout).count())))); } static thread_local logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft topology: cannot map {} to ip, retrying.", id); + rtlogger.log(log_level::warn, rate_limit, "cannot map {} to ip, retrying.", id); co_await sleep_abortable(std::chrono::milliseconds(5), as); } } @@ -360,7 +361,7 @@ static locator::node::state to_topology_node_state(node_state ns) { case node_state::rebuilding: return locator::node::state::normal; case node_state::none: return locator::node::state::none; } - on_internal_error(slogger, format("unhandled node state: {}", ns)); + on_internal_error(rtlogger, format("unhandled node state: {}", ns)); } // Synchronizes the local node state (token_metadata, system.peers/system.local tables, @@ -395,7 +396,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm locator::host_id host_id{id.uuid()}; auto ip = am.find(id); - slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}", + rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}", id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup); // Save tokens, not needed for raft topology management, but needed by legacy // Also ip -> id mapping is needed for address map recreation on reboot @@ -427,7 +428,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm locator::host_id host_id{id.uuid()}; auto ip = am.find(id); - slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", + rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, seastar::value_of([&] () -> sstring { return rs.ring ? ::format("{}", rs.ring->tokens) : sstring("null"); @@ -469,7 +470,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm auto existing_ip = am.find(replaced_id); if (!existing_ip) { // FIXME: What if not known? - on_fatal_internal_error(slogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id)); + on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id)); } assert(existing_ip); const auto replaced_host_id = locator::host_id(replaced_id.uuid()); @@ -491,7 +492,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm co_await process_normal_node(id, rs); break; default: - on_fatal_internal_error(slogger, ::format("Unexpected state {} for node {}", rs.state, id)); + on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id)); } }; @@ -534,7 +535,7 @@ future<> storage_service::topology_state_load() { co_return; } - slogger.debug("raft topology: reload raft topology state"); + rtlogger.debug("reload raft topology state"); // read topology state from disk and recreate token_metadata from it _topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(); @@ -615,7 +616,7 @@ future<> storage_service::topology_state_load() { } if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) { - slogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); + rtlogger.debug("topology_state_load: current CDC generation ID: {}", *gen_id); co_await _cdc_gens.local().handle_cdc_generation(*gen_id); } } @@ -1144,7 +1145,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardeddone(); } catch (...) { - slogger.error("raft topology: cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception()); + rtlogger.error("cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception()); throw; } }; @@ -1155,11 +1156,11 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedsecond.cleanup != cleanup_status::running) { - slogger.trace("raft topology: cleanup triggered, but not needed"); + rtlogger.trace("cleanup triggered, but not needed"); continue; } - slogger.info("raft topology: start cleanup"); + rtlogger.info("start cleanup"); auto keyspaces = _db.local().get_all_keyspaces(); @@ -1186,7 +1187,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedclient().start_operation(&_group0_as); @@ -1199,20 +1200,20 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, shardedclient().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: cleanup flag clearing: concurrent operation is detected, retrying."); + rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying."); continue; } break; } - slogger.debug("raft topology: cleanup flag cleared"); + rtlogger.debug("cleanup flag cleared"); } catch (const seastar::abort_requested_exception &) { - slogger.info("raft topology: cleanup fiber aborted"); + rtlogger.info("cleanup fiber aborted"); break; } catch (raft::request_aborted&) { - slogger.info("raft topology: cleanup fiber aborted"); + rtlogger.info("cleanup fiber aborted"); break; } catch (...) { - slogger.error("raft topology: cleanup fiber got an error: {}", std::current_exception()); + rtlogger.error("cleanup fiber got an error: {}", std::current_exception()); err = true; } if (err) { @@ -1293,11 +1294,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!to_remove.empty()) { // Remove from group 0 nodes that left. They may failed to do so by themselves try { - slogger.trace("raft topology: topology coordinator fiber removing {}" + rtlogger.debug("topology coordinator fiber removing {}" " from raft since they are in `left` state", to_remove); co_await _group0.group0_server().modify_config({}, to_remove, &_as); } catch (const raft::commit_status_unknown&) { - slogger.trace("raft topology: topology coordinator fiber got unknown status" + rtlogger.warn("topology coordinator fiber got commit_status_unknown status" " while removing {} from raft", to_remove); } } @@ -1387,7 +1388,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!next_req) { // We did not find a request that has enough live node to proceed // Cancel all requests to let admin know that no operation can succeed - slogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes); + rtlogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes); return cancel_requests{std::move(guard), std::move(dead_nodes)}; } @@ -1405,8 +1406,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto& topo = _topo_sm._topology; if (topo.transition_nodes.empty()) { - on_internal_error(slogger, ::format( - "raft topology: could not find node to work on" + on_internal_error(rtlogger, ::format( + "could not find node to work on" " even though the state requires it (state: {})", topo.tstate)); } @@ -1454,12 +1455,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> update_topology_state( group0_guard guard, std::vector&& updates, const sstring& reason) { try { - slogger.trace("raft topology: do update {} reason {}", updates, reason); + rtlogger.info("updating topology state: {}", reason); + rtlogger.trace("update_topology_state mutations: {}", updates); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: race while changing state: {}. Retrying", reason); + rtlogger.info("race while changing state: {}. Retrying", reason); throw; } }; @@ -1499,13 +1501,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) { auto ip = _address_map.find(id); if (!ip) { - slogger.warn("raft topology: cannot send command {} with term {} and index {} " + rtlogger.warn("cannot send command {} with term {} and index {} " "to {} because mapping to ip is not available", cmd.cmd, _term, cmd_index, id); co_await coroutine::exception(std::make_exception_ptr( std::runtime_error(::format("no ip address mapping for {}", id)))); } - slogger.trace("raft topology: send {} command with term {} and index {} to {}/{}", + rtlogger.debug("send {} command with term {} and index {} to {}/{}", cmd.cmd, _term, cmd_index, id, *ip); auto result = _db.get_token_metadata().get_topology().is_me(*ip) ? co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) : @@ -1543,6 +1545,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { group0_guard guard, const raft_topology_cmd& cmd, const std::unordered_set& exclude_nodes, drop_guard_and_retake drop_and_retake = drop_guard_and_retake::yes) { + rtlogger.info("executing global topology command {}, excluded nodes: {}", cmd.cmd, exclude_nodes); auto nodes = _topo_sm._topology.normal_nodes | boost::adaptors::filtered([&exclude_nodes] (const std::pair& n) { return !exclude_nodes.contains(n.first); @@ -1560,7 +1563,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { std::unordered_set get_excluded_nodes(raft::server_id id, const std::optional& req, const std::optional& req_param) { auto exclude_nodes = parse_ignore_nodes(req_param); - exclude_nodes.insert(parse_replaced_node(req_param)); + if (auto replaced_node = parse_replaced_node(req_param)) { + exclude_nodes.insert(replaced_node); + } if (req && *req == topology_request::remove) { exclude_nodes.insert(id); } @@ -1577,9 +1582,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }; future<> remove_from_group0(const raft::server_id& id) { - slogger.info("raft topology: removing node {} from group 0 configuration...", id); + rtlogger.info("removing node {} from group 0 configuration...", id); co_await _group0.remove_from_raft_config(id); - slogger.info("raft topology: node {} removed from group 0 configuration", id); + rtlogger.info("node {} removed from group 0 configuration", id); } future<> step_down_as_nonvoter() { @@ -1617,15 +1622,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!ep) { // get_sharding_info is only called for bootstrap tokens // or for tokens present in token_metadata - on_internal_error(slogger, ::format( - "raft topology: make_new_cdc_generation_data: get_sharding_info:" + on_internal_error(rtlogger, ::format( + "make_new_cdc_generation_data: get_sharding_info:" " can't find endpoint for token {}", end)); } auto ptr = _topo_sm._topology.find(raft::server_id{ep->uuid()}); if (!ptr) { - on_internal_error(slogger, ::format( - "raft topology: make_new_cdc_generation_data: get_sharding_info:" + on_internal_error(rtlogger, ::format( + "make_new_cdc_generation_data: get_sharding_info:" " couldn't find node {} in topology, owner of token {}", *ep, end)); } @@ -1673,7 +1678,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, binfo); if (gen_mutations.empty()) { - on_internal_error(slogger, "cdc_generation_data: gen_mutations is empty"); + on_internal_error(rtlogger, "cdc_generation_data: gen_mutations is empty"); } std::vector updates{gen_mutations.begin(), gen_mutations.end()}; @@ -1685,7 +1690,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto const reason = format( "insert CDC generation data (UUID: {}), part", gen_uuid); - slogger.trace("raft topology: do update {} reason {}", m, reason); + rtlogger.trace("do update {} reason {}", m, reason); write_mutations change{{std::move(m)}}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason); return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as); @@ -1780,13 +1785,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // // It also continually cleans the obsolete CDC generation data. future<> cdc_generation_publisher_fiber() { - slogger.trace("raft topology: start CDC generation publisher fiber"); + rtlogger.debug("start CDC generation publisher fiber"); while (!_as.abort_requested()) { co_await utils::get_local_injector().inject_with_handler("cdc_generation_publisher_fiber", [] (auto& handler) -> future<> { - slogger.info("raft toplogy: CDC generation publisher fiber sleeps after injection"); + rtlogger.info("CDC generation publisher fiber sleeps after injection"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - slogger.info("raft toplogy: CDC generation publisher fiber finishes sleeping after injection"); + rtlogger.info("CDC generation publisher fiber finishes sleeping after injection"); }); bool sleep = false; @@ -1807,28 +1812,28 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (_topo_sm._topology.unpublished_cdc_generations.empty()) { // No CDC generations to publish. Wait until one appears or the topology coordinator aborts. - slogger.trace("raft topology: CDC generation publisher fiber has nothing to do. Sleeping."); + rtlogger.debug("CDC generation publisher fiber has nothing to do. Sleeping."); co_await _topo_sm.event.when([&] () { return !_topo_sm._topology.unpublished_cdc_generations.empty() || _as.abort_requested(); }); - slogger.trace("raft topology: CDC generation publisher fiber wakes up"); + rtlogger.debug("CDC generation publisher fiber wakes up"); } } catch (raft::request_aborted&) { - slogger.debug("raft topology: CDC generation publisher fiber aborted"); + rtlogger.debug("CDC generation publisher fiber aborted"); } catch (seastar::abort_requested_exception) { - slogger.debug("raft topology: CDC generation publisher fiber aborted"); + rtlogger.debug("CDC generation publisher fiber aborted"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { - slogger.debug("raft topology: CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); + rtlogger.debug("CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); } catch (...) { - slogger.error("raft topology: CDC generation publisher fiber got error {}", std::current_exception()); + rtlogger.error("CDC generation publisher fiber got error {}", std::current_exception()); sleep = true; } if (sleep) { try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); } catch (...) { - slogger.debug("raft topology: CDC generation publisher: sleep failed: {}", std::current_exception()); + rtlogger.debug("CDC generation publisher: sleep failed: {}", std::current_exception()); } } co_await coroutine::maybe_yield(); @@ -1840,7 +1845,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { future<> handle_global_request(group0_guard guard) { switch (_topo_sm._topology.global_request.value()) { case global_topology_request::new_cdc_generation: { - slogger.info("raft topology: new CDC generation requested"); + rtlogger.info("new CDC generation requested"); auto tmptr = get_token_metadata_ptr(); auto [gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt); @@ -1871,7 +1876,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // marked as supported by all normal nodes and it is not empty future<> enable_features(group0_guard guard, std::set features_to_enable) { if (!_topo_sm._topology.transition_nodes.empty()) { - on_internal_error(slogger, + on_internal_error(rtlogger, "topology coordinator attempted to enable features even though there is" " a topology operations in progress"); } @@ -1916,7 +1921,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto reason = ::format("enabling features: {}", features_to_enable); co_await update_topology_state(std::move(guard), {builder.build()}, reason); - slogger.info("raft topology: enabled features: {}", features_to_enable); + rtlogger.info("enabled features: {}", features_to_enable); } future global_token_metadata_barrier(group0_guard&& guard, std::unordered_set exclude_nodes = {}) { @@ -1924,7 +1929,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes); } catch (...) { - slogger.error("raft topology: drain rpc failed, proceed to fence old writes: {}", std::current_exception()); + rtlogger.error("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); drain_failed = true; } if (drain_failed) { @@ -1984,7 +1989,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (!holder || holder->failed()) { holder = futurize_invoke(action) .finally([this, g = _async_gate.hold(), gid, name] () noexcept { - slogger.trace("raft topology: {} for tablet {} resolved.", name, gid); + rtlogger.debug("{} for tablet {} resolved.", name, gid); _tablets_ready = true; _topo_sm.event.broadcast(); }); @@ -1992,7 +1997,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } if (!holder->available()) { - slogger.trace("raft topology: Tablet {} still doing {}", gid, name); + rtlogger.debug("Tablet {} still doing {}", gid, name); return false; } @@ -2020,7 +2025,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table); auto last_token = tmap.get_last_token(mig.tablet.tablet); if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { - slogger.warn("Tablet already in transition, ignoring migration: {}", mig); + rtlogger.warn("Tablet already in transition, ignoring migration: {}", mig); return; } out.emplace_back( @@ -2047,7 +2052,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // If progress cannot be made, e.g. because all transitions are streaming, we block // and wait for notification. - slogger.trace("raft topology: handle_tablet_migration()"); + rtlogger.debug("handle_tablet_migration()"); std::vector updates; bool needs_barrier = false; bool has_transitions = false; @@ -2074,7 +2079,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }; auto transition_to = [&] (locator::tablet_transition_stage stage) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage); + rtlogger.debug("Will set tablet {} stage to {}", gid, stage); updates.emplace_back(get_mutation_builder() .set_stage(last_token, stage) .build()); @@ -2096,7 +2101,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { switch (trinfo.stage) { case locator::tablet_transition_stage::allow_write_both_read_old: if (do_barrier()) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); + rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_old) // Create session a bit earlier to avoid adding barrier @@ -2117,12 +2122,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { [] { throw std::runtime_error("stream_tablet failed due to error injection"); }); } if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] { - slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); + rtlogger.info("Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); auto dst = trinfo.pending_replica.host; return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, netw::msg_addr(id2ip(dst)), _as, raft::server_id(dst.uuid()), gid); })) { - slogger.trace("raft topology: Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); + rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new); updates.emplace_back(get_mutation_builder() .set_stage(last_token, locator::tablet_transition_stage::write_both_read_new) .del_session(last_token) @@ -2138,7 +2143,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case locator::tablet_transition_stage::cleanup: if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] { locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo); - slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst); + rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst); return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging, netw::msg_addr(id2ip(dst.host)), _as, raft::server_id(dst.host.uuid()), gid); })) { @@ -2172,7 +2177,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (ts != guard.write_timestamp()) { // We rely on the fact that should_preempt_balancing() does not release the guard // so that tablet metadata reading and updates are atomic. - on_internal_error(slogger, "should_preempt_balancing() retook the guard"); + on_internal_error(rtlogger, "should_preempt_balancing() retook the guard"); } } if (!preempt) { @@ -2215,7 +2220,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Streaming may have finished after we checked. To avoid missed notification, we need // to check atomically with event.wait() if (!_tablets_ready) { - slogger.trace("raft topology: Going to sleep with active tablet transitions"); + rtlogger.debug("Going to sleep with active tablet transitions"); release_guard(std::move(guard)); co_await await_event(); } @@ -2292,7 +2297,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { co_await wait_for_ip(id, _address_map, _as); } catch (...) { - slogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception()); + rtlogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception()); } } break; @@ -2318,7 +2323,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }, }); } catch (...) { - slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. " + rtlogger.warn("attempt to send rejection response to {} failed: {}. " "The node may hang. It's safe to shut it down manually now.", id, std::current_exception()); } @@ -2367,6 +2372,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_return false; } + rtlogger.info("entered `{}` transition state", *tstate); switch (*tstate) { case topology::transition_state::join_group0: { auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard))); @@ -2383,7 +2389,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto reason = ::format("bootstrap: failed to accept {}", node.id); co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason); - slogger.info("raft topology: node {} moved to left state", node.id); + rtlogger.info("node {} moved to left state", node.id); break; } @@ -2431,8 +2437,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } break; default: - on_internal_error(slogger, - format("raft topology: topology is in join_group0 state, but the node" + on_internal_error(rtlogger, + format("topology is in join_group0 state, but the node" " being worked on ({}) is in unexpected state '{}'; should be" " either 'bootstrapping' or 'replacing'", node.id, node.rs->state)); } @@ -2451,7 +2457,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::commit_cdc_generation, " + rtlogger.error("transition_state::commit_cdc_generation, " "raft_topology_cmd::command::barrier failed, error {}", std::current_exception()); _rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception()); break; @@ -2465,8 +2471,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay); auto cdc_gen_uuid = _topo_sm._topology.new_cdc_generation_data_uuid; if (!cdc_gen_uuid) { - on_internal_error(slogger, - "raft topology: new CDC generation data UUID missing in `commit_cdc_generation` state"); + on_internal_error(rtlogger, + "new CDC generation data UUID missing in `commit_cdc_generation` state"); } cdc::generation_id_v2 cdc_gen_id { @@ -2479,8 +2485,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // This could happen if the topology coordinator's clock is broken. auto curr_gen_id = _topo_sm._topology.current_cdc_generation_id; if (curr_gen_id && curr_gen_id->ts >= cdc_gen_ts) { - on_internal_error(slogger, ::format( - "raft topology: new CDC generation has smaller timestamp than the previous one." + on_internal_error(rtlogger, ::format( + "new CDC generation has smaller timestamp than the previous one." " Old generation ID: {}, new generation ID: {}", *curr_gen_id, cdc_gen_id)); } } @@ -2530,7 +2536,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: tablets draining failed with {}. Aborting the topology operation", std::current_exception()); + rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception()); _rollback = fmt::format("Failed to drain tablets: {}", std::current_exception()); } break; @@ -2545,7 +2551,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::write_both_read_old, " + rtlogger.error("transition_state::write_both_read_old, " "global_token_metadata_barrier failed, error {}", std::current_exception()); _rollback = fmt::format("global_token_metadata_barrier failed in write_both_read_old state {}", std::current_exception()); @@ -2572,7 +2578,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (node.rs->state == node_state::decommissioning && raft::configuration::voter_count(_group0.group0_server().get_configuration().current) % 2 == 0) { if (node.id == _raft.id()) { - slogger.info("raft topology: coordinator is decommissioning and becomes a non-voter; " + rtlogger.info("coordinator is decommissioning and becomes a non-voter; " "giving up leadership"); co_await step_down_as_nonvoter(); } else { @@ -2600,7 +2606,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (term_changed_error&) { throw; } catch (...) { - slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception" + rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception" " (node state is {}): {}", node.rs->state, std::current_exception()); _rollback = fmt::format("Failed stream ranges: {}", std::current_exception()); break; @@ -2627,7 +2633,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: transition_state::write_both_read_new, " + rtlogger.error("transition_state::write_both_read_new, " "global_token_metadata_barrier failed, error {}", std::current_exception()); barrier_failed = true; @@ -2701,7 +2707,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } break; default: - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Ring state on node {} is write_both_read_new while the node is in state {}", node.id, node.rs->state)); } @@ -2719,12 +2725,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Used to start new topology transitions using node requests or perform node operations // that don't change the topology (like rebuild). future<> handle_node_transition(node_to_work_on&& node) { - slogger.info("raft topology: coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state); + rtlogger.info("coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state); switch (node.rs->state) { case node_state::none: { if (_topo_sm._topology.normal_nodes.empty()) { - slogger.info("raft topology: skipping join node handshake for the first node in the cluster"); + rtlogger.info("skipping join node handshake for the first node in the cluster"); } else { auto validation_result = validate_joining_node(node); @@ -2757,7 +2763,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { throw; } catch(...) { wait_for_ip_error = std::current_exception(); - slogger.warn("raft_topology_cmd::command::wait_for_ip failed, error {}", + rtlogger.warn("raft_topology_cmd::command::wait_for_ip failed, error {}", wait_for_ip_error); } if (wait_for_ip_error) { @@ -2784,14 +2790,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason); - slogger.info("raft topology: rejected node moved to left state {}", node.id); + rtlogger.info("rejected node moved to left state {}", node.id); try { co_await respond_to_joining_node(node.id, join_node_response_params{ .response = std::move(validation_result), }); } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send rejection response to {} failed. " + rtlogger.warn("attempt to send rejection response to {} failed. " "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -2842,7 +2848,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtbuilder.done("the node is alive"); co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "reject removenode"); - slogger.warn("raft topology: rejected removenode operation for node {} " + rtlogger.warn("rejected removenode operation for node {} " "because it is alive", node.id); break; } @@ -2895,7 +2901,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (node.id == _raft.id()) { // Someone else needs to coordinate the rest of the decommission process, // because the decommissioning node is going to shut down in the middle of this state. - slogger.info("raft topology: coordinator is decommissioning; giving up leadership"); + rtlogger.info("coordinator is decommissioning; giving up leadership"); co_await step_down_as_nonvoter(); // Note: if we restart after this point and become a voter @@ -2913,7 +2919,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (group0_concurrent_modification&) { throw; } catch (...) { - slogger.error("raft topology: node_state::left_token_ring (node: {}), " + rtlogger.error("node_state::left_token_ring (node: {}), " "global_token_metadata_barrier failed, error {}", node.id, std::current_exception()); barrier_failed = true; @@ -2947,7 +2953,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier); } catch (...) { - slogger.warn("raft topology: failed to tell node {} to shut down - it may hang." + rtlogger.warn("failed to tell node {} to shut down - it may hang." " It's safe to shut it down manually now. (Exception: {})", node.id, std::current_exception()); shutdown_failed = true; @@ -2976,7 +2982,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } catch (term_changed_error&) { throw; } catch(...) { - slogger.warn("raft topology: failed to run barrier_and_drain during rollback {}", std::current_exception()); + rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception()); barrier_failed = true; } @@ -2996,7 +3002,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { auto str = fmt::format("complete rollback of {} to state normal", node.id); - slogger.info("{}", str); + rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str); } break; @@ -3005,12 +3011,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case node_state::removing: case node_state::replacing: // Should not get here - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Found node {} in state {} but there is no ongoing topology transition", node.id, node.rs->state)); case node_state::left: // Should not get here - on_fatal_internal_error(slogger, ::format( + on_fatal_internal_error(rtlogger, ::format( "Topology coordinator is called for node {} in state 'left'", node.id)); break; } @@ -3038,7 +3044,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { const auto& supported_features = node.rs->supported_features; std::ranges::set_difference(node.topology->enabled_features, supported_features, std::back_inserter(unsupported_features)); if (!unsupported_features.empty()) { - slogger.warn("raft topology: node {} does not understand some features: {}", node.id, unsupported_features); + rtlogger.warn("node {} does not understand some features: {}", node.id, unsupported_features); return join_node_response_params::rejected{ .reason = format("Feature check failed. The node does not support some features that are enabled by the cluster: {}", unsupported_features), @@ -3075,7 +3081,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }); responded = true; } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send acceptance response to {} failed. " + rtlogger.warn("attempt to send acceptance response to {} failed. " "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -3109,7 +3115,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { topology_mutation_builder builder(node.guard.write_timestamp()); builder.with_node(id).set("cleanup_status", cleanup_status::needed); muts.emplace_back(builder.build()); - slogger.trace("raft topology: mark node {} as needed cleanup", id); + rtlogger.trace("mark node {} as needed cleanup", id); } } return muts; @@ -3130,7 +3136,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(id).set("cleanup_status", cleanup_status::running); muts.emplace_back(builder.build()); - slogger.trace("raft topology: mark node {} as cleanup running", id); + rtlogger.trace("mark node {} as cleanup running", id); } } if (!muts.empty()) { @@ -3177,12 +3183,12 @@ public: }; future topology_coordinator::maybe_start_tablet_migration(group0_guard guard) { - slogger.debug("raft topology: Evaluating tablet balance"); + rtlogger.debug("Evaluating tablet balance"); auto tm = get_token_metadata_ptr(); auto plan = co_await _tablet_allocator.balance_tablets(tm); if (plan.empty()) { - slogger.debug("raft topology: Tablets are balanced"); + rtlogger.debug("Tablets are balanced"); co_return false; } @@ -3225,10 +3231,10 @@ future<> topology_coordinator::fence_previous_coordinator() { continue; } catch (raft::request_aborted&) { // Abort was requested. Break the loop - slogger.debug("raft topology: request to fence previous coordinator was aborted"); + rtlogger.debug("request to fence previous coordinator was aborted"); break; } catch (...) { - slogger.error("raft topology: failed to fence previous coordinator {}", std::current_exception()); + rtlogger.error("failed to fence previous coordinator {}", std::current_exception()); } try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); @@ -3236,13 +3242,13 @@ future<> topology_coordinator::fence_previous_coordinator() { // Abort was requested. Break the loop break; } catch (...) { - slogger.debug("raft topology: sleep failed while fencing previous coordinator: {}", std::current_exception()); + rtlogger.debug("sleep failed while fencing previous coordinator: {}", std::current_exception()); } } } future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard) { - slogger.info("raft topology: start rolling back topology change"); + rtlogger.info("start rolling back topology change"); // Look for a node which operation should be aborted // (there should be one since we are in the rollback) node_to_work_on node = get_node_to_work_on(std::move(guard)); @@ -3269,7 +3275,7 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard state = node_state::rollback_to_normal; break; default: - on_internal_error(slogger, fmt::format("raft topology: tried to rollback in unsupported state {}", node.rs->state)); + on_internal_error(rtlogger, fmt::format("tried to rollback in unsupported state {}", node.rs->state)); } topology_mutation_builder builder(node.guard.write_timestamp()); @@ -3289,12 +3295,12 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state); - slogger.info("{}", str); + rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), std::move(muts), str); } future<> topology_coordinator::run() { - slogger.info("raft topology: start topology coordinator fiber"); + rtlogger.info("start topology coordinator fiber"); auto abort = _as.subscribe([this] () noexcept { _topo_sm.event.broadcast(); @@ -3320,29 +3326,29 @@ future<> topology_coordinator::run() { bool had_work = co_await handle_topology_transition(std::move(guard)); if (!had_work) { // Nothing to work on. Wait for topology change event. - slogger.trace("raft topology: topology coordinator fiber has nothing to do. Sleeping."); + rtlogger.debug("topology coordinator fiber has nothing to do. Sleeping."); co_await await_event(); - slogger.trace("raft topology: topology coordinator fiber got an event"); + rtlogger.debug("topology coordinator fiber got an event"); } } catch (raft::request_aborted&) { - slogger.debug("raft topology: topology change coordinator fiber aborted"); + rtlogger.debug("topology change coordinator fiber aborted"); } catch (seastar::abort_requested_exception&) { - slogger.debug("raft topology: topology change coordinator fiber aborted"); + rtlogger.debug("topology change coordinator fiber aborted"); } catch (raft::commit_status_unknown&) { - slogger.warn("raft topology: topology change coordinator fiber got commit_status_unknown"); + rtlogger.warn("topology change coordinator fiber got commit_status_unknown"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { // Term changed. We may no longer be a leader - slogger.debug("raft topology: topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term()); + rtlogger.debug("topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term()); } catch (...) { - slogger.error("raft topology: topology change coordinator fiber got error {}", std::current_exception()); + rtlogger.error("topology change coordinator fiber got error {}", std::current_exception()); sleep = true; } if (sleep) { try { co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); } catch (...) { - slogger.debug("raft topology: sleep failed: {}", std::current_exception()); + rtlogger.debug("sleep failed: {}", std::current_exception()); } } co_await coroutine::maybe_yield(); @@ -3368,7 +3374,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded storage_service::raft_state_monitor_fiber(raft::server& raft, shardedrequest_abort(); // abort current coordinator if running @@ -3484,13 +3490,13 @@ public: {} future<> pre_server_start(const group0_info& g0_info) override { - slogger.info("raft topology: join: sending the join request to {}", g0_info.ip_addr); + rtlogger.info("join: sending the join request to {}", g0_info.ip_addr); auto result = co_await ser::join_node_rpc_verbs::send_join_node_request( &_ss._messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, _req); std::visit(overloaded_functor { [this] (const join_node_request_result::ok&) { - slogger.info("raft topology: join: request to join placed, waiting" + rtlogger.info("join: request to join placed, waiting" " for the response from the topology coordinator"); _ss._join_node_request_done.set_value(); @@ -3513,7 +3519,7 @@ public: // deliver the rejection, it won't complete. In such a case, the // operator is responsible for shutting down the joining node. co_await _ss._join_node_response_done.get_shared_future(as); - slogger.info("raft topology: join: success"); + rtlogger.info("join: success"); co_return true; } }; @@ -3528,7 +3534,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster")); } - slogger.info("raft topology: adding myself as the first node to the topology"); + rtlogger.info("adding myself as the first node to the topology"); auto guard = co_await _group0->client().start_operation(&_group0_as); auto insert_join_request_mutations = build_mutation_from_join_params(params, guard); @@ -3546,7 +3552,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: bootstrap: concurrent operation is detected, retrying."); + rtlogger.info("bootstrap: concurrent operation is detected, retrying."); } } } @@ -3589,7 +3595,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft } while (true) { - slogger.info("raft topology: refreshing topology to check if it's synchronized with local metadata"); + rtlogger.info("refreshing topology to check if it's synchronized with local metadata"); auto guard = co_await _group0->client().start_operation(&_group0_as); @@ -3611,7 +3617,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features(); _feature_service.check_features(enabled_features, unsafe_to_disable_features); - slogger.info("raft topology: updating topology with local metadata"); + rtlogger.info("updating topology with local metadata"); co_await _sys_ks.local().set_must_synchronize_topology(true); @@ -3628,7 +3634,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: update topology with local metadata:" + rtlogger.info("update topology with local metadata:" " concurrent operation is detected, retrying."); } } @@ -3902,7 +3908,7 @@ future<> storage_service::join_token_ring(shardedclient().get_group0_upgrade_state()).second; if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) { - on_internal_error(slogger, "raft topology: cluster not upgraded to use group 0 after setup_group0"); + on_internal_error(rtlogger, "cluster not upgraded to use group 0 after setup_group0"); } co_return &_group0->group0_server(); } @@ -3917,7 +3923,7 @@ future<> storage_service::join_token_ring(sharded storage_service::raft_decommission() { throw std::runtime_error("Cannot decommission last node in the cluster"); } - slogger.info("raft topology: request decommission for: {}", raft_server.id()); + rtlogger.info("request decommission for: {}", raft_server.id()); topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(raft_server.id()) .set("topology_request", topology_request::leave) @@ -5507,7 +5513,7 @@ future<> storage_service::raft_decommission() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: decommission: concurrent operation is detected, retrying."); + rtlogger.info("decommission: concurrent operation is detected, retrying."); continue; } break; @@ -5520,7 +5526,7 @@ future<> storage_service::raft_decommission() { co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }}); } else { auto err = fmt::format("Decommission failed. See earlier errors ({})", error); - slogger.error("{}", err); + rtlogger.error("{}", err); throw std::runtime_error(err); } } @@ -5823,7 +5829,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list storage_service::raft_removenode(locator::host_id host_id, std::list storage_service::raft_removenode(locator::host_id host_id, std::listclient().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: removenode: concurrent operation is detected, retrying."); + rtlogger.info("removenode: concurrent operation is detected, retrying."); continue; } break; } - slogger.info("raft topology: removenode: wait for completion"); + rtlogger.info("removenode: wait for completion"); // Wait until request completes auto error = co_await wait_for_topology_request_completion(request_id); @@ -5869,11 +5875,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listremove_from_raft_config(id); } catch (raft::not_a_member&) { - slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator"); + rtlogger.info("removenode: already removed from the raft config by the topology coordinator"); } } else { auto err = fmt::format("Removenode failed. See earlier errors ({})", error); - slogger.error("{}", err); + rtlogger.error("{}", err); throw std::runtime_error(err); } } @@ -6442,7 +6448,7 @@ future<> storage_service::do_cluster_cleanup() { throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state)); } - slogger.info("raft topology: cluster cleanup requested"); + rtlogger.info("cluster cleanup requested"); topology_mutation_builder builder(guard.write_timestamp()); builder.set_global_topology_request(global_topology_request::cleanup); topology_change change{{builder.build()}}; @@ -6451,7 +6457,7 @@ future<> storage_service::do_cluster_cleanup() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: cleanup: concurrent operation is detected, retrying."); + rtlogger.info("cleanup: concurrent operation is detected, retrying."); continue; } break; @@ -6463,7 +6469,7 @@ future<> storage_service::do_cluster_cleanup() { return n.second.cleanup == cleanup_status::clean; }); }); - slogger.info("raft topology: cluster cleanup done"); + rtlogger.info("cluster cleanup done"); } future storage_service::wait_for_topology_request_completion(utils::UUID id) { @@ -6500,7 +6506,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { throw std::runtime_error("Cannot rebuild a single node"); } - slogger.info("raft topology: request rebuild for: {}", raft_server.id()); + rtlogger.info("request rebuild for: {}", raft_server.id()); topology_mutation_builder builder(guard.write_timestamp()); builder.set_session(session_id(guard.new_group0_state_id())); builder.with_node(raft_server.id()) @@ -6518,7 +6524,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: rebuild: concurrent operation is detected, retrying."); + rtlogger.info("rebuild: concurrent operation is detected, retrying."); continue; } break; @@ -6535,7 +6541,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { std::optional curr_gen; while (true) { - slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology"); + rtlogger.info("request check_and_repair_cdc_streams, refreshing topology"); auto guard = co_await _group0->client().start_operation(&_group0_as); auto curr_req = _topology_state_machine._topology.global_request; if (curr_req && *curr_req != global_topology_request::new_cdc_generation) { @@ -6564,7 +6570,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { - slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying."); + rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying."); continue; } break; @@ -7059,7 +7065,7 @@ future<> storage_service::snitch_reconfigured() { future storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd) { raft_topology_cmd_result result; - slogger.trace("raft topology: topology cmd rpc {} is called", cmd.cmd); + rtlogger.debug("topology cmd rpc {} is called", cmd.cmd); // The retrier does: // If no operation was previously started - start it now @@ -7069,16 +7075,16 @@ future storage_service::raft_topology_cmd_handler(raft auto retrier = [] (std::optional>& f, auto&& func) -> future<> { if (!f || f->failed()) { if (f) { - slogger.info("raft topology: retry streaming after previous attempt failed with {}", f->get_future().get_exception()); + rtlogger.info("retry streaming after previous attempt failed with {}", f->get_future().get_exception()); } else { - slogger.info("raft topology: start streaming"); + rtlogger.info("start streaming"); } f = func(); } else { - slogger.debug("raft topology: already streaming"); + rtlogger.debug("already streaming"); } co_await f.value().get_future(); - slogger.info("raft topology: streaming completed"); + rtlogger.info("streaming completed"); }; try { @@ -7135,7 +7141,7 @@ future storage_service::raft_topology_cmd_handler(raft ex = std::current_exception(); } if (ex) { - slogger.error("raft topology: feature check during barrier failed: {}", ex); + rtlogger.error("feature check during barrier failed: {}", ex); co_await drain(); break; } @@ -7147,7 +7153,7 @@ future storage_service::raft_topology_cmd_handler(raft case raft_topology_cmd::command::barrier_and_drain: { co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { const auto current_version = ss._shared_token_metadata.get()->get_version(); - slogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", + rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", version, current_version); // This shouldn't happen under normal operation, it's only plausible @@ -7168,7 +7174,7 @@ future storage_service::raft_topology_cmd_handler(raft co_await ss._shared_token_metadata.stale_versions_in_use(); co_await get_topology_session_manager().drain_closing_sessions(); - slogger.debug("raft_topology_cmd::barrier_and_drain done"); + rtlogger.debug("raft_topology_cmd::barrier_and_drain done"); }); result.status = raft_topology_cmd_result::command_status::success; } @@ -7178,7 +7184,7 @@ future storage_service::raft_topology_cmd_handler(raft auto tstate = _topology_state_machine._topology.tstate; if (!rs.ring || (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) { - slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); + rtlogger.warn("got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); break; } @@ -7212,7 +7218,7 @@ future storage_service::raft_topology_cmd_handler(raft } else { co_await retrier(_bootstrap_result, coroutine::lambda([&] () ->future<> { if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) { - on_internal_error(slogger, ::format("Cannot find request_param for node id {}", raft_server.id())); + on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id())); } if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) { // FIXME: we should not need to translate ids to IPs here. See #6403. @@ -7220,7 +7226,7 @@ future storage_service::raft_topology_cmd_handler(raft for (const auto& id : std::get(_topology_state_machine._topology.req_param[raft_server.id()]).ignored_ids) { auto ip = _group0->address_map().find(id); if (!ip) { - on_fatal_internal_error(slogger, ::format("Cannot find a mapping from node id {} to its ip", id)); + on_fatal_internal_error(rtlogger, ::format("Cannot find a mapping from node id {} to its ip", id)); } ignored_ips.insert(*ip); } @@ -7252,11 +7258,11 @@ future storage_service::raft_topology_cmd_handler(raft // Find the node that is been removed auto it = boost::find_if(_topology_state_machine._topology.transition_nodes, [] (auto& e) { return e.second.state == node_state::removing; }); if (it == _topology_state_machine._topology.transition_nodes.end()) { - slogger.warn("raft topology: got stream_ranges request while my state is normal but cannot find a node that is been removed"); + rtlogger.warn("got stream_ranges request while my state is normal but cannot find a node that is been removed"); break; } auto id = it->first; - slogger.debug("raft topology: streaming to remove node {}", id); + rtlogger.debug("streaming to remove node {}", id); const auto& am = _group0->address_map(); auto ip = am.find(id); // map node id to ip assert (ip); // what to do if address is unknown? @@ -7269,14 +7275,14 @@ future storage_service::raft_topology_cmd_handler(raft }); if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { if (!_topology_state_machine._topology.req_param.contains(id)) { - on_internal_error(slogger, ::format("Cannot find request_param for node id {}", id)); + on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id)); } // FIXME: we should not need to translate ids to IPs here. See #6403. std::list ignored_ips; for (const auto& ignored_id : std::get(_topology_state_machine._topology.req_param[id]).ignored_ids) { auto ip = _group0->address_map().find(ignored_id); if (!ip) { - on_fatal_internal_error(slogger, ::format("Cannot find a mapping from node id {} to its ip", ignored_id)); + on_fatal_internal_error(rtlogger, ::format("Cannot find a mapping from node id {} to its ip", ignored_id)); } ignored_ips.push_back(*ip); } @@ -7291,7 +7297,7 @@ future storage_service::raft_topology_cmd_handler(raft break; case node_state::rebuilding: { auto source_dc = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc; - slogger.info("raft topology: rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); co_await retrier(_rebuild_result, [&] () -> future<> { auto tmptr = get_token_metadata_ptr(); if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { @@ -7309,11 +7315,11 @@ future storage_service::raft_topology_cmd_handler(raft } try { co_await streamer->stream_async(); - slogger.info("raft topology: streaming for rebuild successful"); + rtlogger.info("streaming for rebuild successful"); } catch (...) { auto ep = std::current_exception(); // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - slogger.warn("raft topology: error while rebuilding node: {}", ep); + rtlogger.warn("error while rebuilding node: {}", ep); std::rethrow_exception(std::move(ep)); } } @@ -7327,7 +7333,7 @@ future storage_service::raft_topology_cmd_handler(raft case node_state::none: case node_state::removing: case node_state::rollback_to_normal: - on_fatal_internal_error(slogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", + on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", raft_server.id(), rs.state)); break; } @@ -7342,17 +7348,17 @@ future storage_service::raft_topology_cmd_handler(raft ids.push_back(id); } } - slogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids); + rtlogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids); for (const auto& id: ids) { co_await wait_for_ip(id, _group0->address_map(), _abort_source); } - slogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids); + rtlogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids); result.status = raft_topology_cmd_result::command_status::success; break; } } } catch (...) { - slogger.error("raft topology: raft_topology_cmd failed with: {}", std::current_exception()); + rtlogger.error("raft_topology_cmd failed with: {}", std::current_exception()); } co_return result; } @@ -7429,7 +7435,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, co_await raft_server.read_barrier(&_group0_as); if (_tablet_ops.contains(tablet)) { - slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); + rtlogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); co_await _tablet_ops[tablet].done.get_future(); co_return; } @@ -7452,10 +7458,10 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, try { co_await op(guard); p.set_value(); - slogger.debug("{} for tablet migration of {} successful", op_name, tablet); + rtlogger.debug("{} for tablet migration of {} successful", op_name, tablet); } catch (...) { p.set_exception(std::current_exception()); - slogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception()); + rtlogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception()); throw; } } @@ -7536,7 +7542,7 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { } auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id()); if (!shard_opt) { - on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet)); + on_internal_error(rtlogger, format("Tablet {} has no shard on this node", tablet)); } shard = *shard_opt; } @@ -7561,7 +7567,7 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: auto guard = co_await _group0->client().start_operation(&_abort_source); while (_topology_state_machine._topology.is_busy()) { - slogger.debug("move_tablet(): topology state machine is busy"); + rtlogger.debug("move_tablet(): topology state machine is busy"); release_guard(std::move(guard)); co_await _topology_state_machine.event.wait(); guard = co_await _group0->client().start_operation(&_abort_source); @@ -7605,15 +7611,15 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: .build())); sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst); - slogger.info("raft topology: {}", reason); - slogger.trace("raft topology: do update {} reason {}", updates, reason); + rtlogger.info("{}", reason); + rtlogger.trace("do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { - slogger.debug("move_tablet(): concurrent modification, retrying"); + rtlogger.debug("move_tablet(): concurrent modification, retrying"); } } @@ -7638,7 +7644,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { group0_guard guard = co_await _group0->client().start_operation(&_abort_source); while (_topology_state_machine._topology.is_busy()) { - slogger.debug("set_tablet_balancing_enabled(): topology is busy"); + rtlogger.debug("set_tablet_balancing_enabled(): topology is busy"); release_guard(std::move(guard)); co_await _topology_state_machine.event.wait(); guard = co_await _group0->client().start_operation(&_abort_source); @@ -7650,21 +7656,21 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { .build())); sstring reason = format("Setting tablet balancing to {}", enabled); - slogger.info("raft topology: {}", reason); + rtlogger.info("{}", reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { - slogger.debug("set_tablet_balancing_enabled(): concurrent modification"); + rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification"); } } } future storage_service::join_node_request_handler(join_node_request_params params) { join_node_request_result result; - slogger.info("raft topology: received request to join from host_id: {}", params.host_id); + rtlogger.info("received request to join from host_id: {}", params.host_id); if (params.cluster_name != _db.local().get_config().cluster_name()) { result.result = join_node_request_result::rejected{ @@ -7723,7 +7729,7 @@ future storage_service::join_node_request_handler(join const auto timeout = std::chrono::seconds(10); - slogger.warn("raft topology: the node {} which was requested to be" + rtlogger.warn("the node {} which was requested to be" " replaced has the same ID as the current group 0 leader ({});" " this looks like an attempt to join a node with the same IP" " as a leader which might have just crashed; waiting for" @@ -7745,7 +7751,7 @@ future storage_service::join_node_request_handler(join co_await sleep_abortable(std::chrono::milliseconds(100), as); } } catch (abort_requested_exception&) { - slogger.warn("raft topology: the node {} tries to replace the" + rtlogger.warn("the node {} tries to replace the" " current leader {} but the leader didn't change within" " {}s. Rejecting the node", params.host_id, @@ -7773,7 +7779,7 @@ future storage_service::join_node_request_handler(join if (const auto *p = _topology_state_machine._topology.find(params.host_id)) { const auto& rs = p->second; if (rs.state == node_state::left) { - slogger.warn("raft topology: the node {} attempted to join", + rtlogger.warn("the node {} attempted to join", " but it was removed from the cluster. Rejecting" " the node", params.host_id); @@ -7781,7 +7787,7 @@ future storage_service::join_node_request_handler(join .reason = "The node has already been removed from the cluster", }; } else { - slogger.warn("raft topology: the node {} attempted to join", + rtlogger.warn("the node {} attempted to join", " again after an unfinished attempt but it is no longer" " allowed to do so. Rejecting the node", params.host_id); @@ -7802,11 +7808,11 @@ future storage_service::join_node_request_handler(join co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); break; } catch (group0_concurrent_modification&) { - slogger.info("raft topology: join_node_request: concurrent operation is detected, retrying."); + rtlogger.info("join_node_request: concurrent operation is detected, retrying."); } } - slogger.info("raft topology: placed join request for {}", params.host_id); + rtlogger.info("placed join request for {}", params.host_id); // Success result.result = join_node_request_result::ok {}; @@ -7830,7 +7836,7 @@ future storage_service::join_node_response_handler(jo if (_join_node_response_done.available()) { // We already handled this RPC. No need to retry it. - slogger.info("raft topology: the node got join_node_response RPC for the second time, ignoring"); + rtlogger.info("the node got join_node_response RPC for the second time, ignoring"); if (std::holds_alternative(params.response) && _join_node_response_done.failed()) { @@ -7905,7 +7911,7 @@ future storage_service::join_node_response_handler(jo } static logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", + rtlogger.log(log_level::warn, rate_limit, "cannot map nodes {} to ips, retrying.", untranslated_ids); co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); @@ -7914,11 +7920,11 @@ future storage_service::join_node_response_handler(jo } } - slogger.info("raft topology: coordinator accepted request to join, " + rtlogger.info("coordinator accepted request to join, " "waiting for nodes {} to be alive before responding and continuing", sync_nodes); co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); - slogger.info("raft topology: nodes {} are alive", sync_nodes); + rtlogger.info("nodes {} are alive", sync_nodes); // Unblock waiting join_node_rpc_handshaker::post_server_start, // which will start the raft server and continue @@ -7936,7 +7942,7 @@ future storage_service::join_node_response_handler(jo }, params.response); } catch (...) { auto eptr = std::current_exception(); - slogger.warn("raft topology: error while handling the join response from the topology coordinator. " + rtlogger.warn("error while handling the join response from the topology coordinator. " "The node will not join the cluster. Error: {}", eptr); _join_node_response_done.set_exception(std::move(eptr)); diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index b6f1177583..953de5b69f 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -129,7 +129,8 @@ SCYLLA_CMDLINE_OPTIONS = [ '--abort-on-lsa-bad-alloc', '1', '--abort-on-seastar-bad-alloc', '--abort-on-internal-error', '1', - '--abort-on-ebadf', '1' + '--abort-on-ebadf', '1', + '--logger-log-level', 'raft_topology=debug', ] # [--smp, 1], [--smp, 2] -> [--smp, 2] diff --git a/test/topology/test_automatic_cleanup.py b/test/topology/test_automatic_cleanup.py index db5ada359e..7ae5e5a91a 100644 --- a/test/topology/test_automatic_cleanup.py +++ b/test/topology/test_automatic_cleanup.py @@ -26,21 +26,21 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient): marks = [await log.mark() for log in logs] await manager.server_add() await manager.server_add() - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 servers = await manager.running_servers() logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.decommission_node(servers[4].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 4 servers = await manager.running_servers() logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.decommission_node(servers[3].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 await manager.server_add() @@ -48,11 +48,11 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient): logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] await manager.api.client.post("/storage_service/cleanup_all", servers[0].ip_addr) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 3 marks = [await log.mark() for log in logs] await manager.decommission_node(servers[3].server_id) - matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 0 diff --git a/test/topology/test_coordinator_queue_management.py b/test/topology/test_coordinator_queue_management.py index 6fc1c4ca96..1c451b2e42 100644 --- a/test/topology/test_coordinator_queue_management.py +++ b/test/topology/test_coordinator_queue_management.py @@ -40,7 +40,7 @@ async def test_coordinator_queue_management(manager: ManagerClient): done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED) for t in pending: t.cancel() - [await l.wait_for("raft topology: removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])] + [await l.wait_for("raft_topology - removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])] [await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]] @@ -61,7 +61,7 @@ async def test_coordinator_queue_management(manager: ManagerClient): done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED) for t in pending: t.cancel() - logs[1].wait_for("raft topology: decommission: wait for completion", marks[1]) + logs[1].wait_for("raft_topology - decommission: wait for completion", marks[1]) [await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]] diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py index dc1d3cf6d6..17c5cb4188 100644 --- a/test/topology/test_topology_failure_recovery.py +++ b/test/topology/test_topology_failure_recovery.py @@ -27,7 +27,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors") servers = await manager.running_servers() assert len(servers) == 3 - matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # remove failure marks = [await log.mark() for log in logs] @@ -36,7 +36,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_stop_gracefully(servers[3].server_id) await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True) await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors") - matches = [await log.grep("storage_service - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await manager.server_start(servers[3].server_id) await manager.servers_see_each_other(servers) @@ -49,7 +49,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # bootstrap failure in raft barrier marks = [await log.mark() for log in logs] @@ -59,7 +59,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # replace failure marks = [await log.mark() for log in logs] @@ -72,5 +72,5 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("storage_service - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 diff --git a/test/topology_custom/test_remove_alive_node.py b/test/topology_custom/test_remove_alive_node.py index 84e4759af3..7660fca7cd 100644 --- a/test/topology_custom/test_remove_alive_node.py +++ b/test/topology_custom/test_remove_alive_node.py @@ -44,4 +44,4 @@ async def test_removing_alive_node_fails(manager: ManagerClient) -> None: # topology_coordinator::handle_node_transition). logging.info(f"Removing {srv3} initiated by {srv2}") await manager.remove_node(srv2.server_id, srv3.server_id, [], "Removenode failed. See earlier errors", False) - await log_file1.wait_for("raft topology: rejected removenode operation for node", timeout=60) + await log_file1.wait_for("raft_topology - rejected removenode operation for node", timeout=60) diff --git a/test/topology_custom/test_topology_failure_recovery.py b/test/topology_custom/test_topology_failure_recovery.py index c8956ae9bf..b73766a3b8 100644 --- a/test/topology_custom/test_topology_failure_recovery.py +++ b/test/topology_custom/test_topology_failure_recovery.py @@ -38,7 +38,7 @@ async def test_tablet_drain_failure_during_decommission(manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors") - matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await cql.run_async("DROP KEYSPACE test;") diff --git a/test/topology_custom/test_topology_smp.py b/test/topology_custom/test_topology_smp.py index 2f90043864..1251c90054 100644 --- a/test/topology_custom/test_topology_smp.py +++ b/test/topology_custom/test_topology_smp.py @@ -50,7 +50,7 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage log_args = [ '--default-log-level', 'debug', '--logger-log-level', 'raft_group0=trace:group0_client=trace:storage_service=trace' - ':raft=trace:raft_group_registry=trace' + ':raft=trace:raft_group_registry=trace:raft_topology=trace' ] logger.info(f'Adding --smp=3 server') diff --git a/test/topology_experimental_raft/test_cdc_generation_clearing.py b/test/topology_experimental_raft/test_cdc_generation_clearing.py index cb47d22f53..7174e35e65 100644 --- a/test/topology_experimental_raft/test_cdc_generation_clearing.py +++ b/test/topology_experimental_raft/test_cdc_generation_clearing.py @@ -26,7 +26,7 @@ async def test_current_cdc_generation_is_not_removed(manager: ManagerClient): # We enable the injection to ensure that a too-late timestamp does not prevent removing the CDC generation. logger.info("Bootstrapping first node") server = await manager.server_add( - cmdline=['--logger-log-level', 'storage_service=trace'], + cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'], config={'error_injections_at_startup': ['clean_obsolete_cdc_generations_ignore_ts']} ) @@ -53,7 +53,7 @@ async def test_dependency_on_timestamps(manager: ManagerClient): and the topology coordinator's clock is too small. Then, test that the CDC generation publisher removes the clean-up candidate (together with older generations) if its timestamp is old enough.""" logger.info("Bootstrapping first node") - servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace'])] + servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'])] log_file1 = await manager.server_open_log(servers[0].server_id) mark: Optional[int] = None diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 5259773455..292febeef9 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -79,6 +79,7 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m '--logger-log-level', 'query_processor=trace', '--logger-log-level', 'gossip=trace', '--logger-log-level', 'storage_service=trace', + '--logger-log-level', 'raft_topology=trace', '--logger-log-level', 'messaging_service=trace', '--logger-log-level', 'rpc=trace', ] @@ -237,6 +238,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient): logger.info("Bootstrapping cluster") cmdline = [ '--logger-log-level', 'storage_service=trace', + '--logger-log-level', 'raft_topology=trace', ] servers = [await manager.server_add(cmdline=cmdline)]