messaging_service: accept broadcast_addr in config rather than via fb_utilities

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-11-30 09:47:13 +02:00
parent 586f35bb55
commit 984a576405
4 changed files with 16 additions and 12 deletions

View File

@@ -1223,6 +1223,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
mscfg.id = host_id;
mscfg.ip = listen_address;
mscfg.broadcast_address = broadcast_addr;
mscfg.port = cfg->storage_port();
mscfg.ssl_port = cfg->ssl_storage_port();
mscfg.listen_on_broadcast_address = cfg->listen_on_broadcast_address();

View File

@@ -240,7 +240,7 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
}
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
: messaging_service(config{std::move(id), std::move(ip), port},
: messaging_service(config{std::move(id), ip, ip, port},
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
{}
@@ -336,7 +336,8 @@ bool messaging_service::is_host_banned(locator::host_id id) {
}
void messaging_service::do_start_listen() {
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != utils::fb_utilities::get_broadcast_address();
auto broadcast_address = this->broadcast_address();
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
rpc::server_options so;
if (_cfg.compress != compress_what::none) {
so.compressor_factory = &compressor_factory;
@@ -379,7 +380,7 @@ void messaging_service::do_start_listen() {
};
_server[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x55AA));
if (listen_to_bc) {
_server[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x66BB));
_server[1] = listen(broadcast_address, rpc::streaming_domain_type(0x66BB));
}
}
@@ -405,22 +406,22 @@ void messaging_service::do_start_listen() {
};
_server_tls[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x77CC));
if (listen_to_bc) {
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x88DD));
_server_tls[1] = listen(broadcast_address, rpc::streaming_domain_type(0x88DD));
}
}
// Do this on just cpu 0, to avoid duplicate logs.
if (this_shard_id() == 0) {
if (_server_tls[0]) {
mlogger.info("Starting Encrypted Messaging Service on SSL port {}", _cfg.ssl_port);
mlogger.info("Starting Encrypted Messaging Service on SSL address {} port {}", _cfg.ip, _cfg.ssl_port);
}
if (_server_tls[1]) {
mlogger.info("Starting Encrypted Messaging Service on SSL broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.ssl_port);
mlogger.info("Starting Encrypted Messaging Service on SSL broadcast address {} port {}", broadcast_address, _cfg.ssl_port);
}
if (_server[0]) {
mlogger.info("Starting Messaging Service on port {}", _cfg.port);
mlogger.info("Starting Messaging Service on address {} port {}", _cfg.ip, _cfg.port);
}
if (_server[1]) {
mlogger.info("Starting Messaging Service on broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.port);
mlogger.info("Starting Messaging Service on broadcast address {} port {}", broadcast_address, _cfg.port);
}
}
}
@@ -816,7 +817,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}
auto my_host_id = _cfg.id;
auto broadcast_address = utils::fb_utilities::get_broadcast_address();
auto broadcast_address = _cfg.broadcast_address;
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
auto laddr = socket_address(listen_to_bc ? broadcast_address : _cfg.ip, 0);
@@ -921,7 +922,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
// No reply is received, nothing to wait for.
(void)_rpc->make_client<
rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t, utils::UUID)>(messaging_verb::CLIENT_ID)(
*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
*it->second.rpc_client, broadcast_address, src_cpu_id,
query::result_memory_limiter::maximum_result_size, my_host_id.uuid())
.handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) {
mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t<messaging_verb>(verb), ep);

View File

@@ -268,6 +268,7 @@ public:
struct config {
locator::host_id id;
gms::inet_address ip; // a.k.a. listen_address - the address this node is listening on
gms::inet_address broadcast_address; // This node's address, as told to other nodes
uint16_t port;
uint16_t ssl_port = 0;
encrypt_what encrypt = encrypt_what::none;
@@ -346,6 +347,9 @@ public:
gms::inet_address listen_address() const noexcept {
return _cfg.ip;
}
gms::inet_address broadcast_address() const noexcept {
return _cfg.broadcast_address;
}
future<> shutdown();
future<> stop();
static rpc::no_wait_type no_wait();

View File

@@ -22,7 +22,6 @@
#include "gms/gossip_digest.hh"
#include "api/api.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "log.hh"
#include "locator/token_metadata.hh"
#include "db/schema_tables.hh"
@@ -191,7 +190,6 @@ int main(int ac, char ** av) {
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
auto my_address = listen != gms::inet_address("0.0.0.0") ? listen : gms::inet_address("localhost");
utils::fb_utilities::set_broadcast_address(my_address);
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
sharded<locator::shared_token_metadata> token_metadata;