cdc: generation: use token_metadata_ptr

So it could be safely held across continuations.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-10-04 16:14:31 +03:00
parent ecda21224e
commit 7697c0f129
3 changed files with 16 additions and 19 deletions

View File

@@ -181,12 +181,12 @@ const std::vector<token_range_description>& topology_description::entries() cons
class topology_description_generator final {
const db::config& _cfg;
const std::unordered_set<dht::token>& _bootstrap_tokens;
const locator::token_metadata& _token_metadata;
const locator::token_metadata_ptr _tmptr;
const gms::gossiper& _gossiper;
// Compute a set of tokens that split the token ring into vnodes
auto get_tokens() const {
auto tokens = _token_metadata.sorted_tokens();
auto tokens = _tmptr->sorted_tokens();
auto it = tokens.insert(
tokens.end(), _bootstrap_tokens.begin(), _bootstrap_tokens.end());
std::sort(it, tokens.end());
@@ -201,7 +201,7 @@ class topology_description_generator final {
if (_bootstrap_tokens.contains(end)) {
return {smp::count, _cfg.murmur3_partitioner_ignore_msb_bits()};
} else {
auto endpoint = _token_metadata.get_endpoint(end);
auto endpoint = _tmptr->get_endpoint(end);
if (!endpoint) {
throw std::runtime_error(
format("Can't find endpoint for token {}", end));
@@ -235,11 +235,11 @@ public:
topology_description_generator(
const db::config& cfg,
const std::unordered_set<dht::token>& bootstrap_tokens,
const locator::token_metadata& token_metadata,
const locator::token_metadata_ptr tmptr,
const gms::gossiper& gossiper)
: _cfg(cfg)
, _bootstrap_tokens(bootstrap_tokens)
, _token_metadata(token_metadata)
, _tmptr(std::move(tmptr))
, _gossiper(gossiper)
{}
@@ -298,19 +298,19 @@ future<db_clock::time_point> get_local_streams_timestamp() {
db_clock::time_point make_new_cdc_generation(
const db::config& cfg,
const std::unordered_set<dht::token>& bootstrap_tokens,
const locator::token_metadata& tm,
const locator::token_metadata_ptr tmptr,
const gms::gossiper& g,
db::system_distributed_keyspace& sys_dist_ks,
std::chrono::milliseconds ring_delay,
bool for_testing) {
using namespace std::chrono;
auto gen = topology_description_generator(cfg, bootstrap_tokens, tm, g).generate();
auto gen = topology_description_generator(cfg, bootstrap_tokens, tmptr, g).generate();
// Begin the race.
auto ts = db_clock::now() + (
(for_testing || ring_delay == milliseconds(0)) ? milliseconds(0) : (
2 * ring_delay + duration_cast<milliseconds>(generation_leeway)));
sys_dist_ks.insert_cdc_topology_description(ts, std::move(gen), { tm.count_normal_token_owners() }).get();
sys_dist_ks.insert_cdc_topology_description(ts, std::move(gen), { tmptr->count_normal_token_owners() }).get();
return ts;
}

View File

@@ -40,6 +40,7 @@
#include "database_fwd.hh"
#include "db_clock.hh"
#include "dht/token.hh"
#include "locator/token_metadata.hh"
namespace seastar {
class abort_source;
@@ -55,10 +56,6 @@ namespace gms {
class gossiper;
} // namespace gms
namespace locator {
class token_metadata;
} // namespace locator
namespace cdc {
class stream_id final {
@@ -168,7 +165,7 @@ future<db_clock::time_point> get_local_streams_timestamp();
db_clock::time_point make_new_cdc_generation(
const db::config& cfg,
const std::unordered_set<dht::token>& bootstrap_tokens,
const locator::token_metadata& tm,
const locator::token_metadata_ptr tmptr,
const gms::gossiper& g,
db::system_distributed_keyspace& sys_dist_ks,
std::chrono::milliseconds ring_delay,

View File

@@ -583,7 +583,7 @@ void storage_service::join_token_ring(int delay) {
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, get_token_metadata(), _gossiper,
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
}
}
@@ -818,10 +818,10 @@ future<> storage_service::check_and_repair_cdc_streams() {
" node(s) being down or unreachable. It is recommended to check the network and"
" restart/remove the failed node(s), then retry checkAndRepairCdcStreams command";
static const auto exception_translating_msg = "Translating the exception to `request_execution_exception`";
const auto& tm = get_token_metadata();
const auto tmptr = get_token_metadata_ptr();
try {
gen = _sys_dist_ks.local().read_cdc_topology_description(
*latest, { tm.count_normal_token_owners() }).get0();
*latest, { tmptr->count_normal_token_owners() }).get0();
} catch (exceptions::request_timeout_exception& e) {
cdc_log.error("{}: \"{}\". {}.", timeout_msg, e.what(), exception_translating_msg);
throw exceptions::request_execution_exception(exceptions::exception_code::READ_TIMEOUT,
@@ -854,7 +854,7 @@ future<> storage_service::check_and_repair_cdc_streams() {
for (const auto& entry : gen->entries()) {
gen_ends.insert(entry.token_range_end);
}
for (const auto& metadata_token : tm.sorted_tokens()) {
for (const auto& metadata_token : tmptr->sorted_tokens()) {
if (!gen_ends.contains(metadata_token)) {
cdc_log.warn("CDC generation {} missing token {}. Regenerating.", latest, metadata_token);
should_regenerate = true;
@@ -871,7 +871,7 @@ future<> storage_service::check_and_repair_cdc_streams() {
return;
}
const auto new_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
{}, tm, _gossiper,
{}, std::move(tmptr), _gossiper,
_sys_dist_ks.local(), get_ring_delay(), false /* for_testing */);
// Need to artificially update our STATUS so other nodes handle the timestamp change
auto status = _gossiper.get_application_state_ptr(get_broadcast_address(), application_state::STATUS);
@@ -919,7 +919,7 @@ void storage_service::bootstrap() {
assert(!_cdc_streams_ts);
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, get_token_metadata(), _gossiper,
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
} else {
// We should not be able to join the cluster if other nodes support CDC but we don't.