gms: i_endpoint_state_change_subscriber: make callbacks to return futures
Coroutinize a few simple callbacks in the process. Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
@@ -733,28 +733,28 @@ future<> generation_service::after_join(std::optional<cdc::generation_id>&& star
|
||||
_cdc_streams_rewrite_complete = maybe_rewrite_streams_descriptions();
|
||||
}
|
||||
|
||||
void generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) {
|
||||
future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) {
|
||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||
|
||||
auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
|
||||
if (!val) {
|
||||
return;
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
on_change(ep, gms::application_state::CDC_GENERATION_ID, *val);
|
||||
return on_change(ep, gms::application_state::CDC_GENERATION_ID, *val);
|
||||
}
|
||||
|
||||
void generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) {
|
||||
future<> generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) {
|
||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||
|
||||
if (app_state != gms::application_state::CDC_GENERATION_ID) {
|
||||
return;
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value);
|
||||
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
|
||||
|
||||
handle_cdc_generation(gen_id).get();
|
||||
return handle_cdc_generation(gen_id);
|
||||
}
|
||||
|
||||
future<> generation_service::check_and_repair_cdc_streams() {
|
||||
|
||||
@@ -113,14 +113,14 @@ public:
|
||||
return _cdc_metadata;
|
||||
}
|
||||
|
||||
virtual void before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override {}
|
||||
virtual void on_alive(gms::inet_address, gms::endpoint_state) override {}
|
||||
virtual void on_dead(gms::inet_address, gms::endpoint_state) override {}
|
||||
virtual void on_remove(gms::inet_address) override {}
|
||||
virtual void on_restart(gms::inet_address, gms::endpoint_state) override {}
|
||||
virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
|
||||
virtual void on_join(gms::inet_address, gms::endpoint_state) override;
|
||||
virtual void on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state) override;
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
|
||||
future<> check_and_repair_cdc_streams();
|
||||
|
||||
|
||||
@@ -106,19 +106,20 @@ class feature_enabler : public i_endpoint_state_change_subscriber {
|
||||
gossiper& _g;
|
||||
public:
|
||||
feature_enabler(gossiper& g) : _g(g) {}
|
||||
void on_join(inet_address ep, endpoint_state state) override {
|
||||
_g.maybe_enable_features().get();
|
||||
future<> on_join(inet_address ep, endpoint_state state) override {
|
||||
return _g.maybe_enable_features();
|
||||
}
|
||||
void on_change(inet_address ep, application_state state, const versioned_value&) override {
|
||||
future<> on_change(inet_address ep, application_state state, const versioned_value&) override {
|
||||
if (state == application_state::SUPPORTED_FEATURES) {
|
||||
_g.maybe_enable_features().get();
|
||||
return _g.maybe_enable_features();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
void before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { }
|
||||
void on_alive(inet_address, endpoint_state) override {}
|
||||
void on_dead(inet_address, endpoint_state) override {}
|
||||
void on_remove(inet_address) override {}
|
||||
void on_restart(inet_address, endpoint_state) override {}
|
||||
future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); }
|
||||
future<> on_alive(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_dead(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_remove(inet_address) override { return make_ready_future(); }
|
||||
future<> on_restart(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
};
|
||||
|
||||
gossiper::gossiper(abort_source& as, feature_service& features, const locator::shared_token_metadata& stm, netw::messaging_service& ms, db::config& cfg, gossip_config gcfg)
|
||||
@@ -676,9 +677,9 @@ future<> gossiper::remove_endpoint(inet_address endpoint) {
|
||||
// We can not run on_remove callbacks here becasue on_remove in
|
||||
// storage_service might take the gossiper::timer_callback_lock
|
||||
(void)seastar::async([this, endpoint] {
|
||||
_subscribers.thread_for_each([endpoint] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
});
|
||||
_subscribers.for_each([endpoint] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_remove(endpoint);
|
||||
}).get();
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to call on_remove callback: {}", ep);
|
||||
});
|
||||
@@ -1578,10 +1579,10 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.info("InetAddress {} is now UP, status = {}", addr, status);
|
||||
}
|
||||
|
||||
_subscribers.thread_for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, state);
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_alive(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
@@ -1593,10 +1594,10 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
update_live_endpoints_version().get();
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
|
||||
_subscribers.thread_for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, state);
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_dead(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
@@ -1625,9 +1626,9 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
|
||||
|
||||
if (eps_old) {
|
||||
// the node restarted: it is up to the subscriber to take whatever action is necessary
|
||||
_subscribers.thread_for_each([ep, eps_old] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_restart(ep, *eps_old);
|
||||
});
|
||||
_subscribers.for_each([ep, eps_old] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_restart(ep, *eps_old);
|
||||
}).get();
|
||||
}
|
||||
|
||||
auto& ep_state = endpoint_state_map.at(ep);
|
||||
@@ -1640,9 +1641,9 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
|
||||
|
||||
auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep);
|
||||
if (eps_new) {
|
||||
_subscribers.thread_for_each([ep, eps_new] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_join(ep, *eps_new);
|
||||
});
|
||||
_subscribers.for_each([ep, eps_new] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_join(ep, *eps_new);
|
||||
}).get();
|
||||
}
|
||||
// check this at the end so nodes will learn about the endpoint
|
||||
if (is_shutdown(ep)) {
|
||||
@@ -1742,16 +1743,16 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state,
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) {
|
||||
_subscribers.thread_for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->before_change(addr, ep_state, ap_state, new_value);
|
||||
});
|
||||
_subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->before_change(addr, ep_state, ap_state, new_value);
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
|
||||
_subscribers.thread_for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_change(addr, state, value);
|
||||
});
|
||||
_subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_change(addr, state, value);
|
||||
}).get();
|
||||
}
|
||||
|
||||
void gossiper::request_all(gossip_digest& g_digest,
|
||||
|
||||
@@ -63,17 +63,17 @@ public:
|
||||
* @param endpoint endpoint for which the state change occurred.
|
||||
* @param epState state that actually changed for the above endpoint.
|
||||
*/
|
||||
virtual void on_join(inet_address endpoint, endpoint_state ep_state) = 0;
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) = 0;
|
||||
|
||||
virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0;
|
||||
virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0;
|
||||
|
||||
virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0;
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0;
|
||||
|
||||
virtual void on_alive(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state) = 0;
|
||||
|
||||
virtual void on_dead(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state) = 0;
|
||||
|
||||
virtual void on_remove(inet_address endpoint) = 0;
|
||||
virtual future<> on_remove(inet_address endpoint) = 0;
|
||||
|
||||
/**
|
||||
* Called whenever a node is restarted.
|
||||
@@ -81,7 +81,7 @@ public:
|
||||
* previously marked down. It will have only if {@code state.isAlive() == false}
|
||||
* as {@code state} is from before the restarted node is marked up.
|
||||
*/
|
||||
virtual void on_restart(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state state) = 0;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -261,37 +261,43 @@ void reconnectable_snitch_helper::reconnect(gms::inet_address public_address, gm
|
||||
reconnectable_snitch_helper::reconnectable_snitch_helper(sstring local_dc)
|
||||
: _local_dc(local_dc) {}
|
||||
|
||||
void reconnectable_snitch_helper::before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) {
|
||||
future<> reconnectable_snitch_helper::before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) {
|
||||
// do nothing.
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
future<> reconnectable_snitch_helper::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
auto* internal_ip_state = ep_state.get_application_state_ptr(gms::application_state::INTERNAL_IP);
|
||||
if (internal_ip_state) {
|
||||
reconnect(endpoint, *internal_ip_state);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
future<> reconnectable_snitch_helper::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
if (state == gms::application_state::INTERNAL_IP) {
|
||||
reconnect(endpoint, value);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
on_join(std::move(endpoint), std::move(ep_state));
|
||||
future<> reconnectable_snitch_helper::on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
return on_join(std::move(endpoint), std::move(ep_state));
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
future<> reconnectable_snitch_helper::on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
// do nothing.
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_remove(gms::inet_address endpoint) {
|
||||
future<> reconnectable_snitch_helper::on_remove(gms::inet_address endpoint) {
|
||||
// do nothing.
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void reconnectable_snitch_helper::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> reconnectable_snitch_helper::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
// do nothing.
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -52,12 +52,12 @@ private:
|
||||
void reconnect(gms::inet_address public_address, gms::inet_address local_address);
|
||||
public:
|
||||
reconnectable_snitch_helper(sstring local_dc);
|
||||
void before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
|
||||
void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override;
|
||||
void on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
void on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
void on_remove(gms::inet_address endpoint) override;
|
||||
void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
future<> before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
|
||||
future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override;
|
||||
future<> on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
future<> on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
future<> on_remove(gms::inet_address endpoint) override;
|
||||
future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
};
|
||||
} // namespace locator
|
||||
|
||||
@@ -3059,38 +3059,45 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
|
||||
rlogger.warn("Failed to remove row level repair for node {}: {}", node, ep);
|
||||
}).get();
|
||||
}
|
||||
virtual void on_join(
|
||||
virtual future<> on_join(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state ep_state) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void before_change(
|
||||
virtual future<> before_change(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state current_state,
|
||||
gms::application_state new_state_key,
|
||||
const gms::versioned_value& new_value) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void on_change(
|
||||
virtual future<> on_change(
|
||||
gms::inet_address endpoint,
|
||||
gms::application_state state,
|
||||
const gms::versioned_value& value) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void on_alive(
|
||||
virtual future<> on_alive(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state state) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void on_dead(
|
||||
virtual future<> on_dead(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state state) override {
|
||||
remove_row_level_repair(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void on_remove(
|
||||
virtual future<> on_remove(
|
||||
gms::inet_address endpoint) override {
|
||||
remove_row_level_repair(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual void on_restart(
|
||||
virtual future<> on_restart(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state ep_state) override {
|
||||
remove_row_level_repair(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -66,29 +66,32 @@ public:
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
virtual void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override {
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override {
|
||||
if (state == gms::application_state::LOAD) {
|
||||
_load_info[endpoint] = std::stod(value.value);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
auto* local_value = ep_state.get_application_state_ptr(gms::application_state::LOAD);
|
||||
if (local_value) {
|
||||
on_change(endpoint, gms::application_state::LOAD, *local_value);
|
||||
return on_change(endpoint, gms::application_state::LOAD, *local_value);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override {}
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); }
|
||||
|
||||
void on_alive(gms::inet_address endpoint, gms::endpoint_state) override {}
|
||||
future<> on_alive(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
|
||||
void on_dead(gms::inet_address endpoint, gms::endpoint_state) override {}
|
||||
future<> on_dead(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
|
||||
void on_restart(gms::inet_address endpoint, gms::endpoint_state) override {}
|
||||
future<> on_restart(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
|
||||
virtual void on_remove(gms::inet_address endpoint) override {
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override {
|
||||
_load_info.erase(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
const std::unordered_map<gms::inet_address, double> get_load_info() const {
|
||||
|
||||
@@ -1232,25 +1232,28 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
|
||||
return db::schema_tables::get_column_mapping(table_id, v);
|
||||
}
|
||||
|
||||
void migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
schedule_schema_pull(endpoint, ep_state);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
if (state == gms::application_state::SCHEMA) {
|
||||
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
|
||||
mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
|
||||
return;
|
||||
return make_ready_future();
|
||||
}
|
||||
if (_storage_proxy.get_token_metadata_ptr()->is_member(endpoint)) {
|
||||
schedule_schema_pull(endpoint, *ep_state);
|
||||
}
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
schedule_schema_pull(endpoint, state);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -235,13 +235,13 @@ public:
|
||||
future<schema_ptr> get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms);
|
||||
|
||||
private:
|
||||
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override;
|
||||
virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override {}
|
||||
virtual void on_remove(gms::inet_address endpoint) override {}
|
||||
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override {}
|
||||
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override {}
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }
|
||||
};
|
||||
|
||||
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version v);
|
||||
|
||||
@@ -277,7 +277,7 @@ future<> view_update_backlog_broker::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
if (state == gms::application_state::VIEW_BACKLOG) {
|
||||
size_t current;
|
||||
size_t max;
|
||||
@@ -287,16 +287,16 @@ void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::appl
|
||||
for (auto* ptr : {¤t, &max}) {
|
||||
*ptr = std::strtoull(start_bound, &end_bound, 10);
|
||||
if (*ptr == ULLONG_MAX) {
|
||||
return;
|
||||
return make_ready_future();;
|
||||
}
|
||||
start_bound = end_bound + 1;
|
||||
}
|
||||
if (max == 0) {
|
||||
return;
|
||||
return make_ready_future();
|
||||
}
|
||||
ticks = std::strtoll(start_bound, &end_bound, 10);
|
||||
if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value.data() + value.value.size()) {
|
||||
return;
|
||||
return make_ready_future();
|
||||
}
|
||||
auto backlog = view_update_backlog_timestamped{db::view::update_backlog{current, max}, ticks};
|
||||
auto[it, inserted] = _sp.local()._view_update_backlogs.try_emplace(endpoint, std::move(backlog));
|
||||
@@ -304,10 +304,12 @@ void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::appl
|
||||
it->second = std::move(backlog);
|
||||
}
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void view_update_backlog_broker::on_remove(gms::inet_address endpoint) {
|
||||
future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint) {
|
||||
_sp.local()._view_update_backlogs.erase(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1113,14 +1113,14 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
slogger.debug("endpoint={} on_join", endpoint);
|
||||
for (const auto& e : ep_state.get_application_state_map()) {
|
||||
on_change(endpoint, e.first, e.second);
|
||||
co_await on_change(endpoint, e.first, e.second);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_alive", endpoint);
|
||||
if (get_token_metadata().is_member(endpoint)) {
|
||||
notify_up(endpoint);
|
||||
@@ -1128,25 +1128,26 @@ void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state s
|
||||
if (_replacing_nodes_pending_ranges_updater.contains(endpoint)) {
|
||||
_replacing_nodes_pending_ranges_updater.erase(endpoint);
|
||||
slogger.info("Trigger pending ranges updater for replacing node {}", endpoint);
|
||||
auto tmlock = get_token_metadata_lock().get0();
|
||||
auto tmptr = get_mutable_token_metadata_ptr().get0();
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
handle_state_replacing_update_pending_ranges(tmptr, endpoint);
|
||||
replicate_to_all_cores(std::move(tmptr)).get();
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) {
|
||||
future<> storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) {
|
||||
slogger.debug("endpoint={} before_change: new app_state={}, new versioned_value={}", endpoint, new_state_key, new_value);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value) {
|
||||
future<> storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value) {
|
||||
slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}", endpoint, state, value);
|
||||
if (state == application_state::STATUS) {
|
||||
std::vector<sstring> pieces;
|
||||
boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
|
||||
if (pieces.empty()) {
|
||||
slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value);
|
||||
return;
|
||||
co_return;
|
||||
}
|
||||
sstring move_name = pieces[0];
|
||||
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
|
||||
@@ -1166,13 +1167,13 @@ void storage_service::on_change(inet_address endpoint, application_state state,
|
||||
} else if (move_name == sstring(versioned_value::HIBERNATE)) {
|
||||
handle_state_replacing(endpoint);
|
||||
} else {
|
||||
return; // did nothing.
|
||||
co_return; // did nothing.
|
||||
}
|
||||
} else {
|
||||
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
|
||||
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
|
||||
return;
|
||||
co_return;
|
||||
}
|
||||
if (get_token_metadata().is_member(endpoint)) {
|
||||
do_update_system_peers_table(endpoint, state, value);
|
||||
@@ -1185,26 +1186,28 @@ void storage_service::on_change(inet_address endpoint, application_state state,
|
||||
}
|
||||
|
||||
|
||||
void storage_service::on_remove(gms::inet_address endpoint) {
|
||||
future<> storage_service::on_remove(gms::inet_address endpoint) {
|
||||
slogger.debug("endpoint={} on_remove", endpoint);
|
||||
auto tmlock = get_token_metadata_lock().get0();
|
||||
auto tmptr = get_mutable_token_metadata_ptr().get0();
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
update_pending_ranges(tmptr, format("on_remove {}", endpoint)).get();
|
||||
replicate_to_all_cores(std::move(tmptr)).get();
|
||||
co_await update_pending_ranges(tmptr, format("on_remove {}", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_dead", endpoint);
|
||||
notify_down(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_restart", endpoint);
|
||||
// If we have restarted before the node was even marked down, we need to reset the connection pool
|
||||
if (state.is_alive()) {
|
||||
on_dead(endpoint, state);
|
||||
return on_dead(endpoint, state);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
|
||||
@@ -499,8 +499,8 @@ public:
|
||||
const sstring& keyspace,
|
||||
const dht::token_range_vector& ranges) const;
|
||||
public:
|
||||
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
|
||||
/*
|
||||
* Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
|
||||
* ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
|
||||
@@ -533,11 +533,11 @@ public:
|
||||
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
|
||||
* you should never bootstrap a new node during a removenode, decommission or move.
|
||||
*/
|
||||
virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) override;
|
||||
virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual void on_remove(gms::inet_address endpoint) override;
|
||||
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override;
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
|
||||
public:
|
||||
// For migration_listener
|
||||
|
||||
@@ -52,15 +52,15 @@ public:
|
||||
|
||||
seastar::future<> stop();
|
||||
|
||||
virtual void on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
|
||||
virtual void on_remove(gms::inet_address) override;
|
||||
virtual future<> on_remove(gms::inet_address) override;
|
||||
|
||||
virtual void on_join(gms::inet_address, gms::endpoint_state) override { }
|
||||
virtual void before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { }
|
||||
virtual void on_alive(gms::inet_address, gms::endpoint_state) override { }
|
||||
virtual void on_dead(gms::inet_address, gms::endpoint_state) override { }
|
||||
virtual void on_restart(gms::inet_address, gms::endpoint_state) override { }
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -307,7 +307,7 @@ void stream_manager::fail_all_sessions() {
|
||||
}
|
||||
}
|
||||
|
||||
void stream_manager::on_remove(inet_address endpoint) {
|
||||
future<> stream_manager::on_remove(inet_address endpoint) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
|
||||
//FIXME: discarded future.
|
||||
@@ -317,9 +317,10 @@ void stream_manager::on_remove(inet_address endpoint) {
|
||||
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_remove", endpoint);
|
||||
});
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) {
|
||||
future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint);
|
||||
//FIXME: discarded future.
|
||||
@@ -329,9 +330,10 @@ void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state)
|
||||
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_restart", endpoint);
|
||||
});
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
||||
future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
|
||||
//FIXME: discarded future.
|
||||
@@ -341,6 +343,7 @@ void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
||||
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_dead", endpoint);
|
||||
});
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
shared_ptr<stream_session> stream_manager::get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::optional<utils::UUID> cf_id) {
|
||||
|
||||
@@ -193,13 +193,13 @@ public:
|
||||
shared_ptr<stream_session> get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::optional<utils::UUID> cf_id = {});
|
||||
|
||||
public:
|
||||
virtual void on_join(inet_address endpoint, endpoint_state ep_state) override {}
|
||||
virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override {}
|
||||
virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) override {}
|
||||
virtual void on_alive(inet_address endpoint, endpoint_state state) override {}
|
||||
virtual void on_dead(inet_address endpoint, endpoint_state state) override;
|
||||
virtual void on_remove(inet_address endpoint) override;
|
||||
virtual void on_restart(inet_address endpoint, endpoint_state ep_state) override;
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) override { return make_ready_future(); }
|
||||
virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override { return make_ready_future(); }
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state) override;
|
||||
virtual future<> on_remove(inet_address endpoint) override;
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state ep_state) override;
|
||||
|
||||
private:
|
||||
void fail_all_sessions();
|
||||
|
||||
Reference in New Issue
Block a user