Merge 'gms: define and use generation and version types' from Benny Halevy

This series cleans up the generation and version types used in gms / gossiper.
Currently we use a blend of int, int32_t, and int64_t around messaging.
This change defines gms::generation_type and gms::version_type as strong types over int32_t
and adds a check, in non-release modes, that the respective int64 values passed over messaging do not overflow 32 bits.

Closes #12966

* github.com:scylladb/scylladb:
  gossiper: version_generator: add {debug_,}validate_gossip_generation
  gms: gossip_digest: use generation_type and version_type
  gms: heart_beat_state: use generation_type and version_type
  gms: versioned_value: use version_type
  gms: version_generator: define version_type and generation_type strong types
  utils: move generation-number to gms
  utils: add tagged_integer
  gms: versioned_value: make members private
  scylla-gdb: add get_gms_versioned_value
  gms: versioned_value: delete unused compare_to function
  gms: gossip_digest: delete unused compare_to function
This commit is contained in:
Botond Dénes
2023-04-24 08:44:48 +03:00
41 changed files with 416 additions and 306 deletions

View File

@@ -23,16 +23,16 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
fd::endpoint_state val;
val.addrs = fmt::to_string(i.first);
val.is_alive = i.second.is_alive();
val.generation = i.second.get_heart_beat_state().get_generation();
val.version = i.second.get_heart_beat_state().get_heart_beat_version();
val.generation = i.second.get_heart_beat_state().get_generation().value();
val.version = i.second.get_heart_beat_state().get_heart_beat_version().value();
val.update_time = i.second.get_update_timestamp().time_since_epoch().count();
for (auto a : i.second.get_application_state_map()) {
fd::version_value version_val;
// We return the enum index and not it's name to stay compatible to origin
// method that the state index are static but the name can be changed.
version_val.application_state = static_cast<std::underlying_type<gms::application_state>::type>(a.first);
version_val.value = a.second.value;
version_val.version = a.second.version;
version_val.value = a.second.value();
version_val.version = a.second.version().value();
val.application_state.push(version_val);
}
res.push_back(val);

View File

@@ -34,15 +34,15 @@ void set_gossiper(http_context& ctx, routes& r, gms::gossiper& g) {
httpd::gossiper_json::get_current_generation_number.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->param["addr"]);
return g.get_current_generation_number(ep).then([] (int res) {
return make_ready_future<json::json_return_type>(res);
return g.get_current_generation_number(ep).then([] (gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
httpd::gossiper_json::get_current_heart_beat_version.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->param["addr"]);
return g.get_current_heart_beat_version(ep).then([] (int res) {
return make_ready_future<json::json_return_type>(res);
return g.get_current_heart_beat_version(ep).then([] (gms::version_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});

View File

@@ -655,8 +655,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::get_current_generation_number.set(r, [&g](std::unique_ptr<http::request> req) {
gms::inet_address ep(utils::fb_utilities::get_broadcast_address());
return g.get_current_generation_number(ep).then([](int res) {
return make_ready_future<json::json_return_type>(res);
return g.get_current_generation_number(ep).then([](gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});

View File

@@ -35,12 +35,12 @@ extern logging::logger cdc_log;
static int get_shard_count(const gms::inet_address& endpoint, const gms::gossiper& g) {
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT);
return ep_state ? std::stoi(ep_state->value) : -1;
return ep_state ? std::stoi(ep_state->value()) : -1;
}
static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const gms::gossiper& g) {
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS);
return ep_state ? std::stoi(ep_state->value) : 0;
return ep_state ? std::stoi(ep_state->value()) : 0;
}
namespace db {
@@ -775,7 +775,7 @@ future<> generation_service::on_change(gms::inet_address ep, gms::application_st
return make_ready_future();
}
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value);
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
return legacy_handle_cdc_generation(gen_id);

View File

@@ -700,7 +700,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/limiting_data_source.cc',
'utils/updateable_value.cc',
'utils/directories.cc',
'utils/generation-number.cc',
'gms/generation-number.cc',
'utils/rjson.cc',
'utils/human_readable.cc',
'utils/histogram_metrics_helper.cc',
@@ -1155,6 +1155,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/position_in_partition.idl.hh',
'idl/experimental/broadcast_tables_lang.idl.hh',
'idl/storage_service.idl.hh',
'idl/utils.idl.hh',
]
headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])

View File

@@ -51,7 +51,7 @@
#include "db/view/build_progress_virtual_reader.hh"
#include "db/schema_tables.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "utils/generation-number.hh"
#include "gms/generation-number.hh"
#include "db/virtual_table.hh"
#include "service/storage_service.hh"
#include "protocol_server.hh"
@@ -3120,16 +3120,16 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
future<int> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.local().execute_internal(req, cql3::query_processor::cache_internal::yes);
int generation;
gms::generation_type generation;
if (rs->empty() || !rs->one().has("gossip_generation")) {
// seconds-since-epoch isn't a foolproof new generation
// (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
// but it's as close as sanely possible
generation = utils::get_generation_number();
generation = gms::get_generation_number();
} else {
// Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
int stored_generation = rs->one().template get_as<int>("gossip_generation") + 1;
int now = utils::get_generation_number();
auto stored_generation = gms::generation_type(rs->one().template get_as<int>("gossip_generation") + 1);
auto now = gms::get_generation_number();
if (stored_generation >= now) {
slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}."
"See CASSANDRA-3654 if you experience problems", stored_generation, now);
@@ -3139,7 +3139,7 @@ future<int> system_keyspace::increment_and_get_generation() {
}
}
req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL);
co_await _qp.local().execute_internal(req, {generation}, cql3::query_processor::cache_internal::yes);
co_await _qp.local().execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
co_await force_blocking_flush(LOCAL);
co_return generation;
}

View File

