tablets: Implement tablet sharder
This commit is contained in:
78
locator/tablet_sharder.hh
Normal file
78
locator/tablet_sharder.hh
Normal file
@@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "locator/tablets.hh"
|
||||
#include "dht/sharder.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
/// Implements sharder object which reflects assignment of tablets of a given table to local shards.
|
||||
/// Token ranges which don't have local tablets are reported to belong to shard 0.
|
||||
class tablet_sharder : public dht::sharder {
|
||||
const token_metadata& _tm;
|
||||
table_id _table;
|
||||
mutable const tablet_map* _tmap = nullptr;
|
||||
private:
|
||||
// Tablet map is lazily initialized to avoid exceptions during effective_replication_map construction
|
||||
// in case tablet mapping is not yet available in token metadata at the time the table is constructed.
|
||||
void ensure_tablet_map() const {
|
||||
if (!_tmap) {
|
||||
_tmap = &_tm.tablets().get_tablet_map(_table);
|
||||
}
|
||||
}
|
||||
public:
|
||||
tablet_sharder(const token_metadata& tm, table_id table)
|
||||
: _tm(tm)
|
||||
, _table(table)
|
||||
{ }
|
||||
|
||||
virtual ~tablet_sharder() = default;
|
||||
|
||||
virtual unsigned shard_of(const dht::token& token) const override {
|
||||
ensure_tablet_map();
|
||||
auto tid = _tmap->get_tablet_id(token);
|
||||
auto shard = _tmap->get_shard(tid, _tm.get_my_id());
|
||||
tablet_logger.trace("[{}] shard_of({}) = {}, tablet={}", _table, token, shard, tid);
|
||||
return shard.value_or(0);
|
||||
}
|
||||
|
||||
virtual std::optional<dht::shard_and_token> next_shard(const token& t) const override {
|
||||
ensure_tablet_map();
|
||||
auto me = _tm.get_my_id();
|
||||
std::optional<tablet_id> tb = _tmap->get_tablet_id(t);
|
||||
while ((tb = _tmap->next_tablet(*tb))) {
|
||||
auto r = _tmap->get_shard(*tb, me);
|
||||
auto next = _tmap->get_first_token(*tb);
|
||||
tablet_logger.trace("[{}] token_for_next_shard({}) = {{{}, {}}}, tablet={}", _table, t, next, r, *tb);
|
||||
return dht::shard_and_token{r.value_or(0), next};
|
||||
}
|
||||
tablet_logger.trace("[{}] token_for_next_shard({}) = null", _table, t);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const override {
|
||||
ensure_tablet_map();
|
||||
auto token = t;
|
||||
while (auto s_a_t = next_shard(token)) {
|
||||
token = s_a_t->token;
|
||||
if (s_a_t->shard == shard) {
|
||||
if (--spans == 0) {
|
||||
tablet_logger.trace("[{}] token_for_next_shard({}, {}, {}) = {}", _table, t, shard, spans, s_a_t->token);
|
||||
return token;
|
||||
}
|
||||
}
|
||||
}
|
||||
tablet_logger.trace("[{}] token_for_next_shard({}, {}, {}) = null", _table, t, shard, spans);
|
||||
return dht::maximum_token();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "locator/tablet_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_sharder.hh"
|
||||
#include "locator/token_range_splitter.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "types/types.hh"
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
|
||||
#include "replica/tablets.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_sharder.hh"
|
||||
#include "locator/tablet_replication_strategy.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
|
||||
using namespace locator;
|
||||
using namespace replica;
|
||||
@@ -263,6 +265,112 @@ SEASTAR_TEST_CASE(test_get_shard) {
|
||||
}, tablet_cql_test_config());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sharder) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
token_metadata tokm(token_metadata::config{});
|
||||
tokm.get_topology().add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(), h1);
|
||||
|
||||
std::vector<tablet_id> tablet_ids;
|
||||
{
|
||||
tablet_map tmap(4);
|
||||
auto tid = tmap.first_tablet();
|
||||
|
||||
tablet_ids.push_back(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 3},
|
||||
tablet_replica {h3, 5},
|
||||
}
|
||||
});
|
||||
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tablet_ids.push_back(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h2, 3},
|
||||
tablet_replica {h3, 1},
|
||||
}
|
||||
});
|
||||
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tablet_ids.push_back(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h3, 2},
|
||||
tablet_replica {h1, 1},
|
||||
}
|
||||
});
|
||||
tmap.set_tablet_transition_info(tid, tablet_transition_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h1, 1},
|
||||
tablet_replica {h2, 3},
|
||||
},
|
||||
tablet_replica {h2, 3}
|
||||
});
|
||||
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tablet_ids.push_back(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {h3, 7},
|
||||
tablet_replica {h2, 3},
|
||||
}
|
||||
});
|
||||
|
||||
tablet_metadata tm;
|
||||
tm.set_tablet_map(table1, std::move(tmap));
|
||||
tokm.set_tablets(std::move(tm));
|
||||
}
|
||||
|
||||
auto& tm = tokm.tablets().get_tablet_map(table1);
|
||||
tablet_sharder sharder(tokm, table1);
|
||||
BOOST_REQUIRE_EQUAL(sharder.shard_of(tm.get_last_token(tablet_ids[0])), 3);
|
||||
BOOST_REQUIRE_EQUAL(sharder.shard_of(tm.get_last_token(tablet_ids[1])), 0); // missing
|
||||
BOOST_REQUIRE_EQUAL(sharder.shard_of(tm.get_last_token(tablet_ids[2])), 1);
|
||||
BOOST_REQUIRE_EQUAL(sharder.shard_of(tm.get_last_token(tablet_ids[3])), 0); // missing
|
||||
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_last_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3]));
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_last_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2]));
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_last_token(tablet_ids[1]), 3), dht::maximum_token());
|
||||
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_first_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3]));
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_first_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2]));
|
||||
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard(tm.get_first_token(tablet_ids[1]), 3), dht::maximum_token());
|
||||
|
||||
{
|
||||
auto shard_opt = sharder.next_shard(tm.get_last_token(tablet_ids[0]));
|
||||
BOOST_REQUIRE(shard_opt);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->shard, 0);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[1]));
|
||||
}
|
||||
|
||||
{
|
||||
auto shard_opt = sharder.next_shard(tm.get_last_token(tablet_ids[1]));
|
||||
BOOST_REQUIRE(shard_opt);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->shard, 1);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[2]));
|
||||
}
|
||||
|
||||
{
|
||||
auto shard_opt = sharder.next_shard(tm.get_last_token(tablet_ids[2]));
|
||||
BOOST_REQUIRE(shard_opt);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->shard, 0);
|
||||
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[3]));
|
||||
}
|
||||
|
||||
{
|
||||
auto shard_opt = sharder.next_shard(tm.get_last_token(tablet_ids[3]));
|
||||
BOOST_REQUIRE(!shard_opt);
|
||||
}
|
||||
}, tablet_cql_test_config());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_large_tablet_metadata) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
tablet_metadata tm;
|
||||
|
||||
Reference in New Issue
Block a user