tombstone_gc: introduce tombstone_gc_state

and use it to access the repair history maps.

In this introductory patch, we temporarily use a
default-constructed tombstone_gc_state to access the
thread-local maps. Those use sites will be replaced
in following patches, which will gradually pass
the tombstone_gc_state down from the compaction_manager
to where it's used.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-09-05 15:58:11 +03:00
parent b2b211568e
commit 5dd15aa3c8
9 changed files with 73 additions and 32 deletions

View File

@@ -1169,7 +1169,8 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
}
auto dk = dht::decorate_key(*_schema, _key);
auto gc_before = ::get_gc_before_for_key(_schema, dk, _now);
auto gc_state = tombstone_gc_state(); // FIXME: for now
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
// We allow existing to be disengaged, which we treat the same as an empty row.
if (existing) {

View File

@@ -140,6 +140,7 @@ class compact_mutation_state {
uint64_t _row_limit{};
uint32_t _partition_limit{};
uint64_t _partition_row_limit{};
tombstone_gc_state _tombstone_gc_state; // FIXME: default-constructed for now
tombstone _partition_tombstone;
@@ -239,7 +240,7 @@ private:
return _gc_before.value();
} else {
if (_dk) {
_gc_before = ::get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time);
_gc_before = _tombstone_gc_state.get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time);
return _gc_before.value();
} else {
return gc_clock::time_point::min();

View File

@@ -1401,13 +1401,14 @@ uint32_t mutation_partition::do_compact(const schema& s,
bool reverse,
uint64_t row_limit,
can_gc_fn& can_gc,
bool drop_tombstones_unconditionally)
bool drop_tombstones_unconditionally,
const tombstone_gc_state& gc_state)
{
check_schema(s);
assert(row_limit > 0);
auto gc_before = drop_tombstones_unconditionally ? gc_clock::time_point::max() :
::get_gc_before_for_key(s.shared_from_this(), dk, query_time);
gc_state.get_gc_before_for_key(s.shared_from_this(), dk, query_time);
auto should_purge_tombstone = [&] (const tombstone& t) {
return t.deletion_time < gc_before && can_gc(t);
@@ -1468,7 +1469,9 @@ mutation_partition::compact_for_query(
{
check_schema(s);
bool drop_tombstones_unconditionally = false;
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally);
// Replicas should only send non-purgeable tombstones already,
// so we can expect to not have to actually purge any tombstones here.
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
}
void mutation_partition::compact_for_compaction(const schema& s,
@@ -1480,7 +1483,8 @@ void mutation_partition::compact_for_compaction(const schema& s,
};
bool drop_tombstones_unconditionally = false;
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally);
auto gc_state = tombstone_gc_state(); // FIXME: for now
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally, gc_state);
}
void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(const schema& s, const dht::decorated_key& dk)
@@ -1491,7 +1495,7 @@ void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(
};
bool drop_tombstones_unconditionally = true;
auto compaction_time = gc_clock::time_point::max();
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally);
do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
}
// Returns true if the mutation_partition represents no writes.

View File

@@ -32,6 +32,7 @@
#include "utils/managed_ref.hh"
#include "utils/compact-radix-tree.hh"
#include "utils/immutable-collection.hh"
#include "tombstone_gc.hh"
class mutation_fragment;
class mutation_partition_view;
@@ -1317,7 +1318,8 @@ private:
bool reverse,
uint64_t row_limit,
can_gc_fn&,
bool drop_tombstones_unconditionally);
bool drop_tombstones_unconditionally,
const tombstone_gc_state& gc_state);
// Calls func for each row entry inside row_ranges until func returns stop_iteration::yes.
// Removes all entries for which func didn't return stop_iteration::no or wasn't called at all.

View File

