streaming: Use separate streaming reason for replace operation

Currently, replace and bootstrap share the same streaming reason,
stream_reason::bootstrap, because they share most of the code
in boot_strapper.

In order to distinguish the two, we need to introduce a new stream
reason, stream_reason::replace. It is safe to do so in a mixed cluster
because current code only check if the stream_reason is
stream_reason::repair.

Refs: #6351
This commit is contained in:
Asias He
2020-05-21 17:15:29 +08:00
parent c29ccdea7e
commit fa9ee234a0
6 changed files with 21 additions and 6 deletions

View File

@@ -43,15 +43,23 @@
#include "log.hh"
#include "db/config.hh"
#include "database.hh"
#include "streaming/stream_reason.hh"
static logging::logger blogger("boot_strapper");
namespace dht {
future<> boot_strapper::bootstrap() {
future<> boot_strapper::bootstrap(streaming::stream_reason reason) {
blogger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, "Bootstrap", streaming::stream_reason::bootstrap);
sstring description;
if (reason == streaming::stream_reason::bootstrap) {
description = "Bootstrap";
} else if (reason == streaming::stream_reason::replace) {
description = "Replace";
} else {
return make_exception_future<>(std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"));
}
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, description, reason);
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_gossiper().get_unreachable_members()));
auto keyspaces = make_lw_shared<std::vector<sstring>>(_db.local().get_non_system_keyspaces());
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {

View File

@@ -41,6 +41,7 @@
#include "dht/i_partitioner.hh"
#include <unordered_set>
#include "database_fwd.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
@@ -66,7 +67,7 @@ public:
, _token_metadata(tmd) {
}
future<> bootstrap();
future<> bootstrap(streaming::stream_reason reason);
/**
* if initialtoken was specified, use that (split on comma).

View File

@@ -49,6 +49,7 @@ enum class stream_reason : uint8_t {
removenode,
rebuild,
repair,
replace,
};
enum class stream_mutation_fragments_cmd : uint8_t {

View File

@@ -1945,7 +1945,7 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, std::unordered_set<dht::token> replacing_tokens) {
auto op = sstring("replace_with_repair");
auto source_dc = get_local_dc();
auto reason = streaming::stream_reason::bootstrap;
auto reason = streaming::stream_reason::replace;
tm.update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
}

View File

@@ -973,7 +973,11 @@ void storage_service::bootstrap() {
} else {
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
// Does the actual streaming of newly replicated token ranges.
bs.bootstrap().get();
if (db().local().is_replacing()) {
bs.bootstrap(streaming::stream_reason::replace).get();
} else {
bs.bootstrap(streaming::stream_reason::bootstrap).get();
}
}
_db.invoke_on_all([this] (database& db) {
for (auto& cf : db.get_non_system_column_families()) {

View File

@@ -32,6 +32,7 @@ enum class stream_reason : uint8_t {
removenode,
rebuild,
repair,
replace,
};
}