From fa9ee234a0b3ace33374d58bfdcdfb516f07824b Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 21 May 2020 17:15:29 +0800 Subject: [PATCH] streaming: Use separate streaming reason for replace operation Currently, replace and bootstrap share the same streaming reason, stream_reason::bootstrap, because they share most of the code in boot_strapper. In order to distinguish the two, we need to introduce a new stream reason, stream_reason::replace. It is safe to do so in a mixed cluster because current code only check if the stream_reason is stream_reason::repair. Refs: #6351 --- dht/boot_strapper.cc | 14 +++++++++++--- dht/boot_strapper.hh | 3 ++- idl/streaming.idl.hh | 1 + repair/repair.cc | 2 +- service/storage_service.cc | 6 +++++- streaming/stream_reason.hh | 1 + 6 files changed, 21 insertions(+), 6 deletions(-) diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index 8e8c6a969f..7f76763d52 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -43,15 +43,23 @@ #include "log.hh" #include "db/config.hh" #include "database.hh" +#include "streaming/stream_reason.hh" static logging::logger blogger("boot_strapper"); namespace dht { -future<> boot_strapper::bootstrap() { +future<> boot_strapper::bootstrap(streaming::stream_reason reason) { blogger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens()); - - auto streamer = make_lw_shared(_db, _token_metadata, _abort_source, _tokens, _address, "Bootstrap", streaming::stream_reason::bootstrap); + sstring description; + if (reason == streaming::stream_reason::bootstrap) { + description = "Bootstrap"; + } else if (reason == streaming::stream_reason::replace) { + description = "Replace"; + } else { + return make_exception_future<>(std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap")); + } + auto streamer = make_lw_shared(_db, _token_metadata, _abort_source, _tokens, _address, description, reason); streamer->add_source_filter(std::make_unique(gms::get_local_gossiper().get_unreachable_members())); auto keyspaces = make_lw_shared>(_db.local().get_non_system_keyspaces()); return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) { diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index c07226915a..55abc06019 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -41,6 +41,7 @@ #include "dht/i_partitioner.hh" #include #include "database_fwd.hh" +#include "streaming/stream_reason.hh" #include #include @@ -66,7 +67,7 @@ public: , _token_metadata(tmd) { } - future<> bootstrap(); + future<> bootstrap(streaming::stream_reason reason); /** * if initialtoken was specified, use that (split on comma). diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index 7c043d40a4..0f1d47ef09 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -49,6 +49,7 @@ enum class stream_reason : uint8_t { removenode, rebuild, repair, + replace, }; enum class stream_mutation_fragments_cmd : uint8_t { diff --git a/repair/repair.cc b/repair/repair.cc index 08a6997b29..c659f2e833 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1945,7 +1945,7 @@ future<> rebuild_with_repair(seastar::sharded& db, locator::token_meta future<> replace_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set replacing_tokens) { auto op = sstring("replace_with_repair"); auto source_dc = get_local_dc(); - auto reason = streaming::stream_reason::bootstrap; + auto reason = streaming::stream_reason::replace; tm.update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address()); return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 76d842ffd4..5f8c33880b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -973,7 +973,11 @@ void storage_service::bootstrap() { } else { dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); // Does the actual streaming of newly replicated token ranges. - bs.bootstrap().get(); + if (db().local().is_replacing()) { + bs.bootstrap(streaming::stream_reason::replace).get(); + } else { + bs.bootstrap(streaming::stream_reason::bootstrap).get(); + } } _db.invoke_on_all([this] (database& db) { for (auto& cf : db.get_non_system_column_families()) { diff --git a/streaming/stream_reason.hh b/streaming/stream_reason.hh index 1d41033147..90821332d7 100644 --- a/streaming/stream_reason.hh +++ b/streaming/stream_reason.hh @@ -32,6 +32,7 @@ enum class stream_reason : uint8_t { removenode, rebuild, repair, + replace, }; }