@@ -14,7 +14,7 @@
namespace gms {
static_assert(!std::is_default_constructible_v<heart_beat_state>);
static_assert(std::is_default_constructible_v<heart_beat_state>);
static_assert(std::is_nothrow_copy_constructible_v<heart_beat_state>);
static_assert(std::is_nothrow_move_constructible_v<heart_beat_state>);
@@ -48,7 +48,7 @@ bool endpoint_state::is_cql_ready() const noexcept {
return false;
}
try {
return boost::lexical_cast<int>(app_state->value);
return boost::lexical_cast<int>(app_state->value());
} catch (...) {
return false;
}

View File

@@ -43,7 +43,7 @@ public:
}
endpoint_state() noexcept
: _heart_beat_state(0)
: _heart_beat_state()
, _update_timestamp(clk::now())
, _is_alive(true) {
update_is_normal();
@@ -140,7 +140,7 @@ public:
if (!app_state) {
return empty;
}
const auto& value = app_state->value;
const auto& value = app_state->value();
if (value.empty()) {
return empty;
}

36
gms/generation-number.cc Normal file
View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <cassert>
#include <chrono>
#include <utility>
#include <exception>
#include <fmt/format.h>
#include "generation-number.hh"
namespace gms {
// Returns the current wall-clock time, in whole seconds since the epoch,
// as a gossip generation number.
//
// The seconds count is kept at the clock's native (wider than 32 bits)
// representation until it has been range-checked. Narrowing it first would
// make the overflow assertion compare a truncated value with itself, so it
// could never fire.
generation_type get_generation_number() {
    using namespace std::chrono;
    // system_clock is the wall clock; high_resolution_clock has an
    // implementation-defined epoch and may alias steady_clock.
    const auto now = system_clock::now().time_since_epoch();
    const auto seconds_since_epoch = duration_cast<seconds>(now).count();
    // Make sure the clock didn't overflow the 32 bits value
    assert(std::in_range<generation_type::value_type>(seconds_since_epoch));
    return generation_type(static_cast<generation_type::value_type>(seconds_since_epoch));
}
void validate_gossip_generation(int64_t generation_number) {
if (!std::in_range<gms::generation_type::value_type>(generation_number)) {
throw std::out_of_range(fmt::format("gossip generation {} is out of range", generation_number));
}
}
}

26
gms/generation-number.hh Normal file
View File

@@ -0,0 +1,26 @@
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/tagged_integer.hh"
namespace gms {
// Gossip generation number: a tagged (strongly typed) integer whose
// underlying representation is int32_t.
using generation_type = utils::tagged_integer<struct generation_type_tag, int32_t>;
// Returns a generation number derived from the current clock time,
// expressed in whole seconds since the clock's epoch.
generation_type get_generation_number();
// Throws std::out_of_range if generation_number does not fit in
// generation_type::value_type; returns normally otherwise.
void validate_gossip_generation(int64_t generation_number);
// Range-checks a generation number via validate_gossip_generation(), but
// only when SCYLLA_BUILD_MODE_RELEASE is not defined. When that macro is
// defined the function body is empty, which is why the parameter is
// marked [[maybe_unused]].
inline void debug_validate_gossip_generation([[maybe_unused]] int64_t generation_number) {
#if !defined(SCYLLA_BUILD_MODE_RELEASE)
    validate_gossip_generation(generation_number);
#endif
}
}

View File

@@ -13,6 +13,8 @@
#include <seastar/core/sstring.hh>
#include "utils/serialization.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include "gms/version_generator.hh"
namespace gms {
@@ -24,16 +26,12 @@ class gossip_digest { // implements Comparable<GossipDigest>
private:
using inet_address = gms::inet_address;
inet_address _endpoint;
int32_t _generation;
int32_t _max_version;
generation_type _generation;
version_type _max_version;
public:
gossip_digest()
: _endpoint(0)
, _generation(0)
, _max_version(0) {
}
gossip_digest() = default;
gossip_digest(inet_address ep, int32_t gen, int32_t version)
explicit gossip_digest(inet_address ep, generation_type gen = {}, version_type version = {}) noexcept
: _endpoint(ep)
, _generation(gen)
, _max_version(version) {
@@ -43,21 +41,14 @@ public:
return _endpoint;
}
int32_t get_generation() const {
generation_type get_generation() const {
return _generation;
}
int32_t get_max_version() const {
version_type get_max_version() const {
return _max_version;
}
int32_t compare_to(gossip_digest d) const {
if (_generation != d.get_generation()) {
return (_generation - d.get_generation());
}
return (_max_version - d.get_max_version());
}
friend bool operator<(const gossip_digest& x, const gossip_digest& y) {
if (x._generation != y._generation) {
return x._generation < y._generation;

View File

@@ -38,7 +38,7 @@
#include <boost/range/algorithm/partition.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include "utils/generation-number.hh"
#include "gms/generation-number.hh"
#include "locator/token_metadata.hh"
#include "utils/exceptions.hh"
@@ -50,7 +50,7 @@ static logging::logger logger("gossip");
constexpr std::chrono::milliseconds gossiper::INTERVAL;
constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME;
constexpr int64_t gossiper::MAX_GENERATION_DIFFERENCE;
constexpr generation_type::value_type gossiper::MAX_GENERATION_DIFFERENCE;
netw::msg_addr gossiper::get_msg_addr(inet_address to) const noexcept {
return msg_addr{to, _default_cpuid};
@@ -116,7 +116,7 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s
[ep, this] {
auto es = get_endpoint_state_for_endpoint_ptr(ep);
if (es) {
return es->get_heart_beat_state().get_heart_beat_version();
return es->get_heart_beat_state().get_heart_beat_version().value();
} else {
return 0;
}
@@ -154,9 +154,9 @@ void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) {
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep);
int version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : 0;
int diff_version = ::abs(version - g_digest.get_max_version());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version));
version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type();
int32_t diff_version = ::abs(version - g_digest.get_max_version());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version)));
}
g_digest_list.clear();
@@ -363,7 +363,7 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : ack_msg_digest) {
inet_address addr = g_digest.get_endpoint();
auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version());
auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, version_type(g_digest.get_max_version()));
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(addr, *local_ep_state_ptr);
}
@@ -415,9 +415,9 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t
} else {
auto es = get_endpoint_state_for_endpoint_ptr(from);
if (es) {
int64_t saved_generation_number = it->second;
int64_t current_generation_number = generation_number_opt ?
generation_number_opt.value() : es->get_heart_beat_state().get_generation();
auto saved_generation_number = it->second;
auto current_generation_number = generation_number_opt ?
generation_type(generation_number_opt.value()) : es->get_heart_beat_state().get_generation();
respond = saved_generation_number == current_generation_number;
logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}",
from, saved_generation_number, current_generation_number);
@@ -441,12 +441,13 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
auto permit = co_await this->lock_endpoint(from);
if (generation_number_opt) {
debug_validate_gossip_generation(*generation_number_opt);
auto es = this->get_endpoint_state_for_endpoint_ptr(from);
if (es) {
int local_generation = es->get_heart_beat_state().get_generation();
auto local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
if (local_generation != generation_number_opt.value()) {
if (local_generation.value() != generation_number_opt.value()) {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
co_return;
@@ -564,10 +565,10 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
auto es = this->get_endpoint_state_for_endpoint_ptr(node);
if (es) {
endpoint_state& local_state = *es;
int local_generation = local_state.get_heart_beat_state().get_generation();
int remote_generation = remote_state.get_heart_beat_state().get_generation();
auto local_generation = local_state.get_heart_beat_state().get_generation();
auto remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation);
if (remote_generation > utils::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
if (remote_generation > generation_type(get_generation_number().value() + MAX_GENERATION_DIFFERENCE)) {
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
node, local_generation, remote_generation);
@@ -583,8 +584,8 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
} else if (remote_generation == local_generation) {
if (listener_notification) {
// find maximum state
int local_max_version = this->get_max_endpoint_state_version(local_state);
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
auto local_max_version = this->get_max_endpoint_state_version(local_state);
auto remote_max_version = this->get_max_endpoint_state_version(remote_state);
if (remote_max_version > local_max_version) {
// apply states, but do not notify since there is no major change
co_await this->apply_new_states(node, local_state, remote_state);
@@ -599,7 +600,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
const auto& remote_key = item.first;
const auto& remote_value = item.second;
const versioned_value* local_value = local_state.get_application_state_ptr(remote_key);
if (!local_value || remote_value.version > local_value->version) {
if (!local_value || remote_value.version() > local_value->version()) {
logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}",
node, remote_key, remote_value);
local_state.add_application_state(remote_key, remote_value);
@@ -765,7 +766,7 @@ future<std::set<inet_address>> gossiper::get_live_members_synchronized() {
});
}
future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version) {
future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version) {
auto last = gossiper::clk::now();
auto diff = gossiper::clk::duration(0);
auto echo_interval = std::chrono::milliseconds(2000);
@@ -774,7 +775,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_
bool failed = false;
try {
logger.debug("failure_detector_loop: Send echo to node {}, status = started", node);
co_await _messaging.send_gossip_echo(netw::msg_addr(node), gossip_generation, max_duration);
co_await _messaging.send_gossip_echo(netw::msg_addr(node), gossip_generation.value(), max_duration);
logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node);
} catch (...) {
failed = true;
@@ -1092,11 +1093,11 @@ std::set<inet_address> gossiper::get_unreachable_members() const {
return ret;
}
int gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcept {
int max_version = state.get_heart_beat_state().get_heart_beat_version();
version_type gossiper::get_max_endpoint_state_version(endpoint_state state) const noexcept {
auto max_version = state.get_heart_beat_state().get_heart_beat_version();
for (auto& entry : state.get_application_state_map()) {
auto& value = entry.second;
max_version = std::max(max_version, value.version);
max_version = std::max(max_version, value.version());
}
return max_version;
}
@@ -1121,8 +1122,8 @@ void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quaran
}
void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests) {
int generation = 0;
int max_version = 0;
generation_type generation;
version_type max_version;
// local epstate will be part of _endpoint_state_map
utils::chunked_vector<inet_address> endpoints;
@@ -1181,7 +1182,7 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi
future<> gossiper::advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id) {
auto& state = get_endpoint_state(endpoint);
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
auto generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms);
logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint);
@@ -1228,8 +1229,8 @@ future<> gossiper::assassinate_endpoint(sstring address) {
auto permit = gossiper.lock_endpoint(endpoint).get0();
auto es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto now = gossiper.now();
int gen = std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count();
int ver = 9999;
generation_type gen(std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count());
version_type ver(9999);
endpoint_state ep_state = es ? *es : endpoint_state(heart_beat_state(gen, ver));
std::vector<dht::token> tokens;
logger.warn("Assassinating {} via gossip", endpoint);
@@ -1240,8 +1241,8 @@ future<> gossiper::assassinate_endpoint(sstring address) {
throw std::runtime_error(format("Unable to calculate tokens for {}", endpoint));
}
int generation = ep_state.get_heart_beat_state().get_generation();
int heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version();
auto generation = ep_state.get_heart_beat_state().get_generation();
auto heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version();
auto ring_delay = std::chrono::milliseconds(gossiper._gcfg.ring_delay_ms);
logger.info("Sleeping for {} ms to ensure {} does not change", ring_delay.count(), endpoint);
// make sure it did not change
@@ -1273,13 +1274,13 @@ future<> gossiper::assassinate_endpoint(sstring address) {
});
}
future<int> gossiper::get_current_generation_number(inet_address endpoint) {
future<generation_type> gossiper::get_current_generation_number(inet_address endpoint) {
return container().invoke_on(0, [endpoint] (auto&& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation();
});
}
future<int> gossiper::get_current_heart_beat_version(inet_address endpoint) {
future<version_type> gossiper::get_current_heart_beat_version(inet_address endpoint) {
return container().invoke_on(0, [endpoint] (auto&& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version();
});
@@ -1403,7 +1404,7 @@ locator::host_id gossiper::get_host_id(inet_address endpoint) const {
if (!app_state) {
throw std::runtime_error(format("Host {} does not have HOST_ID application_state", endpoint));
}
return locator::host_id(utils::UUID(app_state->value));
return locator::host_id(utils::UUID(app_state->value()));
}
std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id host_id) const {
@@ -1411,14 +1412,14 @@ std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id ho
for (auto& x : get_endpoint_states()) {
auto node = x.first;
auto app_state = get_application_state_ptr(node, application_state::HOST_ID);
if (app_state && host_id == locator::host_id(utils::UUID(app_state->value))) {
if (app_state && host_id == locator::host_id(utils::UUID(app_state->value()))) {
nodes.insert(node);
}
}
return nodes;
}
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, int version) {
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) {
std::optional<endpoint_state> reqd_endpoint_state;
auto es = get_endpoint_state_for_endpoint_ptr(for_endpoint);
if (es) {
@@ -1431,7 +1432,7 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
* than the version passed in. In this case we also send the old
* heart beat and throw it away on the receiver if it is redundant.
*/
int local_hb_version = eps.get_heart_beat_state().get_heart_beat_version();
auto local_hb_version = eps.get_heart_beat_state().get_heart_beat_version();
if (local_hb_version > version) {
reqd_endpoint_state.emplace(eps.get_heart_beat_state());
logger.trace("local heartbeat version {} greater than {} for {}", local_hb_version, version, for_endpoint);
@@ -1439,12 +1440,12 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
/* Accumulate all application states whose versions are greater than "version" variable */
for (auto& entry : eps.get_application_state_map()) {
auto& value = entry.second;
if (value.version > version) {
if (value.version() > version) {
if (!reqd_endpoint_state) {
reqd_endpoint_state.emplace(eps.get_heart_beat_state());
}
auto& key = entry.first;
logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value);
logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value());
reqd_endpoint_state->add_application_state(key, value);
}
}
@@ -1452,7 +1453,7 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
return reqd_endpoint_state;
}
int gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
auto* ep1 = get_endpoint_state_for_endpoint_ptr(addr1);
auto* ep2 = get_endpoint_state_for_endpoint_ptr(addr2);
if (!ep1 || !ep2) {
@@ -1467,7 +1468,7 @@ sstring gossiper::get_rpc_address(const inet_address& endpoint) const {
if (endpoint != get_broadcast_address()) {
auto* v = get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS);
if (v) {
return v->value;
return v->value();
}
}
return fmt::to_string(endpoint);
@@ -1480,13 +1481,13 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
auto* local_endpoint_state = get_endpoint_state_for_endpoint_ptr(endpoint);
if (local_endpoint_state) {
bool update = false;
int local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
int remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation();
auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
auto remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation();
if (remote_generation > local_generation) {
update = true;
} else if (remote_generation == local_generation) {
int local_version = get_max_endpoint_state_version(*local_endpoint_state);
int remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version();
auto local_version = get_max_endpoint_state_version(*local_endpoint_state);
auto remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version();
if (remote_version > local_version) {
update = true;
}
@@ -1516,10 +1517,10 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
local_state.mark_dead();
msg_addr id = get_msg_addr(addr);
int64_t generation = _endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation();
auto generation = _endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation();
logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation);
// Do it in the background.
(void)_messaging.send_gossip_echo(id, generation, std::chrono::milliseconds(15000)).then([this, addr] {
(void)_messaging.send_gossip_echo(id, generation.value(), std::chrono::milliseconds(15000)).then([this, addr] {
logger.trace("Got EchoMessage Reply");
// After sending echo message, the Node might not be in the
// _endpoint_state_map anymore, use the reference of local_state
@@ -1718,7 +1719,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta
}
const versioned_value* local_val = local_state.get_application_state_ptr(remote_key);
if (!local_val || remote_value.version > local_val->version) {
if (!local_val || remote_value.version() > local_val->version()) {
changed.push_back(remote_key);
local_state.add_application_state(remote_key, remote_value);
}
@@ -1757,15 +1758,15 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati
}
void gossiper::request_all(gossip_digest& g_digest,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, int remote_generation) {
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) {
/* We are here since we have no data for this endpoint locally so request everthing. */
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, 0);
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation);
logger.trace("request_all for {}", g_digest.get_endpoint());
}
void gossiper::send_all(gossip_digest& g_digest,
std::map<inet_address, endpoint_state>& delta_ep_state_map,
int max_remote_version) {
version_type max_remote_version) {
auto ep = g_digest.get_endpoint();
logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version);
auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version);
@@ -1785,12 +1786,12 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
*/
logger.debug("Shadow request received, adding all states");
for (auto& entry : _endpoint_state_map) {
g_digest_list.emplace_back(entry.first, 0, 0);
g_digest_list.emplace_back(entry.first);
}
}
for (gossip_digest& g_digest : g_digest_list) {
int remote_generation = g_digest.get_generation();
int max_remote_version = g_digest.get_max_version();
auto remote_generation = g_digest.get_generation();
auto max_remote_version = g_digest.get_max_version();
/* Get state associated with the end point in digest */
auto&& ep = g_digest.get_endpoint();
auto es = get_endpoint_state_for_endpoint_ptr(ep);
@@ -1801,9 +1802,9 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
*/
if (es) {
endpoint_state& ep_state_ptr = *es;
int local_generation = ep_state_ptr.get_heart_beat_state().get_generation();
auto local_generation = ep_state_ptr.get_heart_beat_state().get_generation();
/* get the max version of all keys in the state associated with this endpoint */
int max_local_version = get_max_endpoint_state_version(ep_state_ptr);
auto max_local_version = get_max_endpoint_state_version(ep_state_ptr);
logger.trace("examine_gossiper(): ep={}, remote={}.{}, local={}.{}", ep,
remote_generation, max_remote_version, local_generation, max_local_version);
if (remote_generation == local_generation && max_remote_version == max_local_version) {
@@ -1815,7 +1816,7 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
request_all(g_digest, delta_gossip_digest_list, remote_generation);
} else if (remote_generation < local_generation) {
/* send all data with generation = localgeneration and version > 0 */
send_all(g_digest, delta_ep_state_map, 0);
send_all(g_digest, delta_ep_state_map, version_type());
} else if (remote_generation == local_generation) {
/*
* If the max remote version is greater then we request the
@@ -1842,7 +1843,7 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
}
}
future<> gossiper::start_gossiping(int generation_nbr, std::map<application_state, versioned_value> preload_local_states, gms::advertise_myself advertise) {
future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map<application_state, versioned_value> preload_local_states, gms::advertise_myself advertise) {
co_await container().invoke_on_all([advertise] (gossiper& g) {
if (!advertise) {
g._advertise_myself = false;
@@ -1851,7 +1852,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
build_seeds_list();
if (_force_gossip_generation() > 0) {
generation_nbr = _force_gossip_generation();
generation_nbr = gms::generation_type(_force_gossip_generation());
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = _endpoint_state_map[get_broadcast_address()];
@@ -1882,24 +1883,24 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
});
}
// Collect the heart-beat generation of every endpoint listed in `nodes`.
//
// Resolves to a map from node address to the generation read from that
// node's heart_beat_state. If no endpoint state is returned for some node,
// resolves instead to a failed future carrying a std::runtime_error that
// names that node.
future<std::unordered_map<gms::inet_address, int32_t>>
future<gossiper::generation_for_nodes>
gossiper::get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes) {
std::unordered_map<gms::inet_address, int32_t> ret;
generation_for_nodes ret;
for (const auto& node : nodes) {
auto es = get_endpoint_state_for_endpoint_ptr(node);
// Only nodes for which a state pointer was returned contribute an entry.
if (es) {
auto current_generation_number = es->get_heart_beat_state().get_generation();
ret.emplace(node, current_generation_number);
} else {
// Fail on the first node without state; entries gathered so far are dropped.
return make_exception_future<std::unordered_map<gms::inet_address, int32_t>>(
return make_exception_future<generation_for_nodes>(
std::runtime_error(format("Can not find generation number for node={}", node)));
}
}
return make_ready_future<std::unordered_map<gms::inet_address, int32_t>>(std::move(ret));
return make_ready_future<generation_for_nodes>(std::move(ret));
}
future<> gossiper::advertise_to_nodes(std::unordered_map<gms::inet_address, int32_t> advertise_to_nodes) {
return container().invoke_on_all([advertise_to_nodes] (auto& g) {
future<> gossiper::advertise_to_nodes(generation_for_nodes advertise_to_nodes) {
return container().invoke_on_all([advertise_to_nodes = std::move(advertise_to_nodes)] (auto& g) {
g._advertise_to_nodes = advertise_to_nodes;
g._advertise_myself = true;
});
@@ -2006,12 +2007,12 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
}
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
auto ep_state = endpoint_state(heart_beat_state(0));
auto ep_state = endpoint_state();
auto es = get_endpoint_state_for_endpoint_ptr(ep);
if (es) {
ep_state = *es;
logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
ep_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(0));
ep_state.set_heart_beat_state_and_update_timestamp(heart_beat_state());
}
const auto tmptr = get_token_metadata_ptr();
auto tokens = tmptr->get_tokens(ep);
@@ -2126,14 +2127,14 @@ future<> gossiper::do_stop_gossiping() {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
int local_generation = my_ep_state->get_heart_beat_state().get_generation();
auto local_generation = my_ep_state->get_heart_beat_state().get_generation();
logger.info("Announcing shutdown");
add_local_application_state(application_state::STATUS, versioned_value::shutdown(true)).get();
auto live_endpoints = _live_endpoints;
for (inet_address addr : live_endpoints) {
msg_addr id = get_msg_addr(addr);
logger.info("Sending a GossipShutdown to {} with generation {}", id.addr, local_generation);
_messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation).then_wrapped([id] (auto&&f) {
_messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation.value()).then_wrapped([id] (auto&&f) {
try {
f.get();
logger.trace("Got GossipShutdown Reply");
@@ -2275,7 +2276,7 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
if (!v) {
return {};
}
return v->value;
return v->value();
}
/**
@@ -2305,7 +2306,7 @@ static std::string_view do_get_gossip_status(const gms::versioned_value* app_sta
if (!app_state) {
return gms::versioned_value::STATUS_UNKNOWN;
}
const auto& value = app_state->value;
const auto& value = app_state->value();
auto pos = value.find(',');
if (!value.size() || !pos) {
return gms::versioned_value::STATUS_UNKNOWN;
@@ -2438,7 +2439,7 @@ std::set<sstring> gossiper::get_supported_features(inet_address endpoint) const
if (!app_state) {
return {};
}
return feature_service::to_feature_set(app_state->value);
return feature_service::to_feature_set(app_state->value());
}
std::set<sstring> gossiper::get_supported_features(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const {
@@ -2512,8 +2513,8 @@ void gossiper::check_snitch_name_matches(sstring local_snitch_name) const {
continue;
}
if (remote_snitch_name->value != local_snitch_name) {
throw std::runtime_error(format("Snitch check failed. This node cannot join the cluster because it uses {} and not {}", local_snitch_name, remote_snitch_name->value));
if (remote_snitch_name->value() != local_snitch_name) {
throw std::runtime_error(format("Snitch check failed. This node cannot join the cluster because it uses {} and not {}", local_snitch_name, remote_snitch_name->value()));
}
}
}
@@ -2535,11 +2536,11 @@ void gossiper::append_endpoint_state(std::stringstream& ss, const endpoint_state
if (app_state == application_state::TOKENS) {
continue;
}
ss << " " << app_state << ":" << versioned_val.version << ":" << versioned_val.value << "\n";
ss << " " << app_state << ":" << versioned_val.version() << ":" << versioned_val.value() << "\n";
}
const auto& app_state_map = state.get_application_state_map();
if (app_state_map.contains(application_state::TOKENS)) {
ss << " TOKENS:" << app_state_map.at(application_state::TOKENS).version << ":<hidden>\n";
ss << " TOKENS:" << app_state_map.at(application_state::TOKENS).version() << ":<hidden>\n";
} else {
ss << " TOKENS: not present" << "\n";
}

View File

@@ -19,6 +19,7 @@
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "gms/generation-number.hh"
#include "gms/versioned_value.hh"
#include "gms/application_state.hh"
#include "gms/endpoint_state.hh"
@@ -96,6 +97,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
public:
using clk = seastar::lowres_system_clock;
using ignore_features_of_local_node = bool_class<class ignore_features_of_local_node_tag>;
using generation_for_nodes = std::unordered_map<gms::inet_address, generation_type>;
private:
using messaging_verb = netw::messaging_verb;
using messaging_service = netw::messaging_service;
@@ -123,17 +125,17 @@ private:
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
bool _advertise_myself = true;
// Map ip address and generation number
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
generation_for_nodes _advertise_to_nodes;
future<> _failure_detector_loop_done{make_ready_future<>()} ;
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
public:
// Get current generation number for the given nodes
future<std::unordered_map<gms::inet_address, int32_t>>
future<generation_for_nodes>
get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes);
// Only respond echo message listed in nodes with the generation number
future<> advertise_to_nodes(std::unordered_map<gms::inet_address, int32_t> advertise_to_nodes = {});
future<> advertise_to_nodes(generation_for_nodes advertise_to_nodes = {});
const sstring& get_cluster_name() const noexcept;
const sstring& get_partitioner_name() const noexcept {
@@ -183,7 +185,7 @@ public:
// Maximum difference between remote generation value and generation
// value this node would get if this node were restarted that we are
// willing to accept about a peer.
static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;
static constexpr generation_type::value_type MAX_GENERATION_DIFFERENCE = 86400 * 365;
std::chrono::milliseconds fat_client_timeout;
std::chrono::milliseconds quarantine_delay() const noexcept;
@@ -283,7 +285,7 @@ public:
* @param ep_state
* @return
*/
int get_max_endpoint_state_version(endpoint_state state) const noexcept;
version_type get_max_endpoint_state_version(endpoint_state state) const noexcept;
private:
@@ -357,8 +359,8 @@ public:
future<> assassinate_endpoint(sstring address);
public:
future<int> get_current_generation_number(inet_address endpoint);
future<int> get_current_heart_beat_version(inet_address endpoint);
future<generation_type> get_current_generation_number(inet_address endpoint);
future<version_type> get_current_heart_beat_version(inet_address endpoint);
bool is_gossip_only_member(inet_address endpoint);
bool is_safe_for_bootstrap(inet_address endpoint);
@@ -403,12 +405,12 @@ public:
std::set<gms::inet_address> get_nodes_with_host_id(locator::host_id host_id) const;
std::optional<endpoint_state> get_state_for_version_bigger_than(inet_address for_endpoint, int version);
std::optional<endpoint_state> get_state_for_version_bigger_than(inet_address for_endpoint, version_type version);
/**
* determine which endpoint started up earlier
*/
int compare_endpoint_startup(inet_address addr1, inet_address addr2);
generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2);
/**
* Return the rpc address associated with an endpoint as a string.
@@ -457,10 +459,10 @@ private:
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
/* Request all the state for the endpoint in the g_digest */
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, int remote_generation);
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation);
/* Send all the data with version greater than max_remote_version */
void send_all(gossip_digest& g_digest, std::map<inet_address, endpoint_state>& delta_ep_state_map, int max_remote_version);
void send_all(gossip_digest& g_digest, std::map<inet_address, endpoint_state>& delta_ep_state_map, version_type max_remote_version);
public:
/*
@@ -509,7 +511,7 @@ public:
* existing nodes can talk to the replacing node. So the probability of
* replacing node being talked to is pretty high.
*/
future<> start_gossiping(int generation_nbr, std::map<application_state, versioned_value> preload_local_states = {},
future<> start_gossiping(gms::generation_type generation_nbr, std::map<application_state, versioned_value> preload_local_states = {},
gms::advertise_myself advertise = gms::advertise_myself::yes);
public:
@@ -615,7 +617,7 @@ public:
int get_up_endpoint_count() const noexcept;
private:
future<> failure_detector_loop();
future<> failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version);
future<> failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version);
future<> update_live_endpoints_version();
};

View File

@@ -10,6 +10,7 @@
#pragma once
#include "gms/generation-number.hh"
#include "gms/version_generator.hh"
#include "utils/serialization.hh"
#include <ostream>
@@ -21,24 +22,26 @@ namespace gms {
*/
class heart_beat_state {
private:
int32_t _generation;
int32_t _version;
generation_type _generation;
version_type _version;
public:
bool operator==(const heart_beat_state& other) const noexcept {
return _generation == other._generation && _version == other._version;
}
heart_beat_state(int32_t gen) noexcept
heart_beat_state() noexcept : heart_beat_state(generation_type(0)) {}
explicit heart_beat_state(generation_type gen) noexcept
: _generation(gen)
, _version(0) {
{
}
heart_beat_state(int32_t gen, int32_t ver) noexcept
heart_beat_state(generation_type gen, version_type ver) noexcept
: _generation(gen)
, _version(ver) {
}
int32_t get_generation() const noexcept {
generation_type get_generation() const noexcept {
return _generation;
}
@@ -46,16 +49,16 @@ public:
_version = version_generator::get_next_version();
}
int32_t get_heart_beat_version() const noexcept {
version_type get_heart_beat_version() const noexcept {
return _version;
}
void force_newer_generation_unsafe() noexcept {
_generation += 1;
++_generation;
}
void force_highest_possible_version_unsafe() noexcept {
_version = std::numeric_limits<int32_t>::max();
_version = std::numeric_limits<version_type>::max();
}
friend inline std::ostream& operator<<(std::ostream& os, const heart_beat_state& h) {

View File

@@ -14,9 +14,9 @@ namespace gms {
namespace version_generator {
// In the original Cassandra code, version was an AtomicInteger.
// For us, we run the gossiper on a single CPU, and don't need to use atomics.
static int version = 0;
static version_type version;
// Advance the file-level `version` counter by one and return the new value.
// The counter is pre-incremented, so each call returns a value one greater
// than the previous call's result.
int get_next_version() noexcept
version_type get_next_version() noexcept
{
return ++version;
}

View File

@@ -10,8 +10,12 @@
#pragma once
#include "utils/tagged_integer.hh"
namespace gms {
using version_type = utils::tagged_integer<struct version_type_tag, int32_t>;
/**
* A unique version number generator for any state that is generated by the
* local node.
@@ -19,7 +23,7 @@ namespace gms {
namespace version_generator
{
int get_next_version() noexcept;
version_type get_next_version() noexcept;
}
} // namespace gms

View File

@@ -36,6 +36,8 @@ namespace gms {
*/
class versioned_value {
version_type _version;
sstring _value;
public:
// this must be a char that cannot be present in any token
static constexpr char DELIMITER = ',';
@@ -58,17 +60,17 @@ public:
// values for ApplicationState.REMOVAL_COORDINATOR
static constexpr const char* REMOVAL_COORDINATOR = "REMOVER";
int version;
sstring value;
version_type version() const noexcept { return _version; };
const sstring& value() const noexcept { return _value; };
public:
bool operator==(const versioned_value& other) const noexcept {
return version == other.version &&
value == other.value;
return _version == other._version &&
_value == other._value;
}
public:
versioned_value(const sstring& value, int version = version_generator::get_next_version())
: version(version), value(value) {
versioned_value(const sstring& value, version_type version = version_generator::get_next_version())
: _version(version), _value(value) {
#if 0
// blindly interning everything is somewhat suboptimal -- lots of VersionedValues are unique --
// but harmless, and interning the non-unique ones saves significant memory. (Unfortunately,
@@ -78,20 +80,16 @@ public:
#endif
}
versioned_value(sstring&& value, int version = version_generator::get_next_version()) noexcept
: version(version), value(std::move(value)) {
versioned_value(sstring&& value, version_type version = version_generator::get_next_version()) noexcept
: _version(version), _value(std::move(value)) {
}
versioned_value() noexcept
: version(-1) {
}
int compare_to(const versioned_value &value) const noexcept {
return version - value.version;
: _version(-1) {
}
// Stream a versioned_value as "Value(<value>,<version>)" and return the
// stream so that calls can be chained.
friend inline std::ostream& operator<<(std::ostream& os, const versioned_value& x) {
return os << "Value(" << x.value << "," << x.version << ")";
return os << "Value(" << x.value() << "," << x.version() << ")";
}
static sstring version_string(const std::initializer_list<sstring>& args) {
@@ -109,7 +107,7 @@ public:
static std::optional<cdc::generation_id> cdc_generation_id_from_string(const sstring&);
// Build a new versioned_value holding the same string as `value`.
// No version argument is passed, so the constructor's default applies and
// the copy receives a freshly generated version from
// version_generator::get_next_version().
// NOTE(review): this function is noexcept, yet it copies the string through
// the const sstring& constructor, which is not declared noexcept — confirm
// that terminating on a failed copy is acceptable here.
static versioned_value clone_with_higher_version(const versioned_value& value) noexcept {
return versioned_value(value.value);
return versioned_value(value.value());
}
static versioned_value bootstrapping(const std::unordered_set<dht::token>& tokens) {

View File

@@ -59,7 +59,9 @@ set(idl_headers
replica_exception.idl.hh
per_partition_rate_limit_info.idl.hh
position_in_partition.idl.hh
experimental/broadcast_tables_lang.idl.hh)
experimental/broadcast_tables_lang.idl.hh
utils.idl.hh
)
foreach(idl_header ${idl_headers})
compile_idl(${idl_header}

View File

@@ -7,6 +7,7 @@
*/
#include "gms/inet_address_serializer.hh"
#include "gms/version_generator.hh"
namespace gms {
enum class application_state:int {
@@ -28,13 +29,13 @@ enum class application_state:int {
};
class versioned_value {
sstring value;
int version;
sstring value();
gms::version_type version();
};
class heart_beat_state {
int32_t get_generation();
int32_t get_heart_beat_version();
gms::generation_type get_generation();
gms::version_type get_heart_beat_version();
};
class endpoint_state {
@@ -44,8 +45,8 @@ class endpoint_state {
class gossip_digest {
gms::inet_address get_endpoint();
int32_t get_generation();
int32_t get_max_version();
gms::generation_type get_generation();
gms::version_type get_max_version();
};
class gossip_digest_syn {

View File

@@ -6,6 +6,10 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "raft/raft.hh"
#include "idl/utils.idl.hh"
namespace raft {
struct snapshot_descriptor {

View File

@@ -9,6 +9,7 @@
#include "raft/raft.hh"
#include "idl/uuid.idl.hh"
#include "idl/utils.idl.hh"
namespace raft {
@@ -19,11 +20,6 @@ struct tagged_id {
utils::UUID id;
};
template<typename Tag>
struct tagged_uint64 {
uint64_t get_value();
};
} // namespace internal
struct server_address {

18
idl/utils.idl.hh Normal file
View File

@@ -0,0 +1,18 @@
/*
* Copyright 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "utils/tagged_integer.hh"
namespace utils {
// IDL description of utils::tagged_integer<Tag, ValueType>.
// The only member listed is the value() accessor, which yields the
// underlying ValueType.
// NOTE(review): presumably the IDL compiler derives the wire format from
// this single accessor — confirm against the generated serializer.
template<typename Tag, typename ValueType>
struct tagged_integer final {
ValueType value();
};
} // namespace utils

View File

@@ -1066,9 +1066,11 @@ future<> messaging_service::unregister_gossip_echo() {
return unregister_handler(netw::messaging_verb::GOSSIP_ECHO);
}
// Send a GOSSIP_ECHO message carrying `generation_number` to `id`,
// forwarding `timeout` to send_message_timeout.
// The generation is first handed to gms::debug_validate_gossip_generation().
// NOTE(review): that check is defined elsewhere — presumably a range check
// active only in non-release builds; confirm.
future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number, std::chrono::milliseconds timeout) {
gms::debug_validate_gossip_generation(generation_number);
return send_message_timeout<void>(this, messaging_verb::GOSSIP_ECHO, std::move(id), timeout, generation_number);
}
// Send a GOSSIP_ECHO message carrying `generation_number` to `id`,
// forwarding the abort source `as` to send_message_cancellable.
// The generation is first handed to gms::debug_validate_gossip_generation().
// NOTE(review): that check is defined elsewhere — presumably a range check
// active only in non-release builds; confirm.
future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number, abort_source& as) {
gms::debug_validate_gossip_generation(generation_number);
return send_message_cancellable<void>(this, messaging_verb::GOSSIP_ECHO, std::move(id), as, generation_number);
}
@@ -1079,6 +1081,7 @@ future<> messaging_service::unregister_gossip_shutdown() {
return unregister_handler(netw::messaging_verb::GOSSIP_SHUTDOWN);
}
// Send a GOSSIP_SHUTDOWN message to `id` via send_message_oneway, carrying
// the sender address `from` and `generation_number`.
// The generation is first handed to gms::debug_validate_gossip_generation().
// NOTE(review): that check is defined elsewhere — presumably a range check
// active only in non-release builds; confirm.
future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from, int64_t generation_number) {
gms::debug_validate_gossip_generation(generation_number);
return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from), generation_number);
}

View File

@@ -98,7 +98,7 @@ const log_entry& fsm::add_entry(T command) {
tmp.enter_joint(command.current);
command = std::move(tmp);
logger.trace("[{}] appending joint config entry at {}: {}", _my_id, _log.next_idx().get_value(), command);
logger.trace("[{}] appending joint config entry at {}: {}", _my_id, _log.next_idx(), command);
}
utils::get_local_injector().inject("fsm::add_entry/test-failure",
@@ -462,7 +462,7 @@ void fsm::maybe_commit() {
// system then transitions to the new configuration.
configuration cfg(_log.get_configuration());
cfg.leave_joint();
logger.trace("[{}] appending non-joint config entry at {}: {}", _my_id, _log.next_idx().get_value(), cfg);
logger.trace("[{}] appending non-joint config entry at {}: {}", _my_id, _log.next_idx(), cfg);
_log.emplace_back(seastar::make_lw_shared<log_entry>({_current_term, _log.next_idx(), std::move(cfg)}));
leader_state().tracker.set_configuration(_log.get_configuration(), _log.last_idx());
// Leaving joint configuration may commit more entries

View File

@@ -14,66 +14,8 @@
namespace raft {
namespace internal {
// A uint64_t wrapper parameterized by a tag type.
// `Tag` is not used in the body; different tags give distinct, mutually
// unrelated instantiations, so values with different tags cannot be mixed
// through this class's own operators.
template<typename Tag>
class tagged_uint64 {
uint64_t _val;
public:
// Default-constructed objects hold 0.
tagged_uint64() : _val(0) {}
// Explicit, so a raw integer never silently becomes a tagged value.
explicit tagged_uint64(uint64_t v) : _val(v) {}
tagged_uint64(const tagged_uint64&) = default;
tagged_uint64(tagged_uint64&&) = default;
tagged_uint64& operator=(const tagged_uint64&) = default;
// Defaulted three-way comparison: compares the wrapped integers.
auto operator<=>(const tagged_uint64&) const = default;
// True when the wrapped value is non-zero.
explicit operator bool() const { return _val != 0; }
// Returns the wrapped integer.
uint64_t get_value() const {
return _val;
}
// Implicit conversion back to the raw integer (same result as get_value()).
operator uint64_t() const {
return get_value();
}
tagged_uint64& operator++() { // pre increment
++_val;
return *this;
}
// Returns a copy holding the value from before the increment.
tagged_uint64 operator++(int) { // post increment
uint64_t v = _val++;
return tagged_uint64(v);
}
tagged_uint64& operator--() { // pre decrement
--_val;
return *this;
}
// Returns a copy holding the value from before the decrement.
tagged_uint64 operator--(int) { // post decrement
uint64_t v = _val--;
return tagged_uint64(v);
}
// Sum and difference use unsigned 64-bit arithmetic, so they wrap on
// overflow/underflow rather than signalling an error.
tagged_uint64 operator+(const tagged_uint64& o) const {
return tagged_uint64(_val + o._val);
}
tagged_uint64 operator-(const tagged_uint64& o) const {
return tagged_uint64(_val - o._val);
}
// Streams the wrapped integer only, with no tag or decoration.
friend std::ostream& operator<<(std::ostream& os, const tagged_uint64<Tag>& u) {
os << u._val;
return os;
}
};
template<typename Tag>
using tagged_id = utils::tagged_uuid<Tag>;
} // end of namespace internal
} // end of namespace raft
namespace std {
// std::hash support for raft::internal::tagged_uint64<Tag>.
// `val` is passed to hash<uint64_t>, so the result is the hash of the
// wrapped integer as seen through the argument's conversion to uint64_t.
template<typename Tag>
struct hash<raft::internal::tagged_uint64<Tag>> {
size_t operator()(const raft::internal::tagged_uint64<Tag>& val) const {
return hash<uint64_t>()(val);
}
};
} // end of namespace std

View File

@@ -19,6 +19,7 @@
#include <seastar/core/abort_source.hh>
#include "bytes_ostream.hh"
#include "utils/UUID.hh"
#include "utils/tagged_integer.hh"
#include "internal.hh"
#include "logical_clock.hh"
@@ -39,11 +40,11 @@ using server_id = internal::tagged_id<struct server_id_tag>;
using group_id = raft::internal::tagged_id<struct group_id_tag>;
// This type represents the raft term
using term_t = internal::tagged_uint64<struct term_tag>;
using term_t = utils::tagged_integer<struct term_tag, int64_t>;
// This type represents the index into the raft log
using index_t = internal::tagged_uint64<struct index_tag>;
using index_t = utils::tagged_integer<struct index_tag, uint64_t>;
// Identifier for a read barrier request
using read_id = internal::tagged_uint64<struct read_id_tag>;
using read_id = utils::tagged_integer<struct read_id_tag, uint64_t>;
// Opaque connection properties. May contain ip:port pair for instance.
// This value is disseminated between cluster member

View File

@@ -2210,9 +2210,9 @@ table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::in
float f = -1.0f; // missing state means old node
if (state) {
sstring me = format("{}.{}", _schema->ks_name(), _schema->cf_name());
auto i = state->value.find(me);
auto i = state->value().find(me);
if (i != sstring::npos) {
f = strtof(&state->value[i + me.size() + 1], nullptr);
f = strtof(&state->value()[i + me.size() + 1], nullptr);
} else {
f = 0.0f; // empty state means that node has rebooted
}

View File

@@ -4028,6 +4028,30 @@ class scylla_netw(gdb.Command):
gdb.write(' %s\n' % (conn['_stats']))
# Return the '_value' member of the gdb value `i`; if that lookup raises
# gdb.error, return `i` itself unchanged.
# NOTE(review): presumably this lets callers handle both a tagged-integer
# wrapper and a plain integer the same way - confirm.
def get_tagged_integer_type(i):
try:
return i['_value']
except gdb.error:
return i
# Unwrap a gms generation or version gdb value by delegating to
# get_tagged_integer_type().
def get_gms_generation_or_version(i):
return get_tagged_integer_type(i)
# Turn the gdb value `vv` into a dict with 'version' and 'value' keys.
# The '_version' / '_value' member names are tried first, with the version
# unwrapped through get_gms_generation_or_version(). If gdb raises gdb.error,
# the 'version' / 'value' member names are read instead, as-is.
def get_gms_versioned_value(vv):
try:
return {
'version': get_gms_generation_or_version(vv['_version']),
'value': vv['_value'],
}
except gdb.error:
return {
'version': vv['version'],
'value': vv['value'],
}
class scylla_gms(gdb.Command):
def __init__(self):
gdb.Command.__init__(self, 'scylla gms', gdb.COMMAND_USER, gdb.COMPLETE_NONE, True)
@@ -4042,7 +4066,8 @@ class scylla_gms(gdb.Command):
for (endpoint, state) in unordered_map(state_map):
ip = ip_to_str(int(get_ip(endpoint)), byteorder=sys.byteorder)
gdb.write('%s: (gms::endpoint_state*) %s (%s)\n' % (ip, state.address, state['_heart_beat_state']))
for app_state, value in std_map(state['_application_state']):
for app_state, vv in std_map(state['_application_state']):
value = get_gms_versioned_value(vv)
gdb.write(' %s: {version=%d, value=%s}\n' % (app_state, value['version'], value['value']))

View File

@@ -37,7 +37,7 @@ public:
// Handle an application-state change for `endpoint`.
// Only LOAD is acted upon: the value string is parsed as a double and stored
// in _load_info under `endpoint`. Every other state is ignored.
// Returns an already-resolved future in all cases that reach the end.
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override {
if (state == gms::application_state::LOAD) {
// std::stod throws on input it cannot parse; nothing here catches that.
_load_info[endpoint] = std::stod(value.value);
_load_info[endpoint] = std::stod(value.value());
}
return make_ready_future();
}

View File

@@ -179,7 +179,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint,
if (endpoint != utils::fb_utilities::get_broadcast_address() && value) {
// FIXME: discarded future
(void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value}), endpoint).handle_exception([endpoint] (auto ep) {
(void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value()}), endpoint).handle_exception([endpoint] (auto ep) {
mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep);
});
}
@@ -205,7 +205,7 @@ bool migration_manager::have_schema_agreement() {
mlogger.debug("Schema state not yet available for {}.", endpoint);
return false;
}
auto remote_version = table_schema_version(utils::UUID{schema->value});
auto remote_version = table_schema_version(utils::UUID{schema->value()});
if (our_version != remote_version) {
mlogger.debug("Schema mismatch for {} ({} != {}).", endpoint, our_version, remote_version);
return false;
@@ -251,7 +251,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio
mlogger.debug("application_state::SCHEMA does not exist for {}, not submitting migration task", endpoint);
return make_ready_future<>();
}
auto current_version = table_schema_version(utils::UUID{value->value});
auto current_version = table_schema_version(utils::UUID{value->value()});
if (db.get_version() == current_version) {
mlogger.debug("not submitting migration task for {} because our versions match", endpoint);
return make_ready_future<>();
@@ -366,7 +366,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
// Report whether `endpoint` advertises the same schema-tables version as
// db::schema_tables::version.
// Returns false when the gossiper yields no SCHEMA_TABLES_VERSION state for
// the endpoint (null pointer), and otherwise the result of comparing the
// state's value with db::schema_tables::version.
bool migration_manager::has_compatible_schema_tables_version(const gms::inet_address& endpoint) {
auto* version = _gossiper.get_application_state_ptr(endpoint, gms::application_state::SCHEMA_TABLES_VERSION);
return version && version->value == db::schema_tables::version;
return version && version->value() == db::schema_tables::version;
}
bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint) {

View File

@@ -275,7 +275,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::
size_t current;
size_t max;
api::timestamp_type ticks;
const char* start_bound = value.value.data();
const char* start_bound = value.value().data();
char* end_bound;
for (auto* ptr : {&current, &max}) {
*ptr = std::strtoull(start_bound, &end_bound, 10);
@@ -288,7 +288,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::
return make_ready_future();
}
ticks = std::strtoll(start_bound, &end_bound, 10);
if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value.data() + value.value.size()) {
if (ticks == 0 || ticks == LLONG_MAX || end_bound != value.value().data() + value.value().size()) {
return make_ready_future();
}
auto backlog = view_update_backlog_timestamped{db::view::update_backlog{current, max}, ticks};

View File

@@ -617,7 +617,7 @@ void raft_group0::load_initial_raft_address_map() {
if (value == nullptr) {
continue;
}
auto server_id = utils::UUID(value->value);
auto server_id = utils::UUID(value->value());
if (server_id == utils::UUID{}) {
upgrade_log.error("empty Host ID for host {} ", ip_addr);
continue;

View File

@@ -77,7 +77,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state ep_state) {
auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID);
if (app_state_ptr) {
raft::server_id id(utils::UUID(app_state_ptr->value));
raft::server_id id(utils::UUID(app_state_ptr->value()));
rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change() {} {}", endpoint, id);
_address_map.add_or_update_entry(id, endpoint);
}

View File

@@ -59,7 +59,7 @@
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include "service/priority_manager.hh"
#include "utils/generation-number.hh"
#include "gms/generation-number.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
@@ -1408,7 +1408,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
slogger.info("Starting up server gossip");
auto generation_number = co_await _sys_ks.local().increment_and_get_generation();
auto generation_number = gms::generation_type(co_await _sys_ks.local().increment_and_get_generation());
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
@@ -1816,7 +1816,7 @@ storage_service::get_rpc_address(const inet_address& endpoint) const {
if (endpoint != get_broadcast_address()) {
auto* v = _gossiper.get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS);
if (v) {
return v->value;
return v->value();
}
}
return fmt::to_string(endpoint);
@@ -2160,9 +2160,9 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect
throw std::runtime_error(err);
}
std::vector<sstring> coordinator;
boost::split(coordinator, value->value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
if (coordinator.size() != 2) {
auto err = format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value);
auto err = format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value());
slogger.warn("{}", err);
throw std::runtime_error(err);
}
@@ -2236,7 +2236,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}", endpoint, state, value);
if (state == application_state::STATUS) {
std::vector<sstring> pieces;
boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
if (pieces.empty()) {
slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value);
co_return;
@@ -2274,7 +2274,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready());
co_await notify_cql_change(endpoint, ep_state->is_cql_ready());
} else if (state == application_state::INTERNAL_IP) {
co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value));
co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value()));
}
}
}
@@ -2330,18 +2330,18 @@ future<> storage_service::update_table(gms::inet_address endpoint, sstring col,
future<> storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) {
slogger.debug("Update system.peers table: endpoint={}, app_state={}, versioned_value={}", endpoint, state, value);
if (state == application_state::RELEASE_VERSION) {
co_await update_table(endpoint, "release_version", value.value);
co_await update_table(endpoint, "release_version", value.value());
} else if (state == application_state::DC) {
co_await update_table(endpoint, "data_center", value.value);
co_await update_table(endpoint, "data_center", value.value());
} else if (state == application_state::RACK) {
co_await update_table(endpoint, "rack", value.value);
co_await update_table(endpoint, "rack", value.value());
} else if (state == application_state::INTERNAL_IP) {
auto col = sstring("preferred_ip");
inet_address ep;
try {
ep = gms::inet_address(value.value);
ep = gms::inet_address(value.value());
} catch (...) {
slogger.error("fail to update {} for {}: invalid address {}", col, endpoint, value.value);
slogger.error("fail to update {} for {}: invalid address {}", col, endpoint, value.value());
co_return;
}
co_await update_table(endpoint, col, ep.addr());
@@ -2349,18 +2349,18 @@ future<> storage_service::do_update_system_peers_table(gms::inet_address endpoin
auto col = sstring("rpc_address");
inet_address ep;
try {
ep = gms::inet_address(value.value);
ep = gms::inet_address(value.value());
} catch (...) {
slogger.error("fail to update {} for {}: invalid rcpaddr {}", col, endpoint, value.value);
slogger.error("fail to update {} for {}: invalid rcpaddr {}", col, endpoint, value.value());
co_return;
}
co_await update_table(endpoint, col, ep.addr());
} else if (state == application_state::SCHEMA) {
co_await update_table(endpoint, "schema_version", utils::UUID(value.value));
co_await update_table(endpoint, "schema_version", utils::UUID(value.value()));
} else if (state == application_state::HOST_ID) {
co_await update_table(endpoint, "host_id", utils::UUID(value.value));
co_await update_table(endpoint, "host_id", utils::UUID(value.value()));
} else if (state == application_state::SUPPORTED_FEATURES) {
co_await update_table(endpoint, "supported_features", value.value);
co_await update_table(endpoint, "supported_features", value.value());
}
}
@@ -2390,8 +2390,8 @@ locator::endpoint_dc_rack storage_service::get_dc_rack_for(inet_address endpoint
auto* dc = _gossiper.get_application_state_ptr(endpoint, gms::application_state::DC);
auto* rack = _gossiper.get_application_state_ptr(endpoint, gms::application_state::RACK);
return locator::endpoint_dc_rack{
.dc = dc ? dc->value : locator::endpoint_dc_rack::default_location.dc,
.rack = rack ? rack->value : locator::endpoint_dc_rack::default_location.rack,
.dc = dc ? dc->value() : locator::endpoint_dc_rack::default_location.dc,
.rack = rack ? rack->value() : locator::endpoint_dc_rack::default_location.rack,
};
}
@@ -2948,7 +2948,7 @@ future<> storage_service::start_gossiping() {
co_await ss._sys_ks.local().get_local_tokens(),
cdc_gen_ts);
ss._gossiper.force_newer_generation();
co_await ss._gossiper.start_gossiping(utils::get_generation_number());
co_await ss._gossiper.start_gossiping(gms::get_generation_number());
} catch (...) {
should_stop_gossiper = true;
}

View File

@@ -98,7 +98,7 @@ int main(int ac, char ** av) {
using namespace std::chrono;
auto now = high_resolution_clock::now().time_since_epoch();
int generation_number = duration_cast<seconds>(now).count();
auto generation_number = gms::generation_type(duration_cast<seconds>(now).count());
gossiper.local().start_gossiping(generation_number, app_states).get();
static double load = 0.5;
for (;;) {

View File

@@ -56,8 +56,8 @@ public:
auto from = netw::messaging_service::get_source(cinfo);
auto ep1 = inet_address("1.1.1.1");
auto ep2 = inet_address("2.2.2.2");
int32_t gen = 800;
int32_t ver = 900;
gms::generation_type gen(800);
gms::version_type ver(900);
utils::chunked_vector<gms::gossip_digest> digests;
digests.push_back(gms::gossip_digest(ep1, gen++, ver++));
digests.push_back(gms::gossip_digest(ep2, gen++, ver++));
@@ -114,8 +114,8 @@ public:
auto id = get_msg_addr();
auto ep1 = inet_address("1.1.1.1");
auto ep2 = inet_address("2.2.2.2");
int32_t gen = 100;
int32_t ver = 900;
gms::generation_type gen(100);
gms::version_type ver(900);
utils::chunked_vector<gms::gossip_digest> digests;
digests.push_back(gms::gossip_digest(ep1, gen++, ver++));
digests.push_back(gms::gossip_digest(ep2, gen++, ver++));

View File

@@ -893,7 +893,7 @@ class persistence {
if (b == _stored_entries.end() || (*b)->idx >= idx) {
return b;
}
return b + std::min((idx - (*b)->idx).get_value(), _stored_entries.size());
return b + std::min(size_t(idx - (*b)->idx), _stored_entries.size());
}
public:

View File

@@ -21,7 +21,6 @@ target_sources(utils
error_injection.cc
exceptions.cc
file_lock.cc
generation-number.cc
gz/crc_combine.cc
gz/crc_combine_table.cc
hashers.cc

View File

@@ -1,21 +0,0 @@
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <chrono>
#include "generation-number.hh"
namespace utils {

/// Returns the current wall-clock time as a whole number of seconds since the
/// system clock's epoch, narrowed to `int`.
///
/// The value is meant to grow from one process start to the next, so it has
/// to come from a wall clock rather than a monotonic one.
int get_generation_number() {
    // std::chrono::high_resolution_clock has an unspecified epoch and is an
    // alias for a monotonic clock on some standard libraries, which would
    // make the result meaningless across restarts. system_clock is the clock
    // that tracks calendar time.
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto whole_seconds = std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();
    // The return type is part of the interface; make the narrowing explicit.
    // NOTE(review): a 32-bit int stops being able to hold this value in 2038.
    return static_cast<int>(whole_seconds);
}

} // namespace utils

View File

@@ -1,15 +0,0 @@
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
namespace utils {
// Returns the current time as a whole number of seconds since the clock's
// epoch, narrowed to int. Takes no arguments and reads only the clock.
int get_generation_number();
}

93
utils/tagged_integer.hh Normal file
View File

@@ -0,0 +1,93 @@
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <compare>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <type_traits>
namespace utils {

/// A thin wrapper that gives an integral value its own distinct type.
///
/// `Tag` is not stored; it only serves to make instantiations with the same
/// `ValueType` different types. Construction from a raw value is explicit,
/// while conversion back to the raw value is implicit (see
/// `operator value_type`), so a tagged integer can be passed wherever the
/// underlying integer is expected.
template <typename Tag, std::integral ValueType>
class tagged_integer {
public:
    using value_type = ValueType;
private:
    value_type _value;
public:
    /// Value-initializes the wrapped integer to zero.
    tagged_integer() noexcept : _value(0) {}
    explicit tagged_integer(value_type v) noexcept : _value(v) {}

    /// Assigns a raw value without going through the explicit constructor.
    tagged_integer& operator=(value_type v) noexcept {
        _value = v;
        return *this;
    }

    /// Returns the wrapped raw value.
    value_type value() const noexcept { return _value; }

    /// Implicit conversion to the wrapped raw value.
    operator value_type() const noexcept { return _value; }

    /// True when the wrapped value is non-zero.
    explicit operator bool() const noexcept { return _value != 0; }

    /// Member-wise ordering and equality on the wrapped value.
    auto operator<=>(const tagged_integer& o) const = default;

    tagged_integer& operator++() noexcept {
        ++_value;
        return *this;
    }

    tagged_integer& operator--() noexcept {
        --_value;
        return *this;
    }

    /// Post-increment: returns the value held before the increment.
    tagged_integer operator++(int) noexcept {
        auto ret = *this;
        ++_value;
        return ret;
    }

    /// Post-decrement: returns the value held before the decrement.
    tagged_integer operator--(int) noexcept {
        auto ret = *this;
        --_value;
        return ret;
    }

    tagged_integer operator+(const tagged_integer& o) const noexcept {
        return tagged_integer(_value + o._value);
    }

    tagged_integer operator-(const tagged_integer& o) const noexcept {
        return tagged_integer(_value - o._value);
    }

    // The compound assignments modify *this and return a mutable reference,
    // so they must not be const-qualified; a const qualifier here makes every
    // instantiation of these operators ill-formed.
    tagged_integer& operator+=(const tagged_integer& o) noexcept {
        _value += o._value;
        return *this;
    }

    tagged_integer& operator-=(const tagged_integer& o) noexcept {
        _value -= o._value;
        return *this;
    }
};

} // namespace utils
namespace std {
template <typename Tag, std::integral ValueType>
struct hash<utils::tagged_integer<Tag, ValueType>> {
size_t operator()(const utils::tagged_integer<Tag, ValueType>& x) const noexcept {
return hash<ValueType>{}(x.value());
}
};
template <typename Tag, std::integral ValueType>
[[maybe_unused]] ostream& operator<<(ostream& s, const utils::tagged_integer<Tag, ValueType>& x) {
return s << x.value();
}
} // namespace std