diff --git a/cdc/generation.cc b/cdc/generation.cc index d6541eb574..efc27fcc6d 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -733,28 +733,28 @@ future<> generation_service::after_join(std::optional&& star _cdc_streams_rewrite_complete = maybe_rewrite_streams_descriptions(); } -void generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) { +future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) { assert_shard_zero(__PRETTY_FUNCTION__); auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID); if (!val) { - return; + return make_ready_future(); } - on_change(ep, gms::application_state::CDC_GENERATION_ID, *val); + return on_change(ep, gms::application_state::CDC_GENERATION_ID, *val); } -void generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) { +future<> generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) { assert_shard_zero(__PRETTY_FUNCTION__); if (app_state != gms::application_state::CDC_GENERATION_ID) { - return; + return make_ready_future(); } auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value); cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id); - handle_cdc_generation(gen_id).get(); + return handle_cdc_generation(gen_id); } future<> generation_service::check_and_repair_cdc_streams() { diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index 7baa594862..603d346e16 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -113,14 +113,14 @@ public: return _cdc_metadata; } - virtual void before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override {} - virtual void on_alive(gms::inet_address, gms::endpoint_state) override {} - virtual void on_dead(gms::inet_address, gms::endpoint_state) override {} - virtual void on_remove(gms::inet_address) override {} - virtual void on_restart(gms::inet_address, gms::endpoint_state) override {} + virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } + virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } + virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } + virtual future<> on_remove(gms::inet_address) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } - virtual void on_join(gms::inet_address, gms::endpoint_state) override; - virtual void on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override; + virtual future<> on_join(gms::inet_address, gms::endpoint_state) override; + virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override; future<> check_and_repair_cdc_streams(); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index cc3743a518..82631df9f3 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -106,19 +106,20 @@ class feature_enabler : public i_endpoint_state_change_subscriber { gossiper& _g; public: feature_enabler(gossiper& g) : _g(g) {} - void on_join(inet_address ep, endpoint_state state) override { - _g.maybe_enable_features().get(); + future<> on_join(inet_address ep, endpoint_state state) override { + return _g.maybe_enable_features(); } - void on_change(inet_address ep, application_state state, const versioned_value&) override { + future<> on_change(inet_address ep, application_state state, const versioned_value&) override { if (state == application_state::SUPPORTED_FEATURES) { - _g.maybe_enable_features().get(); + return _g.maybe_enable_features(); } + return make_ready_future(); } - void before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { } - void on_alive(inet_address, endpoint_state) override {} - void on_dead(inet_address, endpoint_state) override {} - void on_remove(inet_address) override {} - void on_restart(inet_address, endpoint_state) override {} + future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); } + future<> on_alive(inet_address, endpoint_state) override { return make_ready_future(); } + future<> on_dead(inet_address, endpoint_state) override { return make_ready_future(); } + future<> on_remove(inet_address) override { return make_ready_future(); } + future<> on_restart(inet_address, endpoint_state) override { return make_ready_future(); } }; gossiper::gossiper(abort_source& as, feature_service& features, const locator::shared_token_metadata& stm, netw::messaging_service& ms, db::config& cfg, gossip_config gcfg) @@ -676,9 +677,9 @@ future<> gossiper::remove_endpoint(inet_address endpoint) { // We can not run on_remove callbacks here becasue on_remove in // storage_service might take the gossiper::timer_callback_lock (void)seastar::async([this, endpoint] { - _subscribers.thread_for_each([endpoint] (shared_ptr subscriber) { - subscriber->on_remove(endpoint); - }); + _subscribers.for_each([endpoint] (shared_ptr subscriber) { + return subscriber->on_remove(endpoint); + }).get(); }).handle_exception([] (auto ep) { logger.warn("Fail to call on_remove callback: {}", ep); }); @@ -1578,10 +1579,10 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { logger.info("InetAddress {} is now UP, status = {}", addr, status); } - _subscribers.thread_for_each([addr, state] (shared_ptr subscriber) { - subscriber->on_alive(addr, state); + _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await subscriber->on_alive(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }); + }).get(); } // Runs inside seastar::async context @@ -1593,10 +1594,10 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { update_live_endpoints_version().get(); _unreachable_endpoints[addr] = now(); logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state)); - _subscribers.thread_for_each([addr, state] (shared_ptr subscriber) { - subscriber->on_dead(addr, state); + _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await subscriber->on_dead(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }); + }).get(); } // Runs inside seastar::async context @@ -1625,9 +1626,9 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& if (eps_old) { // the node restarted: it is up to the subscriber to take whatever action is necessary - _subscribers.thread_for_each([ep, eps_old] (shared_ptr subscriber) { - subscriber->on_restart(ep, *eps_old); - }); + _subscribers.for_each([ep, eps_old] (shared_ptr subscriber) { + return subscriber->on_restart(ep, *eps_old); + }).get(); } auto& ep_state = endpoint_state_map.at(ep); @@ -1640,9 +1641,9 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep); if (eps_new) { - _subscribers.thread_for_each([ep, eps_new] (shared_ptr subscriber) { - subscriber->on_join(ep, *eps_new); - }); + _subscribers.for_each([ep, eps_new] (shared_ptr subscriber) { + return subscriber->on_join(ep, *eps_new); + }).get(); } // check this at the end so nodes will learn about the endpoint if (is_shutdown(ep)) { @@ -1742,16 +1743,16 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, // Runs inside seastar::async context void gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { - _subscribers.thread_for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { - subscriber->before_change(addr, ep_state, ap_state, new_value); - }); + _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { + return subscriber->before_change(addr, ep_state, ap_state, new_value); + }).get(); } // Runs inside seastar::async context void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { - _subscribers.thread_for_each([addr, state, value] (shared_ptr subscriber) { - subscriber->on_change(addr, state, value); - }); + _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { + return subscriber->on_change(addr, state, value); + }).get(); } void gossiper::request_all(gossip_digest& g_digest, diff --git a/gms/i_endpoint_state_change_subscriber.hh b/gms/i_endpoint_state_change_subscriber.hh index 91931420f3..a5dcffe635 100644 --- a/gms/i_endpoint_state_change_subscriber.hh +++ b/gms/i_endpoint_state_change_subscriber.hh @@ -63,17 +63,17 @@ public: * @param endpoint endpoint for which the state change occurred. * @param epState state that actually changed for the above endpoint. */ - virtual void on_join(inet_address endpoint, endpoint_state ep_state) = 0; + virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) = 0; - virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0; + virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0; - virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0; + virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0; - virtual void on_alive(inet_address endpoint, endpoint_state state) = 0; + virtual future<> on_alive(inet_address endpoint, endpoint_state state) = 0; - virtual void on_dead(inet_address endpoint, endpoint_state state) = 0; + virtual future<> on_dead(inet_address endpoint, endpoint_state state) = 0; - virtual void on_remove(inet_address endpoint) = 0; + virtual future<> on_remove(inet_address endpoint) = 0; /** * Called whenever a node is restarted. @@ -81,7 +81,7 @@ public: * previously marked down. It will have only if {@code state.isAlive() == false} * as {@code state} is from before the restarted node is marked up. */ - virtual void on_restart(inet_address endpoint, endpoint_state state) = 0; + virtual future<> on_restart(inet_address endpoint, endpoint_state state) = 0; }; } // namespace gms diff --git a/locator/production_snitch_base.cc b/locator/production_snitch_base.cc index c87a80b067..8f5fbdce6d 100644 --- a/locator/production_snitch_base.cc +++ b/locator/production_snitch_base.cc @@ -261,37 +261,43 @@ void reconnectable_snitch_helper::reconnect(gms::inet_address public_address, gm reconnectable_snitch_helper::reconnectable_snitch_helper(sstring local_dc) : _local_dc(local_dc) {} -void reconnectable_snitch_helper::before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) { +future<> reconnectable_snitch_helper::before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) { // do nothing. + return make_ready_future(); } -void reconnectable_snitch_helper::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { +future<> reconnectable_snitch_helper::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { auto* internal_ip_state = ep_state.get_application_state_ptr(gms::application_state::INTERNAL_IP); if (internal_ip_state) { reconnect(endpoint, *internal_ip_state); } + return make_ready_future(); } -void reconnectable_snitch_helper::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { +future<> reconnectable_snitch_helper::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { if (state == gms::application_state::INTERNAL_IP) { reconnect(endpoint, value); } + return make_ready_future(); } -void reconnectable_snitch_helper::on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) { - on_join(std::move(endpoint), std::move(ep_state)); +future<> reconnectable_snitch_helper::on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) { + return on_join(std::move(endpoint), std::move(ep_state)); } -void reconnectable_snitch_helper::on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) { +future<> reconnectable_snitch_helper::on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) { // do nothing. + return make_ready_future(); } -void reconnectable_snitch_helper::on_remove(gms::inet_address endpoint) { +future<> reconnectable_snitch_helper::on_remove(gms::inet_address endpoint) { // do nothing. + return make_ready_future(); } -void reconnectable_snitch_helper::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { +future<> reconnectable_snitch_helper::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { // do nothing. + return make_ready_future(); } } // namespace locator diff --git a/locator/reconnectable_snitch_helper.hh b/locator/reconnectable_snitch_helper.hh index 7a8ce65ef2..1a41606c4d 100644 --- a/locator/reconnectable_snitch_helper.hh +++ b/locator/reconnectable_snitch_helper.hh @@ -52,12 +52,12 @@ private: void reconnect(gms::inet_address public_address, gms::inet_address local_address); public: reconnectable_snitch_helper(sstring local_dc); - void before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) override; - void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; - void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override; - void on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) override; - void on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) override; - void on_remove(gms::inet_address endpoint) override; - void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; + future<> before_change(gms::inet_address endpoint, gms::endpoint_state cs, gms::application_state new_state_key, const gms::versioned_value& new_value) override; + future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override; + future<> on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + future<> on_dead(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + future<> on_remove(gms::inet_address endpoint) override; + future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; }; } // namespace locator diff --git a/repair/row_level.cc b/repair/row_level.cc index 509c614a41..cffb4e1c67 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -3059,38 +3059,45 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc rlogger.warn("Failed to remove row level repair for node {}: {}", node, ep); }).get(); } - virtual void on_join( + virtual future<> on_join( gms::inet_address endpoint, gms::endpoint_state ep_state) override { + return make_ready_future(); } - virtual void before_change( + virtual future<> before_change( gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override { + return make_ready_future(); } - virtual void on_change( + virtual future<> on_change( gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override { + return make_ready_future(); } - virtual void on_alive( + virtual future<> on_alive( gms::inet_address endpoint, gms::endpoint_state state) override { + return make_ready_future(); } - virtual void on_dead( + virtual future<> on_dead( gms::inet_address endpoint, gms::endpoint_state state) override { remove_row_level_repair(endpoint); + return make_ready_future(); } - virtual void on_remove( + virtual future<> on_remove( gms::inet_address endpoint) override { remove_row_level_repair(endpoint); + return make_ready_future(); } - virtual void on_restart( + virtual future<> on_restart( gms::inet_address endpoint, gms::endpoint_state ep_state) override { remove_row_level_repair(endpoint); + return make_ready_future(); } }; diff --git a/service/load_broadcaster.hh b/service/load_broadcaster.hh index fe204dc63a..9ade8e7838 100644 --- a/service/load_broadcaster.hh +++ b/service/load_broadcaster.hh @@ -66,29 +66,32 @@ public: assert(_stopped); } - virtual void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override { + virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override { if (state == gms::application_state::LOAD) { _load_info[endpoint] = std::stod(value.value); } + return make_ready_future(); } - virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override { + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override { auto* local_value = ep_state.get_application_state_ptr(gms::application_state::LOAD); if (local_value) { - on_change(endpoint, gms::application_state::LOAD, *local_value); + return on_change(endpoint, gms::application_state::LOAD, *local_value); } + return make_ready_future(); } - virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override {} + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); } - void on_alive(gms::inet_address endpoint, gms::endpoint_state) override {} + future<> on_alive(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); } - void on_dead(gms::inet_address endpoint, gms::endpoint_state) override {} + future<> on_dead(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); } - void on_restart(gms::inet_address endpoint, gms::endpoint_state) override {} + future<> on_restart(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); } - virtual void on_remove(gms::inet_address endpoint) override { + virtual future<> on_remove(gms::inet_address endpoint) override { _load_info.erase(endpoint); + return make_ready_future(); } const std::unordered_map get_load_info() const { diff --git a/service/migration_manager.cc b/service/migration_manager.cc index e275446857..e1f6e968e4 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -1232,25 +1232,28 @@ future get_column_mapping(utils::UUID table_id, table_schema_ver return db::schema_tables::get_column_mapping(table_id, v); } -void migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { +future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { schedule_schema_pull(endpoint, ep_state); + return make_ready_future(); } -void migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { +future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { if (state == gms::application_state::SCHEMA) { auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); if (!ep_state || _gossiper.is_dead_state(*ep_state)) { mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); - return; + return make_ready_future(); } if (_storage_proxy.get_token_metadata_ptr()->is_member(endpoint)) { schedule_schema_pull(endpoint, *ep_state); } } + return make_ready_future(); } -void migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { +future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { schedule_schema_pull(endpoint, state); + return make_ready_future(); } } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 5943f2d1c8..493c08b9db 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -235,13 +235,13 @@ public: future get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms); private: - virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; - virtual void on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override; - virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override; - virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override {} - virtual void on_remove(gms::inet_address endpoint) override {} - virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override {} - virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override {} + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override; + virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); } + virtual future<> on_remove(gms::inet_address endpoint) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); } }; future get_column_mapping(utils::UUID table_id, table_schema_version v); diff --git a/service/misc_services.cc b/service/misc_services.cc index 4c0448f3d7..64f0ff7bdf 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -277,7 +277,7 @@ future<> view_update_backlog_broker::stop() { }); } -void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { +future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) { if (state == gms::application_state::VIEW_BACKLOG) { size_t current; size_t max; @@ -287,16 +287,16 @@ void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::appl for (auto* ptr : {¤t, &max}) { *ptr = std::strtoull(start_bound, &end_bound, 10); if (*ptr == ULLONG_MAX) { - return; + return make_ready_future();; } start_bound = end_bound + 1; } if (max == 0) { - return; + return make_ready_future(); } ticks = std::strtoll(start_bound, &end_bound, 10); if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value.data() + value.value.size()) { - return; + return make_ready_future(); } auto backlog = view_update_backlog_timestamped{db::view::update_backlog{current, max}, ticks}; auto[it, inserted] = _sp.local()._view_update_backlogs.try_emplace(endpoint, std::move(backlog)); @@ -304,10 +304,12 @@ void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::appl it->second = std::move(backlog); } } + return make_ready_future(); } -void view_update_backlog_broker::on_remove(gms::inet_address endpoint) { +future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint) { _sp.local()._view_update_backlogs.erase(endpoint); + return make_ready_future(); } } diff --git a/service/storage_service.cc b/service/storage_service.cc index 514634943f..4215ba5d4a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1113,14 +1113,14 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) { slogger.debug("endpoint={} on_join", endpoint); for (const auto& e : ep_state.get_application_state_map()) { - on_change(endpoint, e.first, e.second); + co_await on_change(endpoint, e.first, e.second); } } -void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { +future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) { slogger.debug("endpoint={} on_alive", endpoint); if (get_token_metadata().is_member(endpoint)) { notify_up(endpoint); @@ -1128,25 +1128,26 @@ void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state s if (_replacing_nodes_pending_ranges_updater.contains(endpoint)) { _replacing_nodes_pending_ranges_updater.erase(endpoint); slogger.info("Trigger pending ranges updater for replacing node {}", endpoint); - auto tmlock = get_token_metadata_lock().get0(); - auto tmptr = get_mutable_token_metadata_ptr().get0(); + auto tmlock = co_await get_token_metadata_lock(); + auto tmptr = co_await get_mutable_token_metadata_ptr(); handle_state_replacing_update_pending_ranges(tmptr, endpoint); - replicate_to_all_cores(std::move(tmptr)).get(); + co_await replicate_to_all_cores(std::move(tmptr)); } } -void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) { +future<> storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) { slogger.debug("endpoint={} before_change: new app_state={}, new versioned_value={}", endpoint, new_state_key, new_value); + return make_ready_future(); } -void storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value) { +future<> storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value) { slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}", endpoint, state, value); if (state == application_state::STATUS) { std::vector pieces; boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); if (pieces.empty()) { slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value); - return; + co_return; } sstring move_name = pieces[0]; if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { @@ -1166,13 +1167,13 @@ void storage_service::on_change(inet_address endpoint, application_state state, } else if (move_name == sstring(versioned_value::HIBERNATE)) { handle_state_replacing(endpoint); } else { - return; // did nothing. + co_return; // did nothing. } } else { auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); if (!ep_state || _gossiper.is_dead_state(*ep_state)) { slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); - return; + co_return; } if (get_token_metadata().is_member(endpoint)) { do_update_system_peers_table(endpoint, state, value); @@ -1185,26 +1186,28 @@ void storage_service::on_change(inet_address endpoint, application_state state, } -void storage_service::on_remove(gms::inet_address endpoint) { +future<> storage_service::on_remove(gms::inet_address endpoint) { slogger.debug("endpoint={} on_remove", endpoint); - auto tmlock = get_token_metadata_lock().get0(); - auto tmptr = get_mutable_token_metadata_ptr().get0(); + auto tmlock = co_await get_token_metadata_lock(); + auto tmptr = co_await get_mutable_token_metadata_ptr(); tmptr->remove_endpoint(endpoint); - update_pending_ranges(tmptr, format("on_remove {}", endpoint)).get(); - replicate_to_all_cores(std::move(tmptr)).get(); + co_await update_pending_ranges(tmptr, format("on_remove {}", endpoint)); + co_await replicate_to_all_cores(std::move(tmptr)); } -void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) { +future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) { slogger.debug("endpoint={} on_dead", endpoint); notify_down(endpoint); + return make_ready_future(); } -void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { +future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { slogger.debug("endpoint={} on_restart", endpoint); // If we have restarted before the node was even marked down, we need to reset the connection pool if (state.is_alive()) { - on_dead(endpoint, state); + return on_dead(endpoint, state); } + return make_ready_future(); } // Runs inside seastar::async context diff --git a/service/storage_service.hh b/service/storage_service.hh index 0dd6fbf7f8..4d354cd36c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -499,8 +499,8 @@ public: const sstring& keyspace, const dht::token_range_vector& ranges) const; public: - virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; - virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override; + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override; /* * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update @@ -533,11 +533,11 @@ public: * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that * you should never bootstrap a new node during a removenode, decommission or move. */ - virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) override; - virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override; - virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override; - virtual void on_remove(gms::inet_address endpoint) override; - virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override; + virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override; + virtual future<> on_remove(gms::inet_address endpoint) override; + virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override; public: // For migration_listener diff --git a/service/view_update_backlog_broker.hh b/service/view_update_backlog_broker.hh index 3ff096bf32..605998c27f 100644 --- a/service/view_update_backlog_broker.hh +++ b/service/view_update_backlog_broker.hh @@ -52,15 +52,15 @@ public: seastar::future<> stop(); - virtual void on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override; + virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override; - virtual void on_remove(gms::inet_address) override; + virtual future<> on_remove(gms::inet_address) override; - virtual void on_join(gms::inet_address, gms::endpoint_state) override { } - virtual void before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { } - virtual void on_alive(gms::inet_address, gms::endpoint_state) override { } - virtual void on_dead(gms::inet_address, gms::endpoint_state) override { } - virtual void on_restart(gms::inet_address, gms::endpoint_state) override { } + virtual future<> on_join(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } + virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } + virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); } }; } diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index 59d6092a19..fafc5be0da 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -307,7 +307,7 @@ void stream_manager::fail_all_sessions() { } } -void stream_manager::on_remove(inet_address endpoint) { +future<> stream_manager::on_remove(inet_address endpoint) { if (has_peer(endpoint)) { sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint); //FIXME: discarded future. @@ -317,9 +317,10 @@ void stream_manager::on_remove(inet_address endpoint) { sslog.warn("stream_manager: Fail to close sessions peer = {} in on_remove", endpoint); }); } + return make_ready_future(); } -void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) { +future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) { if (has_peer(endpoint)) { sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint); //FIXME: discarded future. @@ -329,9 +330,10 @@ void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) sslog.warn("stream_manager: Fail to close sessions peer = {} in on_restart", endpoint); }); } + return make_ready_future(); } -void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) { +future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) { if (has_peer(endpoint)) { sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint); //FIXME: discarded future. @@ -341,6 +343,7 @@ void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) { sslog.warn("stream_manager: Fail to close sessions peer = {} in on_dead", endpoint); }); } + return make_ready_future(); } shared_ptr stream_manager::get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::optional cf_id) { diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index b71d0cb3ce..2c06aa1fe2 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -193,13 +193,13 @@ public: shared_ptr get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::optional cf_id = {}); public: - virtual void on_join(inet_address endpoint, endpoint_state ep_state) override {} - virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override {} - virtual void on_change(inet_address endpoint, application_state state, const versioned_value& value) override {} - virtual void on_alive(inet_address endpoint, endpoint_state state) override {} - virtual void on_dead(inet_address endpoint, endpoint_state state) override; - virtual void on_remove(inet_address endpoint) override; - virtual void on_restart(inet_address endpoint, endpoint_state ep_state) override; + virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) override { return make_ready_future(); } + virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override { return make_ready_future(); } + virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override { return make_ready_future(); } + virtual future<> on_alive(inet_address endpoint, endpoint_state state) override { return make_ready_future(); } + virtual future<> on_dead(inet_address endpoint, endpoint_state state) override; + virtual future<> on_remove(inet_address endpoint) override; + virtual future<> on_restart(inet_address endpoint, endpoint_state ep_state) override; private: void fail_all_sessions();