diff --git a/repair/row_level.cc b/repair/row_level.cc index 95d1135c83..7819e20cc6 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1555,14 +1555,14 @@ public: // RPC handler static future - repair_row_level_start_handler(repair_service& repair, gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, + repair_row_level_start_handler(repair_service& repair, gms::inet_address from, locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, abort_source& as) { rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}", repair.my_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size); try { - co_await repair.insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as); + co_await repair.insert_repair_meta(from, from_id, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as); co_return repair_row_level_start_response{repair_row_level_start_status::ok}; } catch (replica::no_such_column_family&) { co_return repair_row_level_start_response{repair_row_level_start_status::no_such_column_family}; @@ -2520,7 +2520,8 @@ future<> repair_service::init_ms_handlers() { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); auto from = cinfo.retrieve_auxiliary("baddr"); - return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name, + auto from_id = cinfo.retrieve_auxiliary("host_id"); + return container().invoke_on(shard, [from, from_id, src_cpu_id, repair_meta_id, ks_name, cf_name, range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable { if (!local_repair._view_builder.local_is_initialized()) { return make_exception_future(std::runtime_error(format("Node {} is not fully initialized for repair, try again later", @@ -2528,7 +2529,7 @@ future<> repair_service::init_ms_handlers() { } streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair; const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now(); - return repair_meta::repair_row_level_start_handler(local_repair, from, src_cpu_id, repair_meta_id, std::move(ks_name), + return repair_meta::repair_row_level_start_handler(local_repair, from, from_id, src_cpu_id, repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, shard_config{remote_shard, remote_shard_count, remote_ignore_msb}, schema_version, r, ct, _repair_module->abort_source()); @@ -3394,6 +3395,7 @@ repair_meta_ptr repair_service::get_repair_meta(gms::inet_address from, uint32_t future<> repair_service::insert_repair_meta( const gms::inet_address& from, + locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, dht::token_range range, @@ -3405,7 +3407,7 @@ repair_service::insert_repair_meta( streaming::stream_reason reason, gc_clock::time_point compaction_time, abort_source& as) { - schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, {from, src_cpu_id}, get_messaging(), as); + schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, from_id, src_cpu_id, get_messaging(), as); auto& db = get_db(); reader_permit permit = co_await db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {}); node_repair_meta_id id{from, repair_meta_id}; diff --git a/repair/row_level.hh b/repair/row_level.hh index e431a5d8df..171e8d5ba4 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -233,6 +233,7 @@ public: future<> insert_repair_meta( const gms::inet_address& from, + locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, dht::token_range range, diff --git a/service/migration_manager.cc b/service/migration_manager.cc index fb9910a503..06940a3ee9 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -8,6 +8,8 @@ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) */ +#include +#include #include #include #include @@ -46,7 +48,7 @@ static logging::logger mlogger("migration_manager"); using namespace std::chrono_literals; const std::chrono::milliseconds migration_manager::migration_delay = 60000ms; -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp); +static future get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp); migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded& sysks) : @@ -122,7 +124,7 @@ void migration_manager::init_messaging_service() } ser::migration_manager_rpc_verbs::register_definitions_update(&_messaging, [this] (const rpc::client_info& cinfo, std::vector, rpc::optional> cm) { - auto src = netw::messaging_service::get_source(cinfo); + auto src = cinfo.retrieve_auxiliary("host_id"); if (!cm) { on_internal_error(mlogger, ::format( "definitions_update handler: canonical mutations not supported by {}", src)); @@ -209,7 +211,7 @@ future<> migration_notifier::unregister_listener(migration_listener* listener) return _listeners.remove(listener); } -void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state) +void migration_manager::schedule_schema_pull(locator::host_id endpoint, const gms::endpoint_state& state) { if (!_enable_schema_pulls) { mlogger.debug("Not pulling schema because schema pulls were disabled due to Raft."); @@ -218,7 +220,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const auto* value = state.get_application_state_ptr(gms::application_state::SCHEMA); - if (endpoint != _messaging.broadcast_address() && value) { + if (endpoint != _gossiper.my_host_id() && value) { // FIXME: discarded future (void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value()}), endpoint).handle_exception([endpoint] (auto ep) { mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep); @@ -278,12 +280,18 @@ future<> migration_manager::wait_for_schema_agreement(const replica::database& d * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. */ -future<> migration_manager::maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint) +future<> migration_manager::maybe_schedule_schema_pull(const table_schema_version& their_version, locator::host_id endpoint) { auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); - if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) { + auto ip = _gossiper.get_address_map().find(endpoint); + if (!ip) { + mlogger.debug("No ip address for {}, not submitting migration task", endpoint); + return make_ready_future<>(); + } + + if (db.get_version() == their_version || !should_pull_schema_from(*ip)) { mlogger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); return make_ready_future<>(); } @@ -294,12 +302,12 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio return submit_migration_task(endpoint); } - return with_gate(_background_tasks, [this, &db, endpoint] { + return with_gate(_background_tasks, [this, &db, endpoint, ip = *ip] { // Include a delay to make sure we have a chance to apply any changes being // pushed out simultaneously. See CASSANDRA-5025 - return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] { + return sleep_abortable(migration_delay, _as).then([this, &db, endpoint, ip] { // grab the latest version of the schema since it may have changed again since the initial scheduling - auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); + auto ep_state = _gossiper.get_endpoint_state_ptr(ip); if (!ep_state) { mlogger.debug("epState vanished for {}, not submitting migration task", endpoint); return make_ready_future<>(); @@ -326,14 +334,13 @@ future<> migration_manager::disable_schema_pulls() { }); } -future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node) +future<> migration_manager::submit_migration_task(locator::host_id id, bool can_ignore_down_node) { - if (!_gossiper.is_alive(endpoint)) { - auto msg = format("Can't send migration request: node {} is down.", endpoint); + if (!_gossiper.is_alive(id)) { + auto msg = format("Can't send migration request: node {} is down.", id); mlogger.warn("{}", msg); return can_ignore_down_node ? make_ready_future<>() : make_exception_future<>(std::runtime_error(msg)); } - netw::messaging_service::msg_addr id{endpoint, 0}; return merge_schema_from(id).handle_exception([](std::exception_ptr e) { try { std::rethrow_exception(e); @@ -344,7 +351,7 @@ future<> migration_manager::submit_migration_task(const gms::inet_address& endpo }); } -future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_addr id) +future<> migration_manager::do_merge_schema_from(locator::host_id id) { mlogger.info("Pulling schema from {}", id); abort_source as; @@ -359,7 +366,7 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad mlogger.info("Schema merge with {} completed", id); } -future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr id) +future<> migration_manager::merge_schema_from(locator::host_id id) { if (_as.abort_requested()) { return make_exception_future<>(abort_requested_exception()); @@ -373,7 +380,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return res.first->second.trigger(); } -future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { +future<> migration_manager::merge_schema_from(locator::host_id src, const std::vector& canonical_mutations) { canonical_mutation_merge_count++; mlogger.debug("Applying schema mutations from {}", src); auto& proxy = _storage_proxy; @@ -883,9 +890,8 @@ future> prepare_view_drop_announcement(storage_proxy& sp, } } -future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector& schema) +future<> migration_manager::push_schema_mutation(locator::host_id id, const std::vector& schema) { - netw::messaging_service::msg_addr id{endpoint, 0}; auto schema_features = _feat.cluster_schema_features(); auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features); auto cm = std::vector(adjusted_schema.begin(), adjusted_schema.end()); @@ -919,7 +925,9 @@ future<> migration_manager::announce_without_raft(std::vector schema, _messaging.knows_version(endpoint) && _messaging.get_raw_version(endpoint) == netw::messaging_service::current_version; }); - co_await coroutine::parallel_for_each(live_members, + // FIXME: gossiper should return host id set + auto live_host_ids = live_members | std::views::transform([&] (const gms::inet_address& ip) { return _gossiper.get_host_id(ip); }); + co_await coroutine::parallel_for_each(live_host_ids, std::bind(std::mem_fn(&migration_manager::push_schema_mutation), this, std::placeholders::_1, schema)); } catch (...) { mlogger.error("failed to announce migration to all nodes: {}", std::current_exception()); @@ -1017,7 +1025,7 @@ future<> migration_manager::passive_announce() { // // The endpoint is the node from which 's' originated. // -future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_service::msg_addr endpoint) { +future<> migration_manager::maybe_sync(const schema_ptr& s, locator::host_id endpoint) { if (s->is_synced()) { return make_ready_future<>(); } @@ -1038,10 +1046,10 @@ future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_serv // Returns schema of given version, either from cache or from remote node identified by 'from'. // Doesn't affect current node's schema in any way. -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& storage_proxy) { - return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst] (table_schema_version v) { +static future get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& storage_proxy) { + return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst, shard] (table_schema_version v) { mlogger.debug("Requesting schema {} from {}", v, dst); - return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, static_cast(dst.cpu_id), v).then([&storage_proxy] (frozen_schema s) { + return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, shard, v).then([&storage_proxy] (frozen_schema s) { auto& proxy = storage_proxy.container(); // Since the latest schema version is always present in the schema registry // we only happen to query already outdated schema version, which is @@ -1076,11 +1084,11 @@ static future get_schema_definition(table_schema_version v, netw::me }); } -future migration_manager::get_schema_for_read(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, abort_source& as) { - return get_schema_for_write(v, dst, ms, as); +future migration_manager::get_schema_for_read(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) { + return get_schema_for_write(v, dst, shard, ms, as); } -future migration_manager::get_schema_for_write(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, abort_source& as) { +future migration_manager::get_schema_for_write(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) { if (_as.abort_requested()) { co_return coroutine::exception(std::make_exception_ptr(abort_requested_exception())); } @@ -1096,7 +1104,7 @@ future migration_manager::get_schema_for_write(table_schema_version } if (!s) { - s = co_await get_schema_definition(v, dst, ms, _storage_proxy); + s = co_await get_schema_definition(v, dst, shard, ms, _storage_proxy); } if (!s->is_synced()) { @@ -1151,7 +1159,7 @@ future<> migration_manager::sync_schema(const replica::database& db, const std:: const auto& src = hosts.front(); mlogger.debug("Pulling schema {} from {}", schema, src); bool can_ignore_down_node = false; - return submit_migration_task(src, can_ignore_down_node); + return submit_migration_task(_gossiper.get_host_id(src), can_ignore_down_node); }); } @@ -1164,7 +1172,7 @@ future get_column_mapping(db::system_keyspace& sys_ks, table_id } future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) { - schedule_schema_pull(endpoint, *ep_state); + schedule_schema_pull(ep_state->get_host_id(), *ep_state); return make_ready_future(); } @@ -1178,14 +1186,14 @@ future<> migration_manager::on_change(gms::inet_address endpoint, const gms::app const auto host_id = _gossiper.get_host_id(endpoint); const auto* node = _storage_proxy.get_token_metadata_ptr()->get_topology().find_node(host_id); if (node && node->is_member()) { - schedule_schema_pull(endpoint, *ep_state); + schedule_schema_pull(host_id, *ep_state); } return make_ready_future<>(); }); } future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) { - schedule_schema_pull(endpoint, *state); + schedule_schema_pull(state->get_host_id(), *state); return make_ready_future(); } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index b2b5a674d8..5779c0fa11 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -56,7 +56,7 @@ class migration_manager : public seastar::async_sharded_service _schema_pulls; + std::unordered_map _schema_pulls; serialized_action _group0_barrier; std::vector _feature_listeners; seastar::gate _background_tasks; @@ -104,7 +104,7 @@ public: // Disable schema pulls when Raft group 0 is fully responsible for managing schema. future<> disable_schema_pulls(); - future<> submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node = true); + future<> submit_migration_task(locator::host_id endpoint, bool can_ignore_down_node = true); // Makes sure that this node knows about all schema changes known by "nodes" that were made prior to this call. future<> sync_schema(const replica::database& db, const std::vector& nodes); @@ -112,13 +112,13 @@ public: // Fetches schema from remote node and applies it locally. // Differs from submit_migration_task() in that all errors are propagated. // Coalesces requests. - future<> merge_schema_from(netw::msg_addr); - future<> do_merge_schema_from(netw::msg_addr); + future<> merge_schema_from(locator::host_id); + future<> do_merge_schema_from(locator::host_id); future<> reload_schema(); // Merge mutations received from src. // Keep mutations alive around whole async operation. - future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); + future<> merge_schema_from(locator::host_id src, const std::vector& mutations); // Incremented each time the function above is called. Needed by tests. size_t canonical_mutation_merge_count = 0; @@ -158,30 +158,30 @@ private: void init_messaging_service(); future<> uninit_messaging_service(); - future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector& schema); + future<> push_schema_mutation(locator::host_id endpoint, const std::vector& schema); future<> passive_announce(); - void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state); + void schedule_schema_pull(locator::host_id endpoint, const gms::endpoint_state& state); - future<> maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint); + future<> maybe_schedule_schema_pull(const table_schema_version& their_version, locator::host_id endpoint); template future<> announce_with_raft(std::vector schema, group0_guard, std::string_view description); future<> announce_without_raft(std::vector schema, group0_guard); public: - future<> maybe_sync(const schema_ptr& s, netw::msg_addr endpoint); + future<> maybe_sync(const schema_ptr& s, locator::host_id endpoint); // Returns schema of given version, either from cache or from remote node identified by 'from'. // The returned schema may not be synchronized. See schema::is_synced(). // Intended to be used in the read path. - future get_schema_for_read(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source& as); + future get_schema_for_read(table_schema_version, locator::host_id from, unsigned shard, netw::messaging_service& ms, abort_source& as); // Returns schema of given version, either from cache or from remote node identified by 'from'. // Ensures that this node is synchronized with the returned schema. See schema::is_synced(). // Intended to be used in the write path, which relies on synchronized schema. - future get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source& as); + future get_schema_for_write(table_schema_version, locator::host_id from, unsigned shard, netw::messaging_service& ms, abort_source& as); private: virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override; diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 30e1a79409..efdaba2ade 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -186,7 +186,7 @@ future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merg co_await std::visit(make_visitor( [&] (schema_change& chng) -> future<> { - return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations)); + return _mm.merge_schema_from(locator::host_id{cmd.creator_id.uuid()}, std::move(chng.mutations)); }, [&] (broadcast_table_query& query) -> future<> { auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id); @@ -198,7 +198,7 @@ future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merg co_await _ss.topology_transition({.tablets_hint = std::move(tablet_keys)}); }, [&] (mixed_change& chng) -> future<> { - co_await _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations)); + co_await _mm.merge_schema_from(locator::host_id{cmd.creator_id.uuid()}, std::move(chng.mutations)); co_await _ss.topology_transition(); co_return; }, @@ -372,9 +372,7 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft:: auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(as); - // FIXME: move schema merging to host id and pass hid here - // for now empty value is OK since it is used for logging only - co_await _mm.merge_schema_from(netw::messaging_service::msg_addr{gms::inet_address{}, 0}, std::move(*cm)); + co_await _mm.merge_schema_from(hid, std::move(*cm)); if (topology_snp && !topology_snp->mutations.empty()) { co_await _ss.merge_topology_snapshot(std::move(*topology_snp)); diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index ea68f41bef..08c663d694 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -40,6 +40,7 @@ #include #include #include +#include #include "idl/group0.dist.hh" #include "idl/migration_manager.dist.hh" @@ -1440,7 +1441,7 @@ static future wait_for_peers_to_enter_synchronize_state( } // Returning nullopt means we finished early (`can_finish_early` returned true). -static future>> +static future>> collect_schema_versions_from_group0_members( netw::messaging_service& ms, const group0_members& members0, const noncopyable_function()>& can_finish_early, @@ -1448,17 +1449,20 @@ collect_schema_versions_from_group0_members( static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - std::unordered_map versions; + std::unordered_map versions; for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // We fetch the config on each iteration; some nodes may leave. - auto current_config = members0.get_inet_addrs(); + auto current_config = members0.get_members() | + std::views::transform([] (auto m) { return locator::host_id{m.addr.id.uuid()}; }) | + std::ranges::to>(); + if (current_config.empty()) { continue; } bool failed = false; - co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { + co_await max_concurrent_for_each(current_config, max_concurrency, [&] (locator::host_id& node) -> future<> { if (versions.contains(node)) { // This node was already contacted in a previous iteration. co_return; @@ -1467,8 +1471,8 @@ collect_schema_versions_from_group0_members( try { upgrade_log.info("synchronize_schema: `send_schema_check({})`", node); versions.emplace(node, - co_await with_timeout(as, rpc_timeout, [&ms, addr = netw::msg_addr(node)] (abort_source& as) mutable { - return ser::migration_manager_rpc_verbs::send_schema_check(&ms, std::move(addr), as); + co_await with_timeout(as, rpc_timeout, [&ms, node] (abort_source& as) mutable { + return ser::migration_manager_rpc_verbs::send_schema_check(&ms, node, as); })); } catch (abort_requested_exception&) { upgrade_log.warn("synchronize_schema: abort requested during `send_schema_check({})`", node); @@ -1566,7 +1570,7 @@ static future synchronize_schema( try { upgrade_log.info("synchronize_schema: `merge_schema_from({})`", addr); - co_await mm.merge_schema_from(netw::msg_addr(addr)); + co_await mm.merge_schema_from(addr); } catch (const rpc::closed_error& e) { upgrade_log.warn("synchronize_schema: `merge_schema_from({})` failed due to connection error: {}", addr, e); last_pull_successful = false; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b3e54c974c..0207ee05e2 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -483,16 +483,12 @@ public: private: future get_schema_for_read(table_schema_version v, locator::host_id from, uint32_t from_shard, clock_type::time_point timeout) { abort_on_expiry aoe(timeout); - // FIXME: provide get_schema_for_read that gets host_id - auto ip = _gossiper.get_address_map().find(from); - co_return co_await _mm.get_schema_for_read(std::move(v), netw::msg_addr{ip.value(), from_shard}, _ms, aoe.abort_source()); + co_return co_await _mm.get_schema_for_read(std::move(v), from, from_shard, _ms, aoe.abort_source()); } future get_schema_for_write(table_schema_version v, locator::host_id from, uint32_t from_shard, clock_type::time_point timeout) { abort_on_expiry aoe(timeout); - // FIXME: provide get_schema_for_write that gets host_id - auto ip = _gossiper.get_address_map().find(from); - co_return co_await _mm.get_schema_for_write(std::move(v), netw::msg_addr{ip.value(), from_shard}, _ms, aoe.abort_source()); + co_return co_await _mm.get_schema_for_write(std::move(v), from, from_shard, _ms, aoe.abort_source()); } future handle_counter_mutation( diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 052aa56ec4..57ab8a775a 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -118,6 +118,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { rpc::source> source, rpc::optional session) { auto from = netw::messaging_service::get_source(cinfo); + auto src = cinfo.retrieve_auxiliary("host_id"); + auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id); sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session); @@ -125,7 +127,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", _db.local().get_token_metadata().get_topology().my_address()))); } - return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable { + return _mm.local().get_schema_for_write(schema_id, src, from.cpu_id, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable { auto permit = _db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "stream-session", db::no_timeout, {}); struct stream_mutation_fragments_cmd_status { bool got_cmd = false;