dht: Carry dc-rack over boot_strapper and range_streamer
Both classes may populate (temporarly clones of) token metadata object with endpoint:tokens pairs for the endpoint they work with. Next patches will require that endpoint comes with the dc/rack info. This patch makes sure dht classes have the necessary information at hand (for now it's just empty pair of strings). Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -39,7 +39,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
|
||||
throw std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap");
|
||||
}
|
||||
try {
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, description, reason);
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _stream_manager, _token_metadata_ptr, _abort_source, _tokens, _address, _dr, description, reason);
|
||||
auto nodes_to_filter = gossiper.get_unreachable_members();
|
||||
if (reason == streaming::stream_reason::replace) {
|
||||
nodes_to_filter.insert(std::move(replace_address));
|
||||
|
||||
@@ -35,15 +35,19 @@ class boot_strapper {
|
||||
abort_source& _abort_source;
|
||||
/* endpoint that needs to be bootstrapped */
|
||||
inet_address _address;
|
||||
/* its DC/RACK info */
|
||||
locator::endpoint_dc_rack _dr;
|
||||
/* token of the node being bootstrapped. */
|
||||
std::unordered_set<token> _tokens;
|
||||
const token_metadata_ptr _token_metadata_ptr;
|
||||
public:
|
||||
boot_strapper(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, abort_source& abort_source, inet_address addr, std::unordered_set<token> tokens, const token_metadata_ptr tmptr)
|
||||
boot_strapper(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, abort_source& abort_source,
|
||||
inet_address addr, locator::endpoint_dc_rack dr, std::unordered_set<token> tokens, const token_metadata_ptr tmptr)
|
||||
: _db(db)
|
||||
, _stream_manager(sm)
|
||||
, _abort_source(abort_source)
|
||||
, _address(addr)
|
||||
, _dr(std::move(dr))
|
||||
, _tokens(tokens)
|
||||
, _token_metadata_ptr(std::move(tmptr)) {
|
||||
}
|
||||
|
||||
@@ -76,21 +76,24 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens, inet_address address, sstring description, streaming::stream_reason reason)
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
|
||||
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason)
|
||||
: _db(db)
|
||||
, _stream_manager(sm)
|
||||
, _token_metadata_ptr(std::move(tmptr))
|
||||
, _abort_source(abort_source)
|
||||
, _tokens(std::move(tokens))
|
||||
, _address(address)
|
||||
, _dr(std::move(dr))
|
||||
, _description(std::move(description))
|
||||
, _reason(reason)
|
||||
{
|
||||
_abort_source.check();
|
||||
}
|
||||
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, inet_address address, sstring description, streaming::stream_reason reason)
|
||||
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, description, reason) {
|
||||
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
|
||||
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason)
|
||||
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason) {
|
||||
}
|
||||
|
||||
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
|
||||
@@ -149,6 +152,7 @@ private:
|
||||
abort_source& _abort_source;
|
||||
std::unordered_set<token> _tokens;
|
||||
inet_address _address;
|
||||
locator::endpoint_dc_rack _dr;
|
||||
sstring _description;
|
||||
streaming::stream_reason _reason;
|
||||
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
|
||||
|
||||
@@ -705,7 +705,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
|
||||
|
||||
slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count());
|
||||
_gossiper.wait_for_range_setup().get();
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), bootstrap_tokens, get_token_metadata_ptr());
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), {}, bootstrap_tokens, get_token_metadata_ptr());
|
||||
slogger.info("Starting to bootstrap...");
|
||||
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get();
|
||||
} else {
|
||||
@@ -2229,7 +2229,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
_repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ignore_nodes).get();
|
||||
} else {
|
||||
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), bootstrap_tokens, get_token_metadata_ptr());
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), {}, bootstrap_tokens, get_token_metadata_ptr());
|
||||
bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get();
|
||||
}
|
||||
|
||||
@@ -2664,7 +2664,7 @@ future<> storage_service::rebuild(sstring source_dc) {
|
||||
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
|
||||
} else {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
|
||||
ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild);
|
||||
ss.get_broadcast_address(), locator::endpoint_dc_rack{}, "Rebuild", streaming::stream_reason::rebuild);
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
|
||||
if (source_dc != "") {
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
|
||||
@@ -2839,7 +2839,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
as.request_abort();
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Removenode", streaming::stream_reason::removenode);
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), locator::endpoint_dc_rack{}, "Removenode", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, leaving_node).get();
|
||||
try {
|
||||
streamer->stream_async().get();
|
||||
@@ -2866,7 +2866,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
|
||||
as.request_abort();
|
||||
}
|
||||
});
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), locator::endpoint_dc_rack{}, "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, endpoint).get();
|
||||
auto status_checker = seastar::async([this, endpoint, &as] {
|
||||
slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint);
|
||||
@@ -2986,7 +2986,7 @@ future<> storage_service::leave_ring() {
|
||||
|
||||
future<>
|
||||
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), "Unbootstrap", streaming::stream_reason::decommission);
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), locator::endpoint_dc_rack{}, "Unbootstrap", streaming::stream_reason::decommission);
|
||||
for (auto& entry : ranges_to_stream_by_keyspace) {
|
||||
const auto& keyspace = entry.first;
|
||||
auto& ranges_with_endpoints = entry.second;
|
||||
|
||||
Reference in New Issue
Block a user