locator: topology: add helpers to retrieve this host_id and address
And respective `is_me()` predicates, to prepare for getting rid of fb_utilities. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -22,7 +22,6 @@
|
|||||||
#include <boost/range/adaptors.hpp>
|
#include <boost/range/adaptors.hpp>
|
||||||
#include <seastar/core/smp.hh>
|
#include <seastar/core/smp.hh>
|
||||||
#include "utils/stall_free.hh"
|
#include "utils/stall_free.hh"
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
|
|
||||||
namespace locator {
|
namespace locator {
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,6 @@
|
|||||||
#include "locator/topology.hh"
|
#include "locator/topology.hh"
|
||||||
#include "locator/production_snitch_base.hh"
|
#include "locator/production_snitch_base.hh"
|
||||||
#include "utils/stall_free.hh"
|
#include "utils/stall_free.hh"
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
|
|
||||||
namespace locator {
|
namespace locator {
|
||||||
|
|
||||||
|
|||||||
@@ -161,6 +161,7 @@ class topology {
|
|||||||
public:
|
public:
|
||||||
struct config {
|
struct config {
|
||||||
inet_address this_endpoint;
|
inet_address this_endpoint;
|
||||||
|
inet_address this_cql_address; // corresponds to broadcast_rpc_address
|
||||||
host_id this_host_id;
|
host_id this_host_id;
|
||||||
endpoint_dc_rack local_dc_rack;
|
endpoint_dc_rack local_dc_rack;
|
||||||
bool disable_proximity_sorting = false;
|
bool disable_proximity_sorting = false;
|
||||||
@@ -336,6 +337,26 @@ public:
|
|||||||
|
|
||||||
void for_each_node(std::function<void(const node*)> func) const;
|
void for_each_node(std::function<void(const node*)> func) const;
|
||||||
|
|
||||||
|
host_id my_host_id() const noexcept {
|
||||||
|
return _cfg.this_host_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
inet_address my_address() const noexcept {
|
||||||
|
return _cfg.this_endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
inet_address my_cql_address() const noexcept {
|
||||||
|
return _cfg.this_cql_address;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_me(const locator::host_id& id) const noexcept {
|
||||||
|
return id == my_host_id();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_me(const inet_address& addr) const noexcept {
|
||||||
|
return addr == my_address();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_configured_this_node(const node&) const;
|
bool is_configured_this_node(const node&) const;
|
||||||
const node* add_node(node_holder node);
|
const node* add_node(node_holder node);
|
||||||
|
|||||||
5
main.cc
5
main.cc
@@ -900,7 +900,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
|||||||
|
|
||||||
supervisor::notify("starting tokens manager");
|
supervisor::notify("starting tokens manager");
|
||||||
locator::token_metadata::config tm_cfg;
|
locator::token_metadata::config tm_cfg;
|
||||||
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
|
tm_cfg.topo_cfg.this_endpoint = broadcast_addr;
|
||||||
|
tm_cfg.topo_cfg.this_cql_address = broadcast_rpc_addr;
|
||||||
tm_cfg.topo_cfg.local_dc_rack = snitch.local()->get_location();
|
tm_cfg.topo_cfg.local_dc_rack = snitch.local()->get_location();
|
||||||
if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") {
|
if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") {
|
||||||
//
|
//
|
||||||
@@ -1206,7 +1207,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
|||||||
const auto listen_address = utils::resolve(cfg->listen_address, family).get0();
|
const auto listen_address = utils::resolve(cfg->listen_address, family).get0();
|
||||||
const auto host_id = initialize_local_info_thread(sys_ks, snitch, listen_address, *cfg);
|
const auto host_id = initialize_local_info_thread(sys_ks, snitch, listen_address, *cfg);
|
||||||
|
|
||||||
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) {
|
shared_token_metadata::mutate_on_all_shards(token_metadata, [host_id, endpoint = broadcast_addr] (locator::token_metadata& tm) {
|
||||||
// Makes local host id available in topology cfg as soon as possible.
|
// Makes local host id available in topology cfg as soon as possible.
|
||||||
// Raft topology discard the endpoint-to-id map, so the local id can
|
// Raft topology discard the endpoint-to-id map, so the local id can
|
||||||
// still be found in the config.
|
// still be found in the config.
|
||||||
|
|||||||
@@ -14,7 +14,6 @@
|
|||||||
#include "locator/types.hh"
|
#include "locator/types.hh"
|
||||||
#include "test/lib/scylla_test_case.hh"
|
#include "test/lib/scylla_test_case.hh"
|
||||||
|
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
#include "locator/host_id.hh"
|
#include "locator/host_id.hh"
|
||||||
#include "locator/topology.hh"
|
#include "locator/topology.hh"
|
||||||
#include "locator/load_sketch.hh"
|
#include "locator/load_sketch.hh"
|
||||||
@@ -32,7 +31,6 @@ SEASTAR_THREAD_TEST_CASE(test_add_node) {
|
|||||||
auto id3 = host_id::create_random_id();
|
auto id3 = host_id::create_random_id();
|
||||||
auto ep3 = gms::inet_address("127.0.0.3");
|
auto ep3 = gms::inet_address("127.0.0.3");
|
||||||
|
|
||||||
utils::fb_utilities::set_broadcast_address(ep1);
|
|
||||||
topology::config cfg = {
|
topology::config cfg = {
|
||||||
.this_endpoint = ep1,
|
.this_endpoint = ep1,
|
||||||
.local_dc_rack = endpoint_dc_rack::default_location,
|
.local_dc_rack = endpoint_dc_rack::default_location,
|
||||||
@@ -70,7 +68,6 @@ SEASTAR_THREAD_TEST_CASE(test_moving) {
|
|||||||
auto id1 = host_id::create_random_id();
|
auto id1 = host_id::create_random_id();
|
||||||
auto ep1 = gms::inet_address("127.0.0.1");
|
auto ep1 = gms::inet_address("127.0.0.1");
|
||||||
|
|
||||||
utils::fb_utilities::set_broadcast_address(ep1);
|
|
||||||
topology::config cfg = {
|
topology::config cfg = {
|
||||||
.this_endpoint = ep1,
|
.this_endpoint = ep1,
|
||||||
.local_dc_rack = endpoint_dc_rack::default_location,
|
.local_dc_rack = endpoint_dc_rack::default_location,
|
||||||
@@ -100,7 +97,6 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
|
|||||||
auto ep2 = gms::inet_address("127.0.0.2");
|
auto ep2 = gms::inet_address("127.0.0.2");
|
||||||
auto ep3 = gms::inet_address("127.0.0.3");
|
auto ep3 = gms::inet_address("127.0.0.3");
|
||||||
|
|
||||||
utils::fb_utilities::set_broadcast_address(ep1);
|
|
||||||
topology::config cfg = {
|
topology::config cfg = {
|
||||||
.this_endpoint = ep1,
|
.this_endpoint = ep1,
|
||||||
.local_dc_rack = endpoint_dc_rack::default_location,
|
.local_dc_rack = endpoint_dc_rack::default_location,
|
||||||
@@ -193,7 +189,6 @@ SEASTAR_THREAD_TEST_CASE(test_remove_endpoint) {
|
|||||||
.rack = "rack2"
|
.rack = "rack2"
|
||||||
};
|
};
|
||||||
|
|
||||||
utils::fb_utilities::set_broadcast_address(ep1);
|
|
||||||
topology::config cfg = {
|
topology::config cfg = {
|
||||||
.this_endpoint = ep1,
|
.this_endpoint = ep1,
|
||||||
.local_dc_rack = dc_rack1
|
.local_dc_rack = dc_rack1
|
||||||
|
|||||||
@@ -23,7 +23,6 @@
|
|||||||
#include "locator/tablet_sharder.hh"
|
#include "locator/tablet_sharder.hh"
|
||||||
#include "locator/load_sketch.hh"
|
#include "locator/load_sketch.hh"
|
||||||
#include "locator/tablet_replication_strategy.hh"
|
#include "locator/tablet_replication_strategy.hh"
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
#include "utils/UUID_gen.hh"
|
#include "utils/UUID_gen.hh"
|
||||||
#include "utils/error_injection.hh"
|
#include "utils/error_injection.hh"
|
||||||
|
|
||||||
@@ -434,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) {
|
|||||||
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
||||||
|
|
||||||
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
|
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
|
||||||
tokm.get_topology().add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(), h1);
|
tokm.get_topology().add_or_update_endpoint(tokm.get_topology().my_address(), h1);
|
||||||
|
|
||||||
std::vector<tablet_id> tablet_ids;
|
std::vector<tablet_id> tablet_ids;
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -8,7 +8,6 @@
|
|||||||
|
|
||||||
#include <boost/test/unit_test.hpp>
|
#include <boost/test/unit_test.hpp>
|
||||||
#include "test/lib/scylla_test_case.hh"
|
#include "test/lib/scylla_test_case.hh"
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
#include "locator/token_metadata.hh"
|
#include "locator/token_metadata.hh"
|
||||||
#include "locator/simple_strategy.hh"
|
#include "locator/simple_strategy.hh"
|
||||||
#include "locator/everywhere_replication_strategy.hh"
|
#include "locator/everywhere_replication_strategy.hh"
|
||||||
@@ -29,6 +28,7 @@ namespace {
|
|||||||
return make_lw_shared<token_metadata>(token_metadata::config {
|
return make_lw_shared<token_metadata>(token_metadata::config {
|
||||||
topology::config {
|
topology::config {
|
||||||
.this_endpoint = this_endpoint,
|
.this_endpoint = this_endpoint,
|
||||||
|
.this_cql_address = this_endpoint,
|
||||||
.local_dc_rack = get_dc_rack(this_endpoint)
|
.local_dc_rack = get_dc_rack(this_endpoint)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -500,6 +500,7 @@ private:
|
|||||||
|
|
||||||
locator::token_metadata::config tm_cfg;
|
locator::token_metadata::config tm_cfg;
|
||||||
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
|
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
|
||||||
|
tm_cfg.topo_cfg.this_cql_address = tm_cfg.topo_cfg.this_endpoint;
|
||||||
tm_cfg.topo_cfg.local_dc_rack = { _snitch.local()->get_datacenter(), _snitch.local()->get_rack() };
|
tm_cfg.topo_cfg.local_dc_rack = { _snitch.local()->get_datacenter(), _snitch.local()->get_rack() };
|
||||||
_token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
_token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||||
auto stop_token_metadata = defer([this] { _token_metadata.stop().get(); });
|
auto stop_token_metadata = defer([this] { _token_metadata.stop().get(); });
|
||||||
|
|||||||
@@ -38,7 +38,6 @@
|
|||||||
#include "gms/feature_service.hh"
|
#include "gms/feature_service.hh"
|
||||||
#include "locator/abstract_replication_strategy.hh"
|
#include "locator/abstract_replication_strategy.hh"
|
||||||
#include "tools/schema_loader.hh"
|
#include "tools/schema_loader.hh"
|
||||||
#include "utils/fb_utilities.hh"
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
@@ -219,8 +218,11 @@ std::vector<schema_ptr> do_load_schemas(std::string_view schema_str) {
|
|||||||
feature_service.enable(feature_service.supported_feature_set()).get();
|
feature_service.enable(feature_service.supported_feature_set()).get();
|
||||||
sharded<locator::shared_token_metadata> token_metadata;
|
sharded<locator::shared_token_metadata> token_metadata;
|
||||||
|
|
||||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
|
auto my_address = gms::inet_address("localhost");
|
||||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{}).get();
|
locator::token_metadata::config tm_cfg;
|
||||||
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
||||||
|
tm_cfg.topo_cfg.this_cql_address = my_address;
|
||||||
|
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||||
auto stop_token_metadata = deferred_stop(token_metadata);
|
auto stop_token_metadata = deferred_stop(token_metadata);
|
||||||
|
|
||||||
data_dictionary_impl dd_impl;
|
data_dictionary_impl dd_impl;
|
||||||
|
|||||||
Reference in New Issue
Block a user