migration_manager: move to use host ids instead of ips

Users also amended to pass ids instead of ips.
This commit is contained in:
Gleb Natapov
2024-11-21 10:33:50 +02:00
parent 2f23a21a23
commit cd9b349886
8 changed files with 77 additions and 66 deletions

View File

@@ -1555,14 +1555,14 @@ public:
// RPC handler
static future<repair_row_level_start_response>
repair_row_level_start_handler(repair_service& repair, gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
repair_row_level_start_handler(repair_service& repair, gms::inet_address from, locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason,
gc_clock::time_point compaction_time, abort_source& as) {
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
repair.my_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
try {
co_await repair.insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as);
co_await repair.insert_repair_meta(from, from_id, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as);
co_return repair_row_level_start_response{repair_row_level_start_status::ok};
} catch (replica::no_such_column_family&) {
co_return repair_row_level_start_response{repair_row_level_start_status::no_such_column_family};
@@ -2520,7 +2520,8 @@ future<> repair_service::init_ms_handlers() {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return container().invoke_on(shard, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
auto from_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, from_id, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
if (!local_repair._view_builder.local_is_initialized()) {
return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
@@ -2528,7 +2529,7 @@ future<> repair_service::init_ms_handlers() {
}
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
return repair_meta::repair_row_level_start_handler(local_repair, from, src_cpu_id, repair_meta_id, std::move(ks_name),
return repair_meta::repair_row_level_start_handler(local_repair, from, from_id, src_cpu_id, repair_meta_id, std::move(ks_name),
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
shard_config{remote_shard, remote_shard_count, remote_ignore_msb},
schema_version, r, ct, _repair_module->abort_source());
@@ -3394,6 +3395,7 @@ repair_meta_ptr repair_service::get_repair_meta(gms::inet_address from, uint32_t
future<>
repair_service::insert_repair_meta(
const gms::inet_address& from,
locator::host_id from_id,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
dht::token_range range,
@@ -3405,7 +3407,7 @@ repair_service::insert_repair_meta(
streaming::stream_reason reason,
gc_clock::time_point compaction_time,
abort_source& as) {
schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, {from, src_cpu_id}, get_messaging(), as);
schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, from_id, src_cpu_id, get_messaging(), as);
auto& db = get_db();
reader_permit permit = co_await db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {});
node_repair_meta_id id{from, repair_meta_id};

View File

@@ -233,6 +233,7 @@ public:
future<>
insert_repair_meta(
const gms::inet_address& from,
locator::host_id from_id,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
dht::token_range range,

View File

@@ -8,6 +8,8 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include <algorithm>
#include <ranges>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
@@ -46,7 +48,7 @@ static logging::logger mlogger("migration_manager");
using namespace std::chrono_literals;
const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
static future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp);
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp);
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
@@ -122,7 +124,7 @@ void migration_manager::init_messaging_service()
}
ser::migration_manager_rpc_verbs::register_definitions_update(&_messaging, [this] (const rpc::client_info& cinfo, std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>> cm) {
auto src = netw::messaging_service::get_source(cinfo);
auto src = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
if (!cm) {
on_internal_error(mlogger, ::format(
"definitions_update handler: canonical mutations not supported by {}", src));
@@ -209,7 +211,7 @@ future<> migration_notifier::unregister_listener(migration_listener* listener)
return _listeners.remove(listener);
}
void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state)
void migration_manager::schedule_schema_pull(locator::host_id endpoint, const gms::endpoint_state& state)
{
if (!_enable_schema_pulls) {
mlogger.debug("Not pulling schema because schema pulls were disabled due to Raft.");
@@ -218,7 +220,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint,
const auto* value = state.get_application_state_ptr(gms::application_state::SCHEMA);
if (endpoint != _messaging.broadcast_address() && value) {
if (endpoint != _gossiper.my_host_id() && value) {
// FIXME: discarded future
(void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value()}), endpoint).handle_exception([endpoint] (auto ep) {
mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep);
@@ -278,12 +280,18 @@ future<> migration_manager::wait_for_schema_agreement(const replica::database& d
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
future<> migration_manager::maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint)
future<> migration_manager::maybe_schedule_schema_pull(const table_schema_version& their_version, locator::host_id endpoint)
{
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) {
auto ip = _gossiper.get_address_map().find(endpoint);
if (!ip) {
mlogger.debug("No ip address for {}, not submitting migration task", endpoint);
return make_ready_future<>();
}
if (db.get_version() == their_version || !should_pull_schema_from(*ip)) {
mlogger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return make_ready_future<>();
}
@@ -294,12 +302,12 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio
return submit_migration_task(endpoint);
}
return with_gate(_background_tasks, [this, &db, endpoint] {
return with_gate(_background_tasks, [this, &db, endpoint, ip = *ip] {
// Include a delay to make sure we have a chance to apply any changes being
// pushed out simultaneously. See CASSANDRA-5025
return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] {
return sleep_abortable(migration_delay, _as).then([this, &db, endpoint, ip] {
// grab the latest version of the schema since it may have changed again since the initial scheduling
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
auto ep_state = _gossiper.get_endpoint_state_ptr(ip);
if (!ep_state) {
mlogger.debug("epState vanished for {}, not submitting migration task", endpoint);
return make_ready_future<>();
@@ -326,14 +334,13 @@ future<> migration_manager::disable_schema_pulls() {
});
}
future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node)
future<> migration_manager::submit_migration_task(locator::host_id id, bool can_ignore_down_node)
{
if (!_gossiper.is_alive(endpoint)) {
auto msg = format("Can't send migration request: node {} is down.", endpoint);
if (!_gossiper.is_alive(id)) {
auto msg = format("Can't send migration request: node {} is down.", id);
mlogger.warn("{}", msg);
return can_ignore_down_node ? make_ready_future<>() : make_exception_future<>(std::runtime_error(msg));
}
netw::messaging_service::msg_addr id{endpoint, 0};
return merge_schema_from(id).handle_exception([](std::exception_ptr e) {
try {
std::rethrow_exception(e);
@@ -344,7 +351,7 @@ future<> migration_manager::submit_migration_task(const gms::inet_address& endpo
});
}
future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_addr id)
future<> migration_manager::do_merge_schema_from(locator::host_id id)
{
mlogger.info("Pulling schema from {}", id);
abort_source as;
@@ -359,7 +366,7 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad
mlogger.info("Schema merge with {} completed", id);
}
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr id)
future<> migration_manager::merge_schema_from(locator::host_id id)
{
if (_as.abort_requested()) {
return make_exception_future<>(abort_requested_exception());
@@ -373,7 +380,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
return res.first->second.trigger();
}
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<canonical_mutation>& canonical_mutations) {
future<> migration_manager::merge_schema_from(locator::host_id src, const std::vector<canonical_mutation>& canonical_mutations) {
canonical_mutation_merge_count++;
mlogger.debug("Applying schema mutations from {}", src);
auto& proxy = _storage_proxy;
@@ -883,9 +890,8 @@ future<std::vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp,
}
}
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
future<> migration_manager::push_schema_mutation(locator::host_id id, const std::vector<mutation>& schema)
{
netw::messaging_service::msg_addr id{endpoint, 0};
auto schema_features = _feat.cluster_schema_features();
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features);
auto cm = std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end());
@@ -919,7 +925,9 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema,
_messaging.knows_version(endpoint) &&
_messaging.get_raw_version(endpoint) == netw::messaging_service::current_version;
});
co_await coroutine::parallel_for_each(live_members,
// FIXME: gossiper should return host id set
auto live_host_ids = live_members | std::views::transform([&] (const gms::inet_address& ip) { return _gossiper.get_host_id(ip); });
co_await coroutine::parallel_for_each(live_host_ids,
std::bind(std::mem_fn(&migration_manager::push_schema_mutation), this, std::placeholders::_1, schema));
} catch (...) {
mlogger.error("failed to announce migration to all nodes: {}", std::current_exception());
@@ -1017,7 +1025,7 @@ future<> migration_manager::passive_announce() {
//
// The endpoint is the node from which 's' originated.
//
future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_service::msg_addr endpoint) {
future<> migration_manager::maybe_sync(const schema_ptr& s, locator::host_id endpoint) {
if (s->is_synced()) {
return make_ready_future<>();
}
@@ -1038,10 +1046,10 @@ future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_serv
// Returns schema of given version, either from cache or from remote node identified by 'from'.
// Doesn't affect current node's schema in any way.
static future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& storage_proxy) {
return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst] (table_schema_version v) {
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& storage_proxy) {
return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst, shard] (table_schema_version v) {
mlogger.debug("Requesting schema {} from {}", v, dst);
return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, static_cast<unsigned>(dst.cpu_id), v).then([&storage_proxy] (frozen_schema s) {
return ser::migration_manager_rpc_verbs::send_get_schema_version(&ms, dst, shard, v).then([&storage_proxy] (frozen_schema s) {
auto& proxy = storage_proxy.container();
// Since the latest schema version is always present in the schema registry
// we only happen to query already outdated schema version, which is
@@ -1076,11 +1084,11 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, netw::me
});
}
future<schema_ptr> migration_manager::get_schema_for_read(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, abort_source& as) {
return get_schema_for_write(v, dst, ms, as);
future<schema_ptr> migration_manager::get_schema_for_read(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
return get_schema_for_write(v, dst, shard, ms, as);
}
future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, abort_source& as) {
future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, abort_source& as) {
if (_as.abort_requested()) {
co_return coroutine::exception(std::make_exception_ptr(abort_requested_exception()));
}
@@ -1096,7 +1104,7 @@ future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version
}
if (!s) {
s = co_await get_schema_definition(v, dst, ms, _storage_proxy);
s = co_await get_schema_definition(v, dst, shard, ms, _storage_proxy);
}
if (!s->is_synced()) {
@@ -1151,7 +1159,7 @@ future<> migration_manager::sync_schema(const replica::database& db, const std::
const auto& src = hosts.front();
mlogger.debug("Pulling schema {} from {}", schema, src);
bool can_ignore_down_node = false;
return submit_migration_task(src, can_ignore_down_node);
return submit_migration_task(_gossiper.get_host_id(src), can_ignore_down_node);
});
}
@@ -1164,7 +1172,7 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id
}
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) {
schedule_schema_pull(endpoint, *ep_state);
schedule_schema_pull(ep_state->get_host_id(), *ep_state);
return make_ready_future();
}
@@ -1178,14 +1186,14 @@ future<> migration_manager::on_change(gms::inet_address endpoint, const gms::app
const auto host_id = _gossiper.get_host_id(endpoint);
const auto* node = _storage_proxy.get_token_metadata_ptr()->get_topology().find_node(host_id);
if (node && node->is_member()) {
schedule_schema_pull(endpoint, *ep_state);
schedule_schema_pull(host_id, *ep_state);
}
return make_ready_future<>();
});
}
future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) {
schedule_schema_pull(endpoint, *state);
schedule_schema_pull(state->get_host_id(), *state);
return make_ready_future();
}

View File

@@ -56,7 +56,7 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
private:
migration_notifier& _notifier;
std::unordered_map<netw::msg_addr, serialized_action, netw::msg_addr::hash> _schema_pulls;
std::unordered_map<locator::host_id, serialized_action> _schema_pulls;
serialized_action _group0_barrier;
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::gate _background_tasks;
@@ -104,7 +104,7 @@ public:
// Disable schema pulls when Raft group 0 is fully responsible for managing schema.
future<> disable_schema_pulls();
future<> submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node = true);
future<> submit_migration_task(locator::host_id endpoint, bool can_ignore_down_node = true);
// Makes sure that this node knows about all schema changes known by "nodes" that were made prior to this call.
future<> sync_schema(const replica::database& db, const std::vector<gms::inet_address>& nodes);
@@ -112,13 +112,13 @@ public:
// Fetches schema from remote node and applies it locally.
// Differs from submit_migration_task() in that all errors are propagated.
// Coalesces requests.
future<> merge_schema_from(netw::msg_addr);
future<> do_merge_schema_from(netw::msg_addr);
future<> merge_schema_from(locator::host_id);
future<> do_merge_schema_from(locator::host_id);
future<> reload_schema();
// Merge mutations received from src.
// Keep mutations alive around whole async operation.
future<> merge_schema_from(netw::msg_addr src, const std::vector<canonical_mutation>& mutations);
future<> merge_schema_from(locator::host_id src, const std::vector<canonical_mutation>& mutations);
// Incremented each time the function above is called. Needed by tests.
size_t canonical_mutation_merge_count = 0;
@@ -158,30 +158,30 @@ private:
void init_messaging_service();
future<> uninit_messaging_service();
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
future<> push_schema_mutation(locator::host_id endpoint, const std::vector<mutation>& schema);
future<> passive_announce();
void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
void schedule_schema_pull(locator::host_id endpoint, const gms::endpoint_state& state);
future<> maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint);
future<> maybe_schedule_schema_pull(const table_schema_version& their_version, locator::host_id endpoint);
template<typename mutation_type = schema_change>
future<> announce_with_raft(std::vector<mutation> schema, group0_guard, std::string_view description);
future<> announce_without_raft(std::vector<mutation> schema, group0_guard);
public:
future<> maybe_sync(const schema_ptr& s, netw::msg_addr endpoint);
future<> maybe_sync(const schema_ptr& s, locator::host_id endpoint);
// Returns schema of given version, either from cache or from remote node identified by 'from'.
// The returned schema may not be synchronized. See schema::is_synced().
// Intended to be used in the read path.
future<schema_ptr> get_schema_for_read(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source& as);
future<schema_ptr> get_schema_for_read(table_schema_version, locator::host_id from, unsigned shard, netw::messaging_service& ms, abort_source& as);
// Returns schema of given version, either from cache or from remote node identified by 'from'.
// Ensures that this node is synchronized with the returned schema. See schema::is_synced().
// Intended to be used in the write path, which relies on synchronized schema.
future<schema_ptr> get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source& as);
future<schema_ptr> get_schema_for_write(table_schema_version, locator::host_id from, unsigned shard, netw::messaging_service& ms, abort_source& as);
private:
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override;

View File

@@ -186,7 +186,7 @@ future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merg
co_await std::visit(make_visitor(
[&] (schema_change& chng) -> future<> {
return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
return _mm.merge_schema_from(locator::host_id{cmd.creator_id.uuid()}, std::move(chng.mutations));
},
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id);
@@ -198,7 +198,7 @@ future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merg
co_await _ss.topology_transition({.tablets_hint = std::move(tablet_keys)});
},
[&] (mixed_change& chng) -> future<> {
co_await _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
co_await _mm.merge_schema_from(locator::host_id{cmd.creator_id.uuid()}, std::move(chng.mutations));
co_await _ss.topology_transition();
co_return;
},
@@ -372,9 +372,7 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(as);
// FIXME: move schema merging to host id and pass hid here
// for now empty value is OK since it is used for logging only
co_await _mm.merge_schema_from(netw::messaging_service::msg_addr{gms::inet_address{}, 0}, std::move(*cm));
co_await _mm.merge_schema_from(hid, std::move(*cm));
if (topology_snp && !topology_snp->mutations.empty()) {
co_await _ss.merge_topology_snapshot(std::move(*topology_snp));

View File

@@ -40,6 +40,7 @@
#include <seastar/rpc/rpc_types.hh>
#include <stdexcept>
#include <csignal>
#include <unordered_set>
#include "idl/group0.dist.hh"
#include "idl/migration_manager.dist.hh"
@@ -1440,7 +1441,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
}
// Returning nullopt means we finished early (`can_finish_early` returned true).
static future<std::optional<std::unordered_map<gms::inet_address, table_schema_version>>>
static future<std::optional<std::unordered_map<locator::host_id, table_schema_version>>>
collect_schema_versions_from_group0_members(
netw::messaging_service& ms, const group0_members& members0,
const noncopyable_function<future<bool>()>& can_finish_early,
@@ -1448,17 +1449,20 @@ collect_schema_versions_from_group0_members(
static constexpr auto rpc_timeout = std::chrono::seconds{5};
static constexpr auto max_concurrency = 10;
std::unordered_map<gms::inet_address, table_schema_version> versions;
std::unordered_map<locator::host_id, table_schema_version> versions;
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
// We fetch the config on each iteration; some nodes may leave.
auto current_config = members0.get_inet_addrs();
auto current_config = members0.get_members() |
std::views::transform([] (auto m) { return locator::host_id{m.addr.id.uuid()}; }) |
std::ranges::to<std::vector<locator::host_id>>();
if (current_config.empty()) {
continue;
}
bool failed = false;
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> {
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (locator::host_id& node) -> future<> {
if (versions.contains(node)) {
// This node was already contacted in a previous iteration.
co_return;
@@ -1467,8 +1471,8 @@ collect_schema_versions_from_group0_members(
try {
upgrade_log.info("synchronize_schema: `send_schema_check({})`", node);
versions.emplace(node,
co_await with_timeout(as, rpc_timeout, [&ms, addr = netw::msg_addr(node)] (abort_source& as) mutable {
return ser::migration_manager_rpc_verbs::send_schema_check(&ms, std::move(addr), as);
co_await with_timeout(as, rpc_timeout, [&ms, node] (abort_source& as) mutable {
return ser::migration_manager_rpc_verbs::send_schema_check(&ms, node, as);
}));
} catch (abort_requested_exception&) {
upgrade_log.warn("synchronize_schema: abort requested during `send_schema_check({})`", node);
@@ -1566,7 +1570,7 @@ static future<bool> synchronize_schema(
try {
upgrade_log.info("synchronize_schema: `merge_schema_from({})`", addr);
co_await mm.merge_schema_from(netw::msg_addr(addr));
co_await mm.merge_schema_from(addr);
} catch (const rpc::closed_error& e) {
upgrade_log.warn("synchronize_schema: `merge_schema_from({})` failed due to connection error: {}", addr, e);
last_pull_successful = false;

View File

@@ -483,16 +483,12 @@ public:
private:
future<schema_ptr> get_schema_for_read(table_schema_version v, locator::host_id from, uint32_t from_shard, clock_type::time_point timeout) {
abort_on_expiry aoe(timeout);
// FIXME: provide get_schema_for_read that gets host_id
auto ip = _gossiper.get_address_map().find(from);
co_return co_await _mm.get_schema_for_read(std::move(v), netw::msg_addr{ip.value(), from_shard}, _ms, aoe.abort_source());
co_return co_await _mm.get_schema_for_read(std::move(v), from, from_shard, _ms, aoe.abort_source());
}
future<schema_ptr> get_schema_for_write(table_schema_version v, locator::host_id from, uint32_t from_shard, clock_type::time_point timeout) {
abort_on_expiry aoe(timeout);
// FIXME: provide get_schema_for_write that gets host_id
auto ip = _gossiper.get_address_map().find(from);
co_return co_await _mm.get_schema_for_write(std::move(v), netw::msg_addr{ip.value(), from_shard}, _ms, aoe.abort_source());
co_return co_await _mm.get_schema_for_write(std::move(v), from, from_shard, _ms, aoe.abort_source());
}
future<replica::exception_variant> handle_counter_mutation(

View File

@@ -118,6 +118,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source,
rpc::optional<service::session_id> session) {
auto from = netw::messaging_service::get_source(cinfo);
auto src = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
@@ -125,7 +127,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
_db.local().get_token_metadata().get_topology().my_address())));
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable {
return _mm.local().get_schema_for_write(schema_id, src, from.cpu_id, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable {
auto permit = _db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "stream-session", db::no_timeout, {});
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;