cdc: use chunked_vector instead of vector for stream ids
use utils::chunked_vector instead of std::vector to store cdc stream
sets for tablets.
a cdc stream set usually represents all streams for a specific table and
timestamp, and has a stream id per each tablet of the table. each stream
id is represented by 16 bytes. thus the vector could require quite large
contiguous allocations for a table that has many tablets. change it to
chunked_vector to avoid large contiguous allocations.
Fixes scylladb/scylladb#26791
Closes scylladb/scylladb#26792
(cherry picked from commit e7dbccd59e)
Closes scylladb/scylladb#26828
This commit is contained in:
committed by
Piotr Dulikowski
parent
0656a73c52
commit
8f7a6fd5eb
@@ -1209,7 +1209,7 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
|
||||
co_return std::move(m);
|
||||
}
|
||||
|
||||
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const std::vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
|
||||
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
|
||||
auto s = db::system_keyspace::cdc_streams_state();
|
||||
|
||||
mutation m(s, partition_key::from_single_value(*s,
|
||||
@@ -1252,24 +1252,24 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
|
||||
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
|
||||
}
|
||||
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
|
||||
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
|
||||
if (tables) {
|
||||
for (auto table : *tables) {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
} else {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
return f(table, base_ts, std::move(base_stream_set));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
|
||||
table_streams new_table_map;
|
||||
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
|
||||
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
|
||||
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
|
||||
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
|
||||
};
|
||||
@@ -1345,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
|
||||
}
|
||||
}
|
||||
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
|
||||
auto table_it = all_tables.find(table);
|
||||
if (table_it == all_tables.end()) {
|
||||
@@ -1402,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<cdc::stream_id> new_streams;
|
||||
new_streams.reserve(new_tablet_map.tablet_count());
|
||||
utils::chunked_vector<cdc::stream_id> new_streams;
|
||||
co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
|
||||
for (auto tid : new_tablet_map.tablet_ids()) {
|
||||
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -1425,7 +1425,7 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
|
||||
muts.emplace_back(std::move(mut));
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
|
||||
utils::chunked_vector<mutation> muts;
|
||||
muts.reserve(2);
|
||||
|
||||
|
||||
@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
|
||||
|
||||
struct committed_stream_set {
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
|
||||
struct cdc_stream_diff {
|
||||
std::vector<stream_id> closed_streams;
|
||||
std::vector<stream_id> opened_streams;
|
||||
utils::chunked_vector<stream_id> closed_streams;
|
||||
utils::chunked_vector<stream_id> opened_streams;
|
||||
};
|
||||
|
||||
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
|
||||
@@ -220,11 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
|
||||
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
|
||||
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const std::vector<cdc::stream_id>&, api::timestamp_type);
|
||||
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
|
||||
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
|
||||
|
||||
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
|
||||
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
|
||||
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -149,7 +149,7 @@ public:
|
||||
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
|
||||
}
|
||||
|
||||
static cdc::stream_id get_stream(
|
||||
const std::vector<cdc::stream_id>& streams,
|
||||
const utils::chunked_vector<cdc::stream_id>& streams,
|
||||
dht::token tok) {
|
||||
if (streams.empty()) {
|
||||
on_internal_error(cdc_log, "get_stream: streams empty");
|
||||
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
|
||||
return ret;
|
||||
}
|
||||
|
||||
const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
|
||||
auto now = api::new_timestamp();
|
||||
if (ts > now + get_generation_leeway().count()) {
|
||||
throw exceptions::invalid_request_exception(seastar::format(
|
||||
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
|
||||
return !it->second;
|
||||
}
|
||||
|
||||
future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed) {
|
||||
future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed) {
|
||||
|
||||
if (closed.size() == prev_stream_set.size()) {
|
||||
// all previous streams are closed, so the next stream set is just the opened streams.
|
||||
@@ -273,8 +273,8 @@ future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
|
||||
// streams and removing the closed streams. we assume each stream set is
|
||||
// sorted by token, and the result is sorted as well.
|
||||
|
||||
std::vector<cdc::stream_id> next_stream_set;
|
||||
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
|
||||
utils::chunked_vector<cdc::stream_id> next_stream_set;
|
||||
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
|
||||
|
||||
auto next_prev = prev_stream_set.begin();
|
||||
auto next_closed = closed.begin();
|
||||
@@ -318,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
|
||||
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
|
||||
}
|
||||
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
|
||||
std::vector<stream_id> closed, opened;
|
||||
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
|
||||
utils::chunked_vector<stream_id> closed, opened;
|
||||
|
||||
auto before_it = before.begin();
|
||||
auto after_it = after.begin();
|
||||
|
||||
@@ -49,7 +49,7 @@ class metadata final {
|
||||
|
||||
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
|
||||
|
||||
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
|
||||
|
||||
public:
|
||||
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
|
||||
@@ -111,14 +111,14 @@ public:
|
||||
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
static future<std::vector<stream_id>> construct_next_stream_set(
|
||||
const std::vector<cdc::stream_id>& prev_stream_set,
|
||||
std::vector<cdc::stream_id> opened,
|
||||
const std::vector<cdc::stream_id>& closed);
|
||||
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
|
||||
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
|
||||
utils::chunked_vector<cdc::stream_id> opened,
|
||||
const utils::chunked_vector<cdc::stream_id>& closed);
|
||||
|
||||
static future<cdc_stream_diff> generate_stream_diff(
|
||||
const std::vector<stream_id>& before,
|
||||
const std::vector<stream_id>& after);
|
||||
const utils::chunked_vector<stream_id>& before,
|
||||
const utils::chunked_vector<stream_id>& after);
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -2463,14 +2463,14 @@ future<bool> system_keyspace::cdc_is_rewritten() {
|
||||
}
|
||||
|
||||
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) {
|
||||
noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
|
||||
static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
|
||||
static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);
|
||||
|
||||
struct cur_t {
|
||||
table_id tid;
|
||||
db_clock::time_point ts;
|
||||
std::vector<cdc::stream_id> streams;
|
||||
utils::chunked_vector<cdc::stream_id> streams;
|
||||
};
|
||||
std::optional<cur_t> cur;
|
||||
|
||||
@@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
|
||||
if (cur) {
|
||||
co_await f(cur->tid, cur->ts, std::move(cur->streams));
|
||||
}
|
||||
cur = { tid, ts, std::vector<cdc::stream_id>() };
|
||||
cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
|
||||
}
|
||||
cur->streams.push_back(std::move(stream_id));
|
||||
|
||||
|
||||
@@ -601,7 +601,7 @@ public:
|
||||
future<bool> cdc_is_rewritten();
|
||||
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
|
||||
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
|
||||
future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
|
||||
|
||||
// Load Raft Group 0 id from scylla.local
|
||||
|
||||
@@ -1278,7 +1278,7 @@ public:
|
||||
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
|
||||
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
|
||||
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
|
||||
co_await emit_stream_set(ts, cdc::stream_state::current, current);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
|
||||
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
|
||||
|
||||
@@ -8170,7 +8170,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n
|
||||
return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
|
||||
}
|
||||
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
|
||||
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
|
||||
}
|
||||
|
||||
|
||||
@@ -358,7 +358,7 @@ public:
|
||||
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
|
||||
|
||||
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
|
||||
|
||||
private:
|
||||
inet_address get_broadcast_address() const noexcept {
|
||||
|
||||
@@ -2143,7 +2143,7 @@ SEASTAR_THREAD_TEST_CASE(test_construct_next_stream_set) {
|
||||
};
|
||||
|
||||
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
|
||||
std::vector<cdc::stream_id> stream_ids;
|
||||
utils::chunked_vector<cdc::stream_id> stream_ids;
|
||||
for (auto t : tokens) {
|
||||
stream_ids.push_back(stream_id_for_token(t));
|
||||
}
|
||||
@@ -2252,7 +2252,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
|
||||
};
|
||||
|
||||
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
|
||||
std::vector<cdc::stream_id> stream_ids;
|
||||
utils::chunked_vector<cdc::stream_id> stream_ids;
|
||||
for (auto t : tokens) {
|
||||
stream_ids.push_back(stream_id_for_token(t));
|
||||
}
|
||||
@@ -2347,7 +2347,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
|
||||
|
||||
struct cdc_gc_test_config {
|
||||
table_id table;
|
||||
std::vector<std::vector<cdc::stream_id>> streams;
|
||||
std::vector<utils::chunked_vector<cdc::stream_id>> streams;
|
||||
size_t new_base_stream;
|
||||
};
|
||||
|
||||
@@ -2463,11 +2463,11 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
|
||||
// as the base and the history is empty
|
||||
|
||||
auto table = table_id(utils::UUID_gen::get_time_UUID());
|
||||
std::vector<cdc::stream_id> streams0;
|
||||
utils::chunked_vector<cdc::stream_id> streams0;
|
||||
for (auto t : {10, 20, 30}) {
|
||||
streams0.emplace_back(dht::token(t), 0);
|
||||
}
|
||||
std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
|
||||
cdc_gc_test_config test1 = {
|
||||
.table = table,
|
||||
@@ -2492,12 +2492,12 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
|
||||
// as the base and one history entry for open 50
|
||||
|
||||
auto table = table_id(utils::UUID_gen::get_time_UUID());
|
||||
std::vector<cdc::stream_id> streams0;
|
||||
utils::chunked_vector<cdc::stream_id> streams0;
|
||||
for (auto t : {10, 20, 30}) {
|
||||
streams0.emplace_back(dht::token(t), 0);
|
||||
}
|
||||
std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
std::vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
|
||||
utils::chunked_vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
|
||||
|
||||
cdc_gc_test_config test2 = {
|
||||
.table = table,
|
||||
@@ -2525,7 +2525,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_get_new_base) {
|
||||
auto tp = base_time + offset;
|
||||
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(tp.time_since_epoch()).count();
|
||||
|
||||
streams_map[ts] = cdc::committed_stream_set{tp, std::vector<cdc::stream_id>{}};
|
||||
streams_map[ts] = cdc::committed_stream_set{tp, utils::chunked_vector<cdc::stream_id>{}};
|
||||
}
|
||||
return streams_map;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user