@@ -2101,7 +2101,8 @@ future<repair_update_system_table_response> repair_service::repair_update_system
throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range));
}
co_await db.invoke_on_all([&req] (replica::database& local_db) {
return ::update_repair_time(req.table_uuid, req.range, req.repair_time);
auto gc_state = tombstone_gc_state(); // FIXME: for now
return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time);
});
db::system_keyspace::repair_history_entry ent;
ent.id = req.repair_uuid;
@@ -3009,7 +3010,8 @@ future<> repair_service::load_history() {
entry.ks, entry.cf, entry.table_uuid, entry.ts, range);
try {
co_await get_db().invoke_on_all([table_uuid = entry.table_uuid, range, repair_time] (replica::database& local_db) {
::update_repair_time(table_uuid, range, repair_time);
auto gc_state = tombstone_gc_state(); // FIXME: for now
gc_state.update_repair_time(table_uuid, range, repair_time);
});
} catch (...) {
rlogger.warn("Failed to update repair history time for keyspace={}, table={}, range={}, repair_time={}",

View File

@@ -2497,7 +2497,8 @@ future<> database::truncate(column_family& cf, const table_truncate_state& st, d
cf.cache_truncation_record(truncated_at);
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
drop_repair_history_map_for_table(uuid);
auto gc_state = tombstone_gc_state(); // FIXME: for now
gc_state.drop_repair_history_map_for_table(uuid);
}
const sstring& database::get_snitch_name() const {

View File

@@ -3300,7 +3300,8 @@ gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock::
auto end = get_last_decorated_key().token();
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
sstlog.trace("sstable={}, ks={}, cf={}, range={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range);
return ::get_gc_before_for_range(s, range, compaction_time).max_gc_before;
auto gc_state = tombstone_gc_state(); // FIXME: for now
return gc_state.get_gc_before_for_range(s, range, compaction_time).max_gc_before;
}
// If the sstable contains any regular live cells, we can not drop the sstable.
@@ -3325,7 +3326,8 @@ gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::tim
auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, query",
get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count());
auto res = ::get_gc_before_for_range(s, range, compaction_time);
auto gc_state = tombstone_gc_state(); // FIXME: for now
auto res = gc_state.get_gc_before_for_range(s, range, compaction_time);
return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min();
}

View File

@@ -29,7 +29,15 @@ public:
thread_local std::unordered_map<table_id, seastar::lw_shared_ptr<repair_history_map>> repair_history_maps;
static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const table_id& id) {
tombstone_gc_state::tombstone_gc_state() noexcept
: _repair_history_maps(&repair_history_maps)
{ }
seastar::lw_shared_ptr<repair_history_map> tombstone_gc_state::get_or_create_repair_history_map_for_table(const table_id& id) {
if (!_repair_history_maps) {
return {};
}
auto& repair_history_maps = *_repair_history_maps;
auto it = repair_history_maps.find(id);
if (it != repair_history_maps.end()) {
return it->second;
@@ -39,7 +47,11 @@ static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_m
}
}
seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const table_id& id) {
seastar::lw_shared_ptr<repair_history_map> tombstone_gc_state::get_repair_history_map_for_table(const table_id& id) const {
if (!_repair_history_maps) {
return {};
}
auto& repair_history_maps = *_repair_history_maps;
auto it = repair_history_maps.find(id);
if (it != repair_history_maps.end()) {
return it->second;
@@ -48,8 +60,8 @@ seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(cons
}
}
void drop_repair_history_map_for_table(const table_id& id) {
repair_history_maps.erase(id);
// Removes the repair history map entry recorded for table `id`, if any.
//
// A state constructed with a null map pointer has nothing to drop, so the
// call is a no-op in that case. This matches the null checks done by
// get_repair_history_map_for_table() and
// get_or_create_repair_history_map_for_table() and avoids dereferencing null.
void tombstone_gc_state::drop_repair_history_map_for_table(const table_id& id) {
    if (!_repair_history_maps) {
        return;
    }
    _repair_history_maps->erase(id);
}
// This is useful for a sstable to query a gc_before for a range. The range is
@@ -60,7 +72,7 @@ void drop_repair_history_map_for_table(const table_id& id) {
// The knows_entire_range is set to true:
// 1) if the tombstone_gc_mode is not repair, since we have the same value for all the keys in the ranges.
// 2) if the tombstone_gc_mode is repair, and the range is a sub range of a range in the repair history map.
get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) {
tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) const {
bool knows_entire_range = true;
const auto& options = s->tombstone_gc_options();
switch (options.mode()) {
@@ -118,7 +130,7 @@ get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::
std::abort();
}
gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) {
gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) const {
// if mode = timeout // default option, if user does not specify tombstone_gc options
// if mode = disabled // never gc tombstone
// if mode = immediate // can gc tombstone immediately
@@ -155,7 +167,7 @@ gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_ke
std::abort();
}
void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) {
// Records that `range` of table `id` was repaired at `repair_time` by adding
// the (interval, time) pair to that table's repair history map.
//
// get_or_create_repair_history_map_for_table() returns an empty pointer when
// this state has no backing maps (null _repair_history_maps); in that case
// there is nowhere to record the repair time, so the update is skipped
// rather than dereferencing a null pointer.
// NOTE(review): skipping is silent; confirm whether callers would rather get
// an internal error when updating a map-less state.
void tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) {
    auto m = get_or_create_repair_history_map_for_table(id);
    if (!m) {
        return;
    }
    m->map += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
}

View File

@@ -29,20 +29,36 @@ class database;
}
class repair_history_map;
using per_table_history_maps = std::unordered_map<table_id, seastar::lw_shared_ptr<repair_history_map>>;
class tombstone_gc_options;
struct get_gc_before_for_range_result {
gc_clock::time_point min_gc_before;
gc_clock::time_point max_gc_before;
bool knows_entire_range;
// Handle through which tombstone garbage-collection code reaches the
// per-table repair history maps. The class only stores a pointer to the
// maps; the pointer may be null, in which case no repair history is available.
class tombstone_gc_state {
// Per-table repair history maps this state operates on; nullptr when detached.
per_table_history_maps* _repair_history_maps;
public:
// Defined out of line; the definition attaches the state to the
// thread-local repair history maps.
tombstone_gc_state() noexcept;
// Attaches the state to `maps`. Passing nullptr yields a state with no
// repair history behind it.
tombstone_gc_state(per_table_history_maps* maps) noexcept : _repair_history_maps(maps) {}

// True when the state has repair history maps attached.
explicit operator bool() const noexcept {
return _repair_history_maps != nullptr;
}

// Looks up the repair history map of table `id`.
// Returns an empty pointer when no maps are attached.
seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const table_id& id) const;
// Like the lookup above, but non-const; judging by the name it creates the
// entry when missing (TODO confirm). Returns an empty pointer when no maps
// are attached.
seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const table_id& id);
// Erases the repair history map entry of table `id`.
void drop_repair_history_map_for_table(const table_id& id);

// Result of a range query for gc_before.
struct get_gc_before_for_range_result {
// Presumably the smallest and largest gc_before found for the queried
// range (inferred from the names; TODO confirm).
gc_clock::time_point min_gc_before;
gc_clock::time_point max_gc_before;
// Whether the result is known to cover the whole queried range.
bool knows_entire_range;
};

// Computes the gc_before bounds for token range `range` of schema `s` at `query_time`.
get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) const;

// Computes the gc_before time point for partition key `dk` of schema `s` at `query_time`.
gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) const;

// Records `repair_time` for `range` in the repair history map of table `id`.
void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time);
};
void drop_repair_history_map_for_table(const table_id& id);
get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time);
gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time);
void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time);
void validate_tombstone_gc_options(const tombstone_gc_options* options, data_dictionary::database db, sstring ks_name);