diff --git a/cdc/generation.cc b/cdc/generation.cc index ec3e3ab597..b2febfaf2f 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -1209,7 +1209,7 @@ future create_table_streams_mutation(table_id table, db_clock::time_po co_return std::move(m); } -future create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const std::vector& stream_ids, api::timestamp_type ts) { +future create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector& stream_ids, api::timestamp_type ts) { auto s = db::system_keyspace::cdc_streams_state(); mutation m(s, partition_key::from_single_value(*s, @@ -1252,24 +1252,24 @@ future<> generation_service::load_cdc_tablet_streams(std::optional>(); } - auto read_streams_state = [this] (const std::optional>& tables, noncopyable_function(table_id, db_clock::time_point, std::vector)> f) -> future<> { + auto read_streams_state = [this] (const std::optional>& tables, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f) -> future<> { if (tables) { for (auto table : *tables) { - co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector base_stream_set) -> future<> { + co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { return f(table, base_ts, std::move(base_stream_set)); }); } } else { - co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector base_stream_set) -> future<> { + co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { return f(table, base_ts, std::move(base_stream_set)); }); } }; - co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector base_stream_set) -> future<> { + co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector base_stream_set) -> future<> { table_streams new_table_map; - auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector stream_set) { + auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector stream_set) { auto ts = std::chrono::duration_cast(stream_tp.time_since_epoch()).count(); new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)}; }; @@ -1345,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending } } -future<> generation_service::query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const std::vector& current, cdc::cdc_stream_diff)> f) { +future<> generation_service::query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const utils::chunked_vector& current, cdc::cdc_stream_diff)> f) { const auto& all_tables = _cdc_metadata.get_all_tablet_streams(); auto table_it = all_tables.find(table); if (table_it == all_tables.end()) { @@ -1402,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector co_return; } - std::vector new_streams; - new_streams.reserve(new_tablet_map.tablet_count()); + utils::chunked_vector new_streams; + co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count()); for (auto tid : new_tablet_map.tablet_ids()) { new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0); co_await coroutine::maybe_yield(); @@ -1425,7 +1425,7 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector muts.emplace_back(std::move(mut)); } -future> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector& base_stream_set, api::timestamp_type ts) { +future> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector& base_stream_set, api::timestamp_type ts) { utils::chunked_vector muts; muts.reserve(2); diff --git a/cdc/generation.hh b/cdc/generation.hh index 2fe1ea9f2f..ea6eb2ad34 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val); struct committed_stream_set { db_clock::time_point ts; - std::vector streams; + utils::chunked_vector streams; }; struct cdc_stream_diff { - std::vector closed_streams; - std::vector opened_streams; + utils::chunked_vector closed_streams; + utils::chunked_vector opened_streams; }; using table_streams = std::map; @@ -220,11 +220,11 @@ future> get_cdc_generation_mutations_v3( size_t mutation_size_threshold, api::timestamp_type mutation_timestamp); future create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type); -future create_table_streams_mutation(table_id, db_clock::time_point, const std::vector&, api::timestamp_type); +future create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector&, api::timestamp_type); utils::chunked_vector make_drop_table_streams_mutations(table_id, api::timestamp_type ts); future get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts); -future> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector& base_stream_set, api::timestamp_type ts); +future> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector& base_stream_set, api::timestamp_type ts); table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl); } // namespace cdc diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index dacb2582a5..f853ea04e8 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -149,7 +149,7 @@ public: future<> load_cdc_tablet_streams(std::optional> changed_tables); future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function(db_clock::time_point)> f); - future<> query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const std::vector& current, cdc::cdc_stream_diff)> f); + future<> query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const utils::chunked_vector& current, cdc::cdc_stream_diff)> f); future<> generate_tablet_resize_update(utils::chunked_vector& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts); diff --git a/cdc/metadata.cc b/cdc/metadata.cc index 1f9cecee04..a8b3a696cf 100644 --- a/cdc/metadata.cc +++ b/cdc/metadata.cc @@ -54,7 +54,7 @@ cdc::stream_id get_stream( } static cdc::stream_id get_stream( - const std::vector& streams, + const utils::chunked_vector& streams, dht::token tok) { if (streams.empty()) { on_internal_error(cdc_log, "get_stream: streams empty"); @@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke return ret; } -const std::vector& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const { +const utils::chunked_vector& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const { auto now = api::new_timestamp(); if (ts > now + get_generation_leeway().count()) { throw exceptions::invalid_request_exception(seastar::format( @@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) { return !it->second; } -future> cdc::metadata::construct_next_stream_set( - const std::vector& prev_stream_set, - std::vector opened, - const std::vector& closed) { +future> cdc::metadata::construct_next_stream_set( + const utils::chunked_vector& prev_stream_set, + utils::chunked_vector opened, + const utils::chunked_vector& closed) { if (closed.size() == prev_stream_set.size()) { // all previous streams are closed, so the next stream set is just the opened streams. @@ -273,8 +273,8 @@ future> cdc::metadata::construct_next_stream_set( // streams and removing the closed streams. we assume each stream set is // sorted by token, and the result is sorted as well. - std::vector next_stream_set; - next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size()); + utils::chunked_vector next_stream_set; + co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size()); auto next_prev = prev_stream_set.begin(); auto next_closed = closed.begin(); @@ -318,8 +318,8 @@ std::vector cdc::metadata::get_tables_with_cdc_tablet_streams() const return _tablet_streams | std::views::keys | std::ranges::to>(); } -future cdc::metadata::generate_stream_diff(const std::vector& before, const std::vector& after) { - std::vector closed, opened; +future cdc::metadata::generate_stream_diff(const utils::chunked_vector& before, const utils::chunked_vector& after) { + utils::chunked_vector closed, opened; auto before_it = before.begin(); auto after_it = after.begin(); diff --git a/cdc/metadata.hh b/cdc/metadata.hh index 841ff53439..35c82b85ee 100644 --- a/cdc/metadata.hh +++ b/cdc/metadata.hh @@ -49,7 +49,7 @@ class metadata final { container_t::const_iterator gen_used_at(api::timestamp_type ts) const; - const std::vector& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const; + const utils::chunked_vector& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const; public: /* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if @@ -111,14 +111,14 @@ public: std::vector get_tables_with_cdc_tablet_streams() const; - static future> construct_next_stream_set( - const std::vector& prev_stream_set, - std::vector opened, - const std::vector& closed); + static future> construct_next_stream_set( + const utils::chunked_vector& prev_stream_set, + utils::chunked_vector opened, + const utils::chunked_vector& closed); static future generate_stream_diff( - const std::vector& before, - const std::vector& after); + const utils::chunked_vector& before, + const utils::chunked_vector& after); }; diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 0ec0c96de0..7180d9f863 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2463,14 +2463,14 @@ future system_keyspace::cdc_is_rewritten() { } future<> system_keyspace::read_cdc_streams_state(std::optional table, - noncopyable_function(table_id, db_clock::time_point, std::vector)> f) { + noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f) { static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE); static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE); struct cur_t { table_id tid; db_clock::time_point ts; - std::vector streams; + utils::chunked_vector streams; }; std::optional cur; @@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional table, if (cur) { co_await f(cur->tid, cur->ts, std::move(cur->streams)); } - cur = { tid, ts, std::vector() }; + cur = { tid, ts, utils::chunked_vector() }; } cur->streams.push_back(std::move(stream_id)); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index b50a18a7bc..b00d382a44 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -601,7 +601,7 @@ public: future cdc_is_rewritten(); future<> cdc_set_rewritten(std::optional); - future<> read_cdc_streams_state(std::optional table, noncopyable_function(table_id, db_clock::time_point, std::vector)> f); + future<> read_cdc_streams_state(std::optional table, noncopyable_function(table_id, db_clock::time_point, utils::chunked_vector)> f); future<> read_cdc_streams_history(table_id table, std::optional from, noncopyable_function(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f); // Load Raft Group 0 id from scylla.local diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index bfbb38fd02..89e421d7e3 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -1278,7 +1278,7 @@ public: static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed)); static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened)); - co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector& current, cdc::cdc_stream_diff diff) -> future<> { + co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector& current, cdc::cdc_stream_diff diff) -> future<> { co_await emit_stream_set(ts, cdc::stream_state::current, current); co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams); co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams); diff --git a/service/storage_service.cc b/service/storage_service.cc index 124a01b733..494bcf55c8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -8170,7 +8170,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f)); } -future<> storage_service::query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const std::vector& current, cdc::cdc_stream_diff)> f) { +future<> storage_service::query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const utils::chunked_vector& current, cdc::cdc_stream_diff)> f) { return _cdc_gens.local().query_cdc_streams(table, std::move(f)); } diff --git a/service/storage_service.hh b/service/storage_service.hh index ed9782c032..f6c62574cb 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -358,7 +358,7 @@ public: std::vector get_tables_with_cdc_tablet_streams() const; future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function(db_clock::time_point)> f); - future<> query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const std::vector& current, cdc::cdc_stream_diff)> f); + future<> query_cdc_streams(table_id table, noncopyable_function(db_clock::time_point, const utils::chunked_vector& current, cdc::cdc_stream_diff)> f); private: inet_address get_broadcast_address() const noexcept { diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index 664529bb5f..40d844b947 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -2143,7 +2143,7 @@ SEASTAR_THREAD_TEST_CASE(test_construct_next_stream_set) { }; auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) { - std::vector stream_ids; + utils::chunked_vector stream_ids; for (auto t : tokens) { stream_ids.push_back(stream_id_for_token(t)); } @@ -2252,7 +2252,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) { }; auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) { - std::vector stream_ids; + utils::chunked_vector stream_ids; for (auto t : tokens) { stream_ids.push_back(stream_id_for_token(t)); } @@ -2347,7 +2347,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) { struct cdc_gc_test_config { table_id table; - std::vector> streams; + std::vector> streams; size_t new_base_stream; }; @@ -2463,11 +2463,11 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) { // as the base and the history is empty auto table = table_id(utils::UUID_gen::get_time_UUID()); - std::vector streams0; + utils::chunked_vector streams0; for (auto t : {10, 20, 30}) { streams0.emplace_back(dht::token(t), 0); } - std::vector streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)}; + utils::chunked_vector streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)}; cdc_gc_test_config test1 = { .table = table, @@ -2492,12 +2492,12 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) { // as the base and one history entry for open 50 auto table = table_id(utils::UUID_gen::get_time_UUID()); - std::vector streams0; + utils::chunked_vector streams0; for (auto t : {10, 20, 30}) { streams0.emplace_back(dht::token(t), 0); } - std::vector streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)}; - std::vector streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)}; + utils::chunked_vector streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)}; + utils::chunked_vector streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)}; cdc_gc_test_config test2 = { .table = table, @@ -2525,7 +2525,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_get_new_base) { auto tp = base_time + offset; auto ts = std::chrono::duration_cast(tp.time_since_epoch()).count(); - streams_map[ts] = cdc::committed_stream_set{tp, std::vector{}}; + streams_map[ts] = cdc::committed_stream_set{tp, utils::chunked_vector{}}; } return streams_map; };