message/messaging_service: add feature_service dependency

(cherry-picked from 71a03ef6b0)
This commit is contained in:
Michał Jadwiszczak
2024-08-02 12:10:56 +02:00
parent f928bb7967
commit d11df0fcbc
6 changed files with 23 additions and 9 deletions

View File

@@ -1375,7 +1375,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
// Delay listening messaging_service until gossip message handlers are registered
messaging.start(mscfg, scfg, creds).get();
messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
messaging.invoke_on_all(&netw::messaging_service::stop).get();
});

View File

@@ -118,6 +118,7 @@
#include "idl/mapreduce_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
#include "idl/join_node.dist.impl.hh"
#include "gms/feature_service.hh"
namespace netw {
@@ -231,9 +232,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
return _rpc->unregister_handler(verb);
}
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
: messaging_service(config{std::move(id), ip, ip, port},
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
{}
static
@@ -418,13 +419,14 @@ void messaging_service::do_start_listen() {
}
}
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
: _cfg(std::move(cfg))
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
, _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
, _scheduling_config(scfg)
, _scheduling_info_for_connection_index(initial_scheduling_info())
, _feature_service(feature_service)
{
_rpc->set_logger(&rpc_logger);

View File

@@ -44,6 +44,7 @@ namespace gms {
class gossip_digest_ack2;
class gossip_get_endpoint_states_request;
class gossip_get_endpoint_states_response;
class feature_service;
}
namespace db {
@@ -332,6 +333,7 @@ private:
scheduling_config _scheduling_config;
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
std::vector<tenant_connection_index> _connection_index_for_tenant;
gms::feature_service& _feature_service;
struct connection_ref;
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
@@ -346,8 +348,8 @@ private:
public:
using clock_type = lowres_clock;
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port);
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>);
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
~messaging_service();
future<> start();

View File

@@ -683,7 +683,7 @@ private:
port = distrib(gen);
}
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
_ms.start(host_id, listen, std::move(port)).get();
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service)).get();
auto stop_ms = defer([this] { _ms.stop().get(); });
if (cfg_in.ms_listen) {

View File

@@ -12,7 +12,9 @@
#include <seastar/core/app-template.hh>
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "db/system_distributed_keyspace.hh"
#include "gms/feature_service.hh"
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"
@@ -56,6 +58,7 @@ int main(int ac, char ** av) {
sharded<abort_source> abort_sources;
sharded<locator::shared_token_metadata> token_metadata;
sharded<gms::feature_service> feature_service;
sharded<netw::messaging_service> messaging;
abort_sources.start().get();
@@ -68,7 +71,10 @@ int main(int ac, char ** av) {
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_mgr = defer([&] { token_metadata.stop().get(); });
messaging.start(locator::host_id{}, listen, 7000).get();
auto cfg = gms::feature_config_from_db_config(db::config(), {});
feature_service.start(cfg).get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
auto stop_messaging = deferred_stop(messaging);
gms::gossip_config gcfg;

View File

@@ -15,6 +15,7 @@
#include <seastar/core/thread.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/closeable.hh>
#include "gms/feature_service.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
@@ -192,8 +193,11 @@ int main(int ac, char ** av) {
sharded<locator::shared_token_metadata> token_metadata;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_tm = deferred_stop(token_metadata);
seastar::sharded<gms::feature_service> feature_service;
auto cfg = gms::feature_config_from_db_config(db::config(), {});
feature_service.start(cfg).get();
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000).get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();