From 360c4f86082ed18b60ae31b57413f50b0babb103 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 24 Aug 2022 09:50:44 +0300 Subject: [PATCH] dht: Carry dc-rack over boot_strapper and range_streamer Both classes may populate (temporarly clones of) token metadata object with endpoint:tokens pairs for the endpoint they work with. Next patches will require that endpoint comes with the dc/rack info. This patch makes sure dht classes have the necessary information at hand (for now it's just empty pair of strings). Signed-off-by: Pavel Emelyanov --- dht/boot_strapper.cc | 2 +- dht/boot_strapper.hh | 6 +++++- dht/range_streamer.hh | 10 +++++++--- service/storage_service.cc | 12 ++++++------ 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index b2e8e9da7f..b6d3094b53 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -39,7 +39,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper throw std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"); } try { - auto streamer = make_lw_shared(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, description, reason); + auto streamer = make_lw_shared(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason); auto nodes_to_filter = gossiper.get_unreachable_members(); if (reason == streaming::stream_reason::replace) { nodes_to_filter.insert(std::move(replace_address)); diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index 6884959da6..28da8eec84 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -35,15 +35,19 @@ class boot_strapper { abort_source& _abort_source; /* endpoint that needs to be bootstrapped */ inet_address _address; + /* its DC/RACK info */ + locator::endpoint_dc_rack _dr; /* token of the node being bootstrapped. */ std::unordered_set _tokens; const token_metadata_ptr _token_metadata_ptr; public: - boot_strapper(distributed& db, sharded& sm, abort_source& abort_source, inet_address addr, std::unordered_set tokens, const token_metadata_ptr tmptr) + boot_strapper(distributed& db, sharded& sm, abort_source& abort_source, + inet_address addr, locator::endpoint_dc_rack dr, std::unordered_set tokens, const token_metadata_ptr tmptr) : _db(db) , _stream_manager(sm) , _abort_source(abort_source) , _address(addr) + , _dr(std::move(dr)) , _tokens(tokens) , _token_metadata_ptr(std::move(tmptr)) { } diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index 0ecacca185..fb5c92c808 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -76,21 +76,24 @@ public: } }; - range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, inet_address address, sstring description, streaming::stream_reason reason) + range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, + inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason) : _db(db) , _stream_manager(sm) , _token_metadata_ptr(std::move(tmptr)) , _abort_source(abort_source) , _tokens(std::move(tokens)) , _address(address) + , _dr(std::move(dr)) , _description(std::move(description)) , _reason(reason) { _abort_source.check(); } - range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, inet_address address, sstring description, streaming::stream_reason reason) - : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, description, reason) { + range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, + inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason) + : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason) { } void add_source_filter(std::unique_ptr filter) { @@ -149,6 +152,7 @@ private: abort_source& _abort_source; std::unordered_set _tokens; inet_address _address; + locator::endpoint_dc_rack _dr; sstring _description; streaming::stream_reason _reason; std::unordered_multimap> _to_stream; diff --git a/service/storage_service.cc b/service/storage_service.cc index 65daef9b2b..f94ee69e23 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -705,7 +705,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count()); _gossiper.wait_for_range_setup().get(); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), {}, bootstrap_tokens, get_token_metadata_ptr()); slogger.info("Starting to bootstrap..."); bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get(); } else { @@ -2229,7 +2229,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token _repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ignore_nodes).get(); } else { slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), {}, bootstrap_tokens, get_token_metadata_ptr()); bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get(); } @@ -2664,7 +2664,7 @@ future<> storage_service::rebuild(sstring source_dc) { co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, - ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); + ss.get_broadcast_address(), locator::endpoint_dc_rack{}, "Rebuild", streaming::stream_reason::rebuild); streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -2839,7 +2839,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, as.request_abort(); } }); - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), locator::endpoint_dc_rack{}, "Removenode", streaming::stream_reason::removenode); removenode_add_ranges(streamer, leaving_node).get(); try { streamer->stream_async().get(); @@ -2866,7 +2866,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr as.request_abort(); } }); - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), locator::endpoint_dc_rack{}, "Restore_replica_count", streaming::stream_reason::removenode); removenode_add_ranges(streamer, endpoint).get(); auto status_checker = seastar::async([this, endpoint, &as] { slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint); @@ -2986,7 +2986,7 @@ future<> storage_service::leave_ring() { future<> storage_service::stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace) { - auto streamer = make_lw_shared(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), "Unbootstrap", streaming::stream_reason::decommission); + auto streamer = make_lw_shared(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), locator::endpoint_dc_rack{}, "Unbootstrap", streaming::stream_reason::decommission); for (auto& entry : ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second;