From 5dd15aa3c888438e8d9bde60e320e2970611fd7a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 5 Sep 2022 15:58:11 +0300 Subject: [PATCH] tombstone_gc: introduce tombstone_gc_state and use it to access the repair history maps. In this introductory patch, we use a default-constructed tombstone_gc_state to access the thread-local maps temporarily; those use sites will be replaced in following patches that will gradually pass the tombstone_gc_state down from the compaction_manager to where it's used. Signed-off-by: Benny Halevy --- db/view/view.cc | 3 ++- mutation_compactor.hh | 3 ++- mutation_partition.cc | 14 +++++++++----- mutation_partition.hh | 4 +++- repair/row_level.cc | 6 ++++-- replica/database.cc | 3 ++- sstables/sstables.cc | 6 ++++-- tombstone_gc.cc | 26 +++++++++++++++++++------- tombstone_gc.hh | 40 ++++++++++++++++++++++++++++------------ 9 files changed, 73 insertions(+), 32 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 33eaa085af..ee067b4029 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1169,7 +1169,8 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional } auto dk = dht::decorate_key(*_schema, _key); - auto gc_before = ::get_gc_before_for_key(_schema, dk, _now); + auto gc_state = tombstone_gc_state(); // FIXME: for now + auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now); // We allow existing to be disengaged, which we treat the same as an empty row. 
if (existing) { diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 80a56cf9ad..4031694fad 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -140,6 +140,7 @@ class compact_mutation_state { uint64_t _row_limit{}; uint32_t _partition_limit{}; uint64_t _partition_row_limit{}; + tombstone_gc_state _tombstone_gc_state; // FIXME: default-constructed for now tombstone _partition_tombstone; @@ -239,7 +240,7 @@ private: return _gc_before.value(); } else { if (_dk) { - _gc_before = ::get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time); + _gc_before = _tombstone_gc_state.get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time); return _gc_before.value(); } else { return gc_clock::time_point::min(); diff --git a/mutation_partition.cc b/mutation_partition.cc index c25ad3052e..06a94ec63d 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1401,13 +1401,14 @@ uint32_t mutation_partition::do_compact(const schema& s, bool reverse, uint64_t row_limit, can_gc_fn& can_gc, - bool drop_tombstones_unconditionally) + bool drop_tombstones_unconditionally, + const tombstone_gc_state& gc_state) { check_schema(s); assert(row_limit > 0); auto gc_before = drop_tombstones_unconditionally ? gc_clock::time_point::max() : - ::get_gc_before_for_key(s.shared_from_this(), dk, query_time); + gc_state.get_gc_before_for_key(s.shared_from_this(), dk, query_time); auto should_purge_tombstone = [&] (const tombstone& t) { return t.deletion_time < gc_before && can_gc(t); @@ -1468,7 +1469,9 @@ mutation_partition::compact_for_query( { check_schema(s); bool drop_tombstones_unconditionally = false; - return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally); + // Replicas should only send non-purgeable tombstones already, + // so we can expect to not have to actually purge any tombstones here. 
+ return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr)); } void mutation_partition::compact_for_compaction(const schema& s, @@ -1480,7 +1483,8 @@ void mutation_partition::compact_for_compaction(const schema& s, }; bool drop_tombstones_unconditionally = false; - do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally); + auto gc_state = tombstone_gc_state(); // FIXME: for now + do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally, gc_state); } void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(const schema& s, const dht::decorated_key& dk) @@ -1491,7 +1495,7 @@ void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally( }; bool drop_tombstones_unconditionally = true; auto compaction_time = gc_clock::time_point::max(); - do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally); + do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr)); } // Returns true if the mutation_partition represents no writes. 
diff --git a/mutation_partition.hh b/mutation_partition.hh index 5b12345ba7..f61ae7118d 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -32,6 +32,7 @@ #include "utils/managed_ref.hh" #include "utils/compact-radix-tree.hh" #include "utils/immutable-collection.hh" +#include "tombstone_gc.hh" class mutation_fragment; class mutation_partition_view; @@ -1317,7 +1318,8 @@ private: bool reverse, uint64_t row_limit, can_gc_fn&, - bool drop_tombstones_unconditionally); + bool drop_tombstones_unconditionally, + const tombstone_gc_state& gc_state); // Calls func for each row entry inside row_ranges until func returns stop_iteration::yes. // Removes all entries for which func didn't return stop_iteration::no or wasn't called at all. diff --git a/repair/row_level.cc b/repair/row_level.cc index 00baeb9494..5fb0a485ff 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2101,7 +2101,8 @@ future repair_service::repair_update_system throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range)); } co_await db.invoke_on_all([&req] (replica::database& local_db) { - return ::update_repair_time(req.table_uuid, req.range, req.repair_time); + auto gc_state = tombstone_gc_state(); // FIXME: for now + return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time); }); db::system_keyspace::repair_history_entry ent; ent.id = req.repair_uuid; @@ -3009,7 +3010,8 @@ future<> repair_service::load_history() { entry.ks, entry.cf, entry.table_uuid, entry.ts, range); try { co_await get_db().invoke_on_all([table_uuid = entry.table_uuid, range, repair_time] (replica::database& local_db) { - ::update_repair_time(table_uuid, range, repair_time); + auto gc_state = tombstone_gc_state(); // FIXME: for now + gc_state.update_repair_time(table_uuid, range, repair_time); }); } catch (...) 
{ rlogger.warn("Failed to update repair history time for keyspace={}, table={}, range={}, repair_time={}", diff --git a/replica/database.cc b/replica/database.cc index 998e55af43..f5c5c9c848 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2497,7 +2497,8 @@ future<> database::truncate(column_family& cf, const table_truncate_state& st, d cf.cache_truncation_record(truncated_at); co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp); - drop_repair_history_map_for_table(uuid); + auto gc_state = tombstone_gc_state(); // FIXME: for now + gc_state.drop_repair_history_map_for_table(uuid); } const sstring& database::get_snitch_name() const { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 00198d19f6..65b1a84b2c 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3300,7 +3300,8 @@ gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock:: auto end = get_last_decorated_key().token(); auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true)); sstlog.trace("sstable={}, ks={}, cf={}, range={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range); - return ::get_gc_before_for_range(s, range, compaction_time).max_gc_before; + auto gc_state = tombstone_gc_state(); // FIXME: for now + return gc_state.get_gc_before_for_range(s, range, compaction_time).max_gc_before; } // If the sstable contains any regular live cells, we can not drop the sstable. 
@@ -3325,7 +3326,8 @@ gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::tim auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true)); sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, query", get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count()); - auto res = ::get_gc_before_for_range(s, range, compaction_time); + auto gc_state = tombstone_gc_state(); // FIXME: for now + auto res = gc_state.get_gc_before_for_range(s, range, compaction_time); return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min(); } diff --git a/tombstone_gc.cc b/tombstone_gc.cc index b9119dc73e..9dbe54de4e 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -29,7 +29,15 @@ public: thread_local std::unordered_map<table_id, seastar::lw_shared_ptr<repair_history_map>> repair_history_maps; -static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const table_id& id) { +tombstone_gc_state::tombstone_gc_state() noexcept + : _repair_history_maps(&repair_history_maps) +{ } + +seastar::lw_shared_ptr<repair_history_map> tombstone_gc_state::get_or_create_repair_history_map_for_table(const table_id& id) { + if (!_repair_history_maps) { + return {}; + } + auto& repair_history_maps = *_repair_history_maps; auto it = repair_history_maps.find(id); if (it != repair_history_maps.end()) { return it->second; @@ -39,7 +47,11 @@ static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_m } } -seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const table_id& id) { +seastar::lw_shared_ptr<repair_history_map> tombstone_gc_state::get_repair_history_map_for_table(const table_id& id) const { + if (!_repair_history_maps) { + return {}; + } + auto& repair_history_maps = *_repair_history_maps; auto it = repair_history_maps.find(id); if (it != repair_history_maps.end()) { return it->second; @@ -48,8 +60,8 @@ seastar::lw_shared_ptr<repair_history_map> 
get_repair_history_map_for_table(cons } } -void drop_repair_history_map_for_table(const table_id& id) { - repair_history_maps.erase(id); +void tombstone_gc_state::drop_repair_history_map_for_table(const table_id& id) { + _repair_history_maps->erase(id); } // This is useful for a sstable to query a gc_before for a range. The range is @@ -60,7 +72,7 @@ void drop_repair_history_map_for_table(const table_id& id) { // The knows_entire_range is set to true: // 1) if the tombstone_gc_mode is not repair, since we have the same value for all the keys in the ranges. // 2) if the tombstone_gc_mode is repair, and the range is a sub range of a range in the repair history map. -get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) { +tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) const { bool knows_entire_range = true; const auto& options = s->tombstone_gc_options(); switch (options.mode()) { @@ -118,7 +130,7 @@ get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht:: std::abort(); } -gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) { +gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) const { // if mode = timeout // default option, if user does not specify tombstone_gc options // if mode = disabled // never gc tombstone // if mode = immediate // can gc tombstone immediately @@ -155,7 +167,7 @@ gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_ke std::abort(); } -void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { +void tombstone_gc_state::update_repair_time(table_id id, const 
dht::token_range& range, gc_clock::time_point repair_time) { auto m = get_or_create_repair_history_map_for_table(id); m->map += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 66a38bc635..d8e4b00258 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -29,20 +29,36 @@ class database; } +class repair_history_map; +using per_table_history_maps = std::unordered_map<table_id, seastar::lw_shared_ptr<repair_history_map>>; + class tombstone_gc_options; -struct get_gc_before_for_range_result { - gc_clock::time_point min_gc_before; - gc_clock::time_point max_gc_before; - bool knows_entire_range; +class tombstone_gc_state { + per_table_history_maps* _repair_history_maps; +public: + tombstone_gc_state() noexcept; + tombstone_gc_state(per_table_history_maps* maps) noexcept : _repair_history_maps(maps) {} + + explicit operator bool() const noexcept { + return _repair_history_maps != nullptr; + } + + seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const table_id& id) const; + seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const table_id& id); + void drop_repair_history_map_for_table(const table_id& id); + + struct get_gc_before_for_range_result { + gc_clock::time_point min_gc_before; + gc_clock::time_point max_gc_before; + bool knows_entire_range; + }; + + get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) const; + + gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) const; + + void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); }; -void drop_repair_history_map_for_table(const table_id& id); - -get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time); - -gc_clock::time_point get_gc_before_for_key(schema_ptr s, const 
dht::decorated_key& dk, const gc_clock::time_point& query_time); - -void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); - void validate_tombstone_gc_options(const tombstone_gc_options* options, data_dictionary::database db, sstring ks_name);