Merge 'Cut feature_service -> system_keyspace dependency' from Pavel Emelyanov
This implicit link it pretty bad, because feature service is a low-level one which lots of other services depend on. System keyspace is opposite -- a high-level one that needs e.g. query processor and database to operate. This inverse dependency is created by the feature service need to commit enabled features' names into system keyspace on cluster join. And it uses the qctx thing for that in a best-effort manner (not doing anything if it's null). The dependency can be cut. The only place when enabled features are committed is when gossiper enables features on join or by receiving state changes from other nodes. By that time the sharded<system_keyspace> is up and running and can be used. Despite gossiper already has system keyspace dependency, it's better not to overload it with the need to mess with enabling and persisting features. Instead, the feature_enabler instance is equipped with needed dependencies and takes care of it. Eventually the enabler is also moved to feature_service.cc where it naturally belongs. Fixes: #13837 Closes #13172 * github.com:scylladb/scylladb: gossiper: Remove features and sysks from gossiper system_keyspace: De-static save_local_supported_features() system_keyspace: De-static load_|save_local_enabled_features() system_keyspace: Move enable_features_on_startup to feature_service (cont) system_keyspace: Move enable_features_on_startup to feature_service feature_service: Open-code persist_enabled_feature_info() into enabler gms: Move feature enabler to feature_service.cc gms: Move gossiper::enable_features() to feature_service::enable_features_on_join() gms: Persist features explicitly in features enabler feature_service: Make persist_enabled_feature_info() return a future system_keyspace: De-static load_peer_features() gms: Move gossiper::do_enable_features to persistent_feature_enabler::enable_features() gossiper: Enable features and register enabler from outside gms: Add feature_service and system_keyspace to feature_enabler
This commit is contained in:
@@ -1400,7 +1400,7 @@ future<> system_keyspace::setup_version(sharded<netw::messaging_service>& ms) {
|
||||
|
||||
future<> system_keyspace::save_local_supported_features(const std::set<std::string_view>& feats) {
|
||||
static const auto req = format("INSERT INTO system.{} (key, supported_features) VALUES (?, ?)", LOCAL);
|
||||
return qctx->execute_cql(req,
|
||||
return execute_cql(req,
|
||||
sstring(db::system_keyspace::LOCAL),
|
||||
fmt::to_string(fmt::join(feats, ","))).discard_result();
|
||||
}
|
||||
@@ -1636,7 +1636,7 @@ future<std::vector<gms::inet_address>> system_keyspace::load_peers() {
|
||||
|
||||
future<std::unordered_map<gms::inet_address, sstring>> system_keyspace::load_peer_features() {
|
||||
sstring req = format("SELECT peer, supported_features FROM system.{}", PEERS);
|
||||
return qctx->execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return execute_cql(req).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
std::unordered_map<gms::inet_address, sstring> ret;
|
||||
for (auto& row : *cql_result) {
|
||||
if (row.has("supported_features")) {
|
||||
@@ -3364,44 +3364,6 @@ future<> system_keyspace::save_local_enabled_features(std::set<sstring> features
|
||||
co_await set_scylla_local_param(gms::feature_service::ENABLED_FEATURES_KEY, features_str);
|
||||
}
|
||||
|
||||
future<> system_keyspace::enable_features_on_startup(sharded<gms::feature_service>& feat) {
|
||||
std::set<sstring> features_to_enable;
|
||||
const auto persisted_features = co_await load_local_enabled_features();
|
||||
if (persisted_features.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
gms::feature_service& local_feat_srv = feat.local();
|
||||
const auto known_features = local_feat_srv.supported_feature_set();
|
||||
const auto& registered_features = local_feat_srv.registered_features();
|
||||
for (auto&& f : persisted_features) {
|
||||
slogger.debug("Enabling persisted feature '{}'", f);
|
||||
const bool is_registered_feat = registered_features.contains(sstring(f));
|
||||
if (!is_registered_feat || !known_features.contains(f)) {
|
||||
if (is_registered_feat) {
|
||||
throw std::runtime_error(format(
|
||||
"Feature '{}' was previously enabled in the cluster but its support is disabled by this node. "
|
||||
"Set the corresponding configuration option to enable the support for the feature.", f));
|
||||
} else {
|
||||
throw std::runtime_error(format("Unknown feature '{}' was previously enabled in the cluster. "
|
||||
" That means this node is performing a prohibited downgrade procedure"
|
||||
" and should not be allowed to boot.", f));
|
||||
}
|
||||
}
|
||||
if (is_registered_feat) {
|
||||
features_to_enable.insert(std::move(f));
|
||||
}
|
||||
// If a feature is not in `registered_features` but still in `known_features` list
|
||||
// that means the feature name is used for backward compatibility and should be implicitly
|
||||
// enabled in the code by default, so just skip it.
|
||||
}
|
||||
|
||||
co_await feat.invoke_on_all([&features_to_enable] (auto& srv) -> future<> {
|
||||
std::set<std::string_view> feat = boost::copy_range<std::set<std::string_view>>(features_to_enable);
|
||||
co_await srv.enable(std::move(feat));
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::UUID> system_keyspace::get_raft_group0_id() {
|
||||
auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_group0_id");
|
||||
co_return opt.value_or<utils::UUID>({});
|
||||
|
||||
@@ -371,9 +371,9 @@ public:
|
||||
*/
|
||||
future<std::unordered_set<dht::token>> get_local_tokens();
|
||||
|
||||
static future<std::unordered_map<gms::inet_address, sstring>> load_peer_features();
|
||||
static future<std::set<sstring>> load_local_enabled_features();
|
||||
static future<> save_local_enabled_features(std::set<sstring> features);
|
||||
future<std::unordered_map<gms::inet_address, sstring>> load_peer_features();
|
||||
future<std::set<sstring>> load_local_enabled_features();
|
||||
future<> save_local_enabled_features(std::set<sstring> features);
|
||||
|
||||
future<int> increment_and_get_generation();
|
||||
bool bootstrap_needed() const;
|
||||
@@ -436,8 +436,6 @@ public:
|
||||
future<bool> cdc_is_rewritten();
|
||||
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
|
||||
static future<> enable_features_on_startup(sharded<gms::feature_service>& feat);
|
||||
|
||||
// Load Raft Group 0 id from scylla.local
|
||||
static future<utils::UUID> get_raft_group0_id();
|
||||
|
||||
@@ -445,7 +443,7 @@ public:
|
||||
static future<> set_raft_group0_id(utils::UUID id);
|
||||
|
||||
// Save advertised gossip feature set to system.local
|
||||
static future<> save_local_supported_features(const std::set<std::string_view>& feats);
|
||||
future<> save_local_supported_features(const std::set<std::string_view>& feats);
|
||||
|
||||
// Get the last (the greatest in timeuuid order) state ID in the group 0 history table.
|
||||
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -181,13 +183,97 @@ std::set<sstring> feature_service::to_feature_set(sstring features_string) {
|
||||
return features;
|
||||
}
|
||||
|
||||
void feature_service::persist_enabled_feature_info(const gms::feature& f) const {
|
||||
// Executed in seastar::async context, because `gms::feature::enable`
|
||||
// is only allowed to run within a thread context
|
||||
class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
|
||||
gossiper& _g;
|
||||
feature_service& _feat;
|
||||
db::system_keyspace& _sys_ks;
|
||||
|
||||
std::set<sstring> feats_set = db::system_keyspace::load_local_enabled_features().get0();
|
||||
feats_set.emplace(f.name());
|
||||
db::system_keyspace::save_local_enabled_features(std::move(feats_set)).get0();
|
||||
public:
|
||||
persistent_feature_enabler(gossiper& g, feature_service& f, db::system_keyspace& s)
|
||||
: _g(g)
|
||||
, _feat(f)
|
||||
, _sys_ks(s)
|
||||
{
|
||||
}
|
||||
future<> on_join(inet_address ep, endpoint_state state) override {
|
||||
return enable_features();
|
||||
}
|
||||
future<> on_change(inet_address ep, application_state state, const versioned_value&) override {
|
||||
if (state == application_state::SUPPORTED_FEATURES) {
|
||||
return enable_features();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); }
|
||||
future<> on_alive(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_dead(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_remove(inet_address) override { return make_ready_future(); }
|
||||
future<> on_restart(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
|
||||
future<> enable_features();
|
||||
};
|
||||
|
||||
future<> feature_service::enable_features_on_join(gossiper& g, db::system_keyspace& sys_ks) {
|
||||
auto enabler = make_shared<persistent_feature_enabler>(g, *this, sys_ks);
|
||||
g.register_(enabler);
|
||||
return enabler->enable_features();
|
||||
}
|
||||
|
||||
future<> feature_service::enable_features_on_startup(db::system_keyspace& sys_ks) {
|
||||
std::set<sstring> features_to_enable;
|
||||
const auto persisted_features = co_await sys_ks.load_local_enabled_features();
|
||||
if (persisted_features.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
const auto known_features = supported_feature_set();
|
||||
for (auto&& f : persisted_features) {
|
||||
logger.debug("Enabling persisted feature '{}'", f);
|
||||
const bool is_registered_feat = _registered_features.contains(sstring(f));
|
||||
if (!is_registered_feat || !known_features.contains(f)) {
|
||||
if (is_registered_feat) {
|
||||
throw std::runtime_error(format(
|
||||
"Feature '{}' was previously enabled in the cluster but its support is disabled by this node. "
|
||||
"Set the corresponding configuration option to enable the support for the feature.", f));
|
||||
} else {
|
||||
throw std::runtime_error(format("Unknown feature '{}' was previously enabled in the cluster. "
|
||||
" That means this node is performing a prohibited downgrade procedure"
|
||||
" and should not be allowed to boot.", f));
|
||||
}
|
||||
}
|
||||
if (is_registered_feat) {
|
||||
features_to_enable.insert(std::move(f));
|
||||
}
|
||||
// If a feature is not in `registered_features` but still in `known_features` list
|
||||
// that means the feature name is used for backward compatibility and should be implicitly
|
||||
// enabled in the code by default, so just skip it.
|
||||
}
|
||||
|
||||
co_await container().invoke_on_all([&features_to_enable] (auto& srv) -> future<> {
|
||||
std::set<std::string_view> feat = boost::copy_range<std::set<std::string_view>>(features_to_enable);
|
||||
co_await srv.enable(std::move(feat));
|
||||
});
|
||||
}
|
||||
|
||||
future<> persistent_feature_enabler::enable_features() {
|
||||
auto loaded_peer_features = co_await _sys_ks.load_peer_features();
|
||||
auto&& features = _g.get_supported_features(loaded_peer_features, gossiper::ignore_features_of_local_node::no);
|
||||
|
||||
// Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
|
||||
// The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`
|
||||
// function to preserve readability.
|
||||
std::set<sstring> feats_set = co_await _sys_ks.load_local_enabled_features();
|
||||
for (feature& f : _feat.registered_features() | boost::adaptors::map_values) {
|
||||
if (!f && features.contains(f.name())) {
|
||||
feats_set.emplace(f.name());
|
||||
}
|
||||
}
|
||||
co_await _sys_ks.save_local_enabled_features(std::move(feats_set));
|
||||
|
||||
co_await _feat.container().invoke_on_all([&features] (feature_service& fs) -> future<> {
|
||||
std::set<std::string_view> features_v = boost::copy_range<std::set<std::string_view>>(features);
|
||||
co_await fs.enable(std::move(features_v));
|
||||
});
|
||||
}
|
||||
|
||||
future<> feature_service::enable(std::set<std::string_view> list) {
|
||||
@@ -195,9 +281,6 @@ future<> feature_service::enable(std::set<std::string_view> list) {
|
||||
return seastar::async([this, list = std::move(list)] {
|
||||
for (gms::feature& f : _registered_features | boost::adaptors::map_values) {
|
||||
if (list.contains(f.name())) {
|
||||
if (db::qctx && !f) {
|
||||
persist_enabled_feature_info(f);
|
||||
}
|
||||
f.enable();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
#include <set>
|
||||
@@ -19,11 +20,15 @@
|
||||
#include "db/schema_features.hh"
|
||||
#include "gms/feature.hh"
|
||||
|
||||
namespace db { class config; }
|
||||
namespace db {
|
||||
class config;
|
||||
class system_keyspace;
|
||||
}
|
||||
namespace service { class storage_service; }
|
||||
|
||||
namespace gms {
|
||||
|
||||
class gossiper;
|
||||
class feature_service;
|
||||
|
||||
struct feature_config {
|
||||
@@ -46,7 +51,7 @@ using namespace std::literals;
|
||||
* A pointer to `cql3::query_processor` can be optionally supplied
|
||||
* if the instance needs to persist enabled features in a system table.
|
||||
*/
|
||||
class feature_service final {
|
||||
class feature_service final : public peering_sharded_service<feature_service> {
|
||||
void register_feature(feature& f);
|
||||
void unregister_feature(feature& f);
|
||||
friend class feature;
|
||||
@@ -118,10 +123,8 @@ public:
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
static std::set<sstring> to_feature_set(sstring features_string);
|
||||
// Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
|
||||
// The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`
|
||||
// function to preserve readability.
|
||||
void persist_enabled_feature_info(const gms::feature& f) const;
|
||||
future<> enable_features_on_startup(db::system_keyspace&);
|
||||
future<> enable_features_on_join(gossiper&, db::system_keyspace&);
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -70,32 +70,10 @@ std::chrono::milliseconds gossiper::quarantine_delay() const noexcept {
|
||||
return ring_delay * 2;
|
||||
}
|
||||
|
||||
class feature_enabler : public i_endpoint_state_change_subscriber {
|
||||
gossiper& _g;
|
||||
public:
|
||||
feature_enabler(gossiper& g) : _g(g) {}
|
||||
future<> on_join(inet_address ep, endpoint_state state) override {
|
||||
return _g.maybe_enable_features();
|
||||
}
|
||||
future<> on_change(inet_address ep, application_state state, const versioned_value&) override {
|
||||
if (state == application_state::SUPPORTED_FEATURES) {
|
||||
return _g.maybe_enable_features();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); }
|
||||
future<> on_alive(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_dead(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_remove(inet_address) override { return make_ready_future(); }
|
||||
future<> on_restart(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
};
|
||||
|
||||
gossiper::gossiper(abort_source& as, feature_service& features, const locator::shared_token_metadata& stm, netw::messaging_service& ms, sharded<db::system_keyspace>& sys_ks, const db::config& cfg, gossip_config gcfg)
|
||||
gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, const db::config& cfg, gossip_config gcfg)
|
||||
: _abort_source(as)
|
||||
, _feature_service(features)
|
||||
, _shared_token_metadata(stm)
|
||||
, _messaging(ms)
|
||||
, _sys_ks(sys_ks)
|
||||
, _failure_detector_timeout_ms(cfg.failure_detector_timeout_in_ms)
|
||||
, _force_gossip_generation(cfg.force_gossip_generation)
|
||||
, _gcfg(std::move(gcfg)) {
|
||||
@@ -107,7 +85,6 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s
|
||||
_scheduled_gossip_task.set_callback(_gcfg.gossip_scheduling_group, [this] { run(); });
|
||||
// half of QUARATINE_DELAY, to ensure _just_removed_endpoints has enough leeway to prevent re-gossip
|
||||
fat_client_timeout = quarantine_delay() / 2;
|
||||
register_(make_shared<feature_enabler>(*this));
|
||||
// Register this instance with JMX
|
||||
namespace sm = seastar::metrics;
|
||||
auto ep = get_broadcast_address();
|
||||
@@ -2379,9 +2356,6 @@ future<> gossiper::wait_for_gossip_to_settle() {
|
||||
if (force_after != 0) {
|
||||
co_await wait_for_gossip(GOSSIP_SETTLE_MIN_WAIT_MS, force_after);
|
||||
}
|
||||
if (!std::exchange(_gossip_settled, true)) {
|
||||
co_await maybe_enable_features();
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_range_setup() {
|
||||
@@ -2553,18 +2527,6 @@ void gossiper::append_endpoint_state(std::stringstream& ss, const endpoint_state
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::maybe_enable_features() {
|
||||
if (!_gossip_settled) {
|
||||
co_return;
|
||||
}
|
||||
auto loaded_peer_features = co_await db::system_keyspace::load_peer_features();
|
||||
auto&& features = get_supported_features(loaded_peer_features, ignore_features_of_local_node::no);
|
||||
co_await container().invoke_on_all([&features] (gossiper& g) -> future<> {
|
||||
std::set<std::string_view> features_v = boost::copy_range<std::set<std::string_view>>(features);
|
||||
co_await g._feature_service.enable(std::move(features_v));
|
||||
});
|
||||
}
|
||||
|
||||
locator::token_metadata_ptr gossiper::get_token_metadata_ptr() const noexcept {
|
||||
return _shared_token_metadata.get();
|
||||
}
|
||||
|
||||
@@ -42,7 +42,6 @@
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
class system_keyspace;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
@@ -56,8 +55,6 @@ class i_endpoint_state_change_subscriber;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
|
||||
class feature_service;
|
||||
|
||||
using advertise_myself = bool_class<class advertise_myself_tag>;
|
||||
|
||||
struct syn_msg_pending {
|
||||
@@ -242,7 +239,7 @@ private:
|
||||
// The value must be kept alive until completes and not change.
|
||||
future<> replicate(inet_address, application_state key, const versioned_value& value);
|
||||
public:
|
||||
explicit gossiper(abort_source& as, feature_service& features, const locator::shared_token_metadata& stm, netw::messaging_service& ms, sharded<db::system_keyspace>& sys_ks, const db::config& cfg, gossip_config gcfg);
|
||||
explicit gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, const db::config& cfg, gossip_config gcfg);
|
||||
|
||||
/**
|
||||
* Register for interesting state changes.
|
||||
@@ -589,26 +586,22 @@ private:
|
||||
|
||||
uint64_t _nr_run = 0;
|
||||
uint64_t _msg_processing = 0;
|
||||
bool _gossip_settled = false;
|
||||
|
||||
class msg_proc_guard;
|
||||
private:
|
||||
abort_source& _abort_source;
|
||||
feature_service& _feature_service;
|
||||
const locator::shared_token_metadata& _shared_token_metadata;
|
||||
netw::messaging_service& _messaging;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
utils::updateable_value<uint32_t> _failure_detector_timeout_ms;
|
||||
utils::updateable_value<int32_t> _force_gossip_generation;
|
||||
gossip_config _gcfg;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(inet_address endpoint) const;
|
||||
// Get features supported by all the nodes this node knows about
|
||||
std::set<sstring> get_supported_features(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
public:
|
||||
void check_knows_remote_features(std::set<std::string_view>& local_features, const std::unordered_map<inet_address, sstring>& loaded_peer_features) const;
|
||||
future<> maybe_enable_features();
|
||||
// Get features supported by all the nodes this node knows about
|
||||
std::set<sstring> get_supported_features(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const;
|
||||
private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
public:
|
||||
|
||||
4
main.cc
4
main.cc
@@ -1031,7 +1031,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
|
||||
debug::the_gossiper = &gossiper;
|
||||
gossiper.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(sys_ks), std::ref(*cfg), std::ref(gcfg)).get();
|
||||
gossiper.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg), std::ref(gcfg)).get();
|
||||
auto stop_gossiper = defer_verbose_shutdown("gossiper", [&gossiper] {
|
||||
// call stop on each instance, but leave the sharded<> pointers alive
|
||||
gossiper.invoke_on_all(&gms::gossiper::stop).get();
|
||||
@@ -1294,7 +1294,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// Re-enable previously enabled features on node startup.
|
||||
// This should be done before commitlog starts replaying
|
||||
// since some features affect storage.
|
||||
db::system_keyspace::enable_features_on_startup(feature_service).get();
|
||||
feature_service.local().enable_features_on_startup(sys_ks.local()).get();
|
||||
|
||||
db.local().maybe_init_schema_commitlog();
|
||||
|
||||
|
||||
@@ -1478,7 +1478,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
// Save the advertised feature set to system.local table after
|
||||
// all remote feature checks are complete and after gossip shadow rounds are done.
|
||||
// At this point, the final feature set is already determined before the node joins the ring.
|
||||
co_await db::system_keyspace::save_local_supported_features(features);
|
||||
co_await _sys_ks.local().save_local_supported_features(features);
|
||||
|
||||
// If this is a restarting node, we should update tokens before gossip starts
|
||||
auto my_tokens = co_await _sys_ks.local().get_saved_tokens();
|
||||
@@ -1585,6 +1585,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
});
|
||||
_listeners.emplace_back(make_lw_shared(std::move(schema_change_announce)));
|
||||
co_await _gossiper.wait_for_gossip_to_settle();
|
||||
co_await _feature_service.enable_features_on_join(_gossiper, _sys_ks.local());
|
||||
|
||||
set_mode(mode::JOINING);
|
||||
|
||||
@@ -2664,7 +2665,7 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service,
|
||||
auto initial_contact_nodes = loaded_endpoints.empty() ?
|
||||
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
|
||||
loaded_endpoints;
|
||||
auto loaded_peer_features = db::system_keyspace::load_peer_features().get0();
|
||||
auto loaded_peer_features = _sys_ks.local().load_peer_features().get0();
|
||||
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
|
||||
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
|
||||
for (auto& x : loaded_peer_features) {
|
||||
|
||||
@@ -649,7 +649,7 @@ public:
|
||||
gcfg.cluster_name = "Test Cluster";
|
||||
gcfg.seeds = std::move(seeds);
|
||||
gcfg.skip_wait_for_gossip_to_settle = 0;
|
||||
gossiper.start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(ms), std::ref(sys_ks), std::ref(*cfg), std::move(gcfg)).get();
|
||||
gossiper.start(std::ref(abort_sources), std::ref(token_metadata), std::ref(ms), std::ref(*cfg), std::move(gcfg)).get();
|
||||
auto stop_ms_fd_gossiper = defer([&gossiper] {
|
||||
gossiper.stop().get();
|
||||
});
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -60,14 +59,9 @@ int main(int ac, char ** av) {
|
||||
utils::fb_utilities::set_broadcast_rpc_address(listen);
|
||||
auto cfg = std::make_unique<db::config>();
|
||||
|
||||
sharded<gms::feature_service> feature_service;
|
||||
feature_service.start(gms::feature_config_from_db_config(*cfg)).get();
|
||||
auto stop_feature_service = deferred_stop(feature_service);
|
||||
|
||||
sharded<abort_source> abort_sources;
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
sharded<netw::messaging_service> messaging;
|
||||
sharded<db::system_keyspace> sys_ks;
|
||||
|
||||
abort_sources.start().get();
|
||||
auto stop_abort_source = defer([&] { abort_sources.stop().get(); });
|
||||
@@ -83,7 +77,7 @@ int main(int ac, char ** av) {
|
||||
gcfg.seeds.emplace(std::move(s));
|
||||
}
|
||||
sharded<gms::gossiper> gossiper;
|
||||
gossiper.start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(sys_ks), std::ref(*cfg), std::move(gcfg)).get();
|
||||
gossiper.start(std::ref(abort_sources), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg), std::move(gcfg)).get();
|
||||
|
||||
auto& server = messaging.local();
|
||||
auto port = server.port();
|
||||
|
||||
Reference in New Issue
Block a user