diff --git a/CMakeLists.txt b/CMakeLists.txt index a2a17416b5..7be172ca31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -537,6 +537,8 @@ set(scylla_sources raft/tracker.cc range_tombstone.cc range_tombstone_list.cc + tombstone_gc_options.cc + tombstone_gc.cc reader_concurrency_semaphore.cc redis/abstract_command.cc redis/command_factory.cc diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 5d054ba265..04e9032bbc 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -76,6 +76,7 @@ #include "utils/UUID_gen.hh" #include "utils/utf8.hh" #include "utils/fmt-compat.hh" +#include "tombstone_gc.hh" namespace sstables { @@ -643,7 +644,7 @@ private: void setup() { auto ssts = make_lw_shared(make_sstable_set_for_input()); formatted_sstables_list formatted_msg; - auto fully_expired = _table_s.fully_expired_sstables(_sstables); + auto fully_expired = _table_s.fully_expired_sstables(_sstables, gc_clock::now()); min_max_tracker timestamp_tracker; for (auto& sst : _sstables) { @@ -733,7 +734,6 @@ private: max_purgeable_func(), get_compacted_fragments_writer(), noop_compacted_fragments_consumer()); - reader.consume_in_thread(std::move(cfc)); }); }); @@ -1746,7 +1746,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, compaction_data& cd } std::unordered_set -get_fully_expired_sstables(const table_state& table_s, const std::vector& compacting, gc_clock::time_point gc_before) { +get_fully_expired_sstables(const table_state& table_s, const std::vector& compacting, gc_clock::time_point compaction_time) { clogger.debug("Checking droppable sstables in {}.{}", table_s.schema()->ks_name(), table_s.schema()->cf_name()); if (compacting.empty()) { @@ -1760,6 +1760,7 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector::max(); for (auto& sstable : overlapping) { + auto gc_before = sstable->get_gc_before_for_fully_expire(compaction_time); if (sstable->get_max_local_deletion_time() >= gc_before) { min_timestamp = std::min(min_timestamp, sstable->get_stats_metadata().min_timestamp); } @@ -1778,6 +1779,7 @@ get_fully_expired_sstables(const table_state& table_s, const std::vectorget_gc_before_for_fully_expire(compaction_time); clogger.debug("Checking if candidate of generation {} and max_deletion_time {} is expired, gc_before is {}", candidate->generation(), candidate->get_stats_metadata().max_local_deletion_time, gc_before); // A fully expired sstable which has an ancestor undeleted shouldn't be compacted because @@ -1798,11 +1800,12 @@ get_fully_expired_sstables(const table_state& table_s, const std::vectorget_stats_metadata().max_timestamp >= min_timestamp) { it = candidates.erase(it); } else { - clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={}, gcBefore={})", - candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time, gc_before); + clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={})", + candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time); it++; } } + clogger.debug("Checking droppable sstables in {}.{}, candidates={}", table_s.schema()->ks_name(), table_s.schema()->cf_name(), candidates.size()); return candidates; } diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 26a056379b..2eaf9c48eb 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -68,7 +68,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s return compaction_descriptor(std::move(candidates), table_s.get_sstable_set(), service::get_local_compaction_priority()); } -bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before) { +bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time) { if (_disable_tombstone_compaction) { return false; } @@ -79,6 +79,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s if (db_clock::now()-_tombstone_compaction_interval < sst->data_file_write_time()) { return false; } + auto gc_before = sst->get_gc_before_for_drop_estimation(compaction_time); return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold; } @@ -421,20 +422,20 @@ time_window_compaction_strategy::time_window_compaction_strategy(const std::map< } // namespace sstables std::vector -date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector& uncompacting, gc_clock::time_point gc_before) { +date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector& uncompacting, gc_clock::time_point compaction_time) { if (table_s.get_sstable_set().all()->empty()) { return {}; } // Find fully expired SSTables. Those will be included no matter what. - auto expired = table_s.fully_expired_sstables(uncompacting); + auto expired = table_s.fully_expired_sstables(uncompacting, compaction_time); if (!expired.empty()) { auto is_expired = [&] (const sstables::shared_sstable& s) { return expired.contains(s); }; uncompacting.erase(boost::remove_if(uncompacting, is_expired), uncompacting.end()); } - auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, gc_before); + auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, compaction_time); if (!expired.empty()) { compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end()); } @@ -464,7 +465,7 @@ int64_t date_tiered_manifest::get_estimated_tasks(table_state& table_s) const { } std::vector -date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector& non_expiring_sstables, gc_clock::time_point gc_before) { +date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector& non_expiring_sstables, gc_clock::time_point compaction_time) { int base = table_s.schema()->min_compaction_threshold(); int64_t now = get_now(table_s.get_sstable_set().all()); auto most_interesting = get_compaction_candidates(table_s, non_expiring_sstables, now, base); @@ -582,8 +583,8 @@ date_tiered_compaction_strategy::date_tiered_compaction_strategy(const std::map< } compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) { - auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds(); - auto sstables = _manifest.get_next_sstables(table_s, candidates, gc_before); + auto compaction_time = gc_clock::now(); + auto sstables = _manifest.get_next_sstables(table_s, candidates, compaction_time); if (!sstables.empty()) { date_tiered_manifest::logger.debug("datetiered: Compacting {} out of {} sstables", sstables.size(), candidates.size()); @@ -591,8 +592,8 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti } // filter out sstables which droppable tombstone ratio isn't greater than the defined threshold. - auto e = boost::range::remove_if(candidates, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool { - return !worth_dropping_tombstones(sst, gc_before); + auto e = boost::range::remove_if(candidates, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool { + return !worth_dropping_tombstones(sst, compaction_time); }); candidates.erase(e, candidates.end()); if (candidates.empty()) { diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 27eb357e68..a5d54fc3b7 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -74,7 +74,7 @@ public: // Check if a given sstable is entitled for tombstone compaction based on its // droppable tombstone histogram and gc_before. - bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before); + bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time); virtual compaction_backlog_tracker& get_backlog_tracker() = 0; diff --git a/compaction/date_tiered_compaction_strategy.hh b/compaction/date_tiered_compaction_strategy.hh index b5b8db4273..7402e36a41 100644 --- a/compaction/date_tiered_compaction_strategy.hh +++ b/compaction/date_tiered_compaction_strategy.hh @@ -112,12 +112,12 @@ public: : _options(options) {} std::vector - get_next_sstables(table_state& table_s, std::vector& uncompacting, gc_clock::time_point gc_before); + get_next_sstables(table_state& table_s, std::vector& uncompacting, gc_clock::time_point compaction_time); int64_t get_estimated_tasks(table_state& table_s) const; private: std::vector - get_next_non_expired_sstables(table_state& table_s, std::vector& non_expiring_sstables, gc_clock::time_point gc_before); + get_next_non_expired_sstables(table_state& table_s, std::vector& non_expiring_sstables, gc_clock::time_point compaction_time); std::vector get_compaction_candidates(table_state& table_s, std::vector candidate_sstables, int64_t now, int base); diff --git a/compaction/leveled_compaction_strategy.cc b/compaction/leveled_compaction_strategy.cc index b9035225bf..883e652910 100644 --- a/compaction/leveled_compaction_strategy.cc +++ b/compaction/leveled_compaction_strategy.cc @@ -48,19 +48,21 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(t // unlike stcs, lcs can look for sstable with highest droppable tombstone ratio, so as not to choose // a sstable which droppable data shadow data in older sstable, by starting from highest levels which // theoretically contain oldest non-overlapping data. - auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds(); + auto compaction_time = gc_clock::now(); for (auto level = int(manifest.get_level_count()); level >= 0; level--) { auto& sstables = manifest.get_level(level); // filter out sstables which droppable tombstone ratio isn't greater than the defined threshold. - auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool { - return !worth_dropping_tombstones(sst, gc_before); + auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool { + return !worth_dropping_tombstones(sst, compaction_time); }); sstables.erase(e, sstables.end()); if (sstables.empty()) { continue; } auto& sst = *std::max_element(sstables.begin(), sstables.end(), [&] (auto& i, auto& j) { - return i->estimate_droppable_tombstone_ratio(gc_before) < j->estimate_droppable_tombstone_ratio(gc_before); + auto gc_before1 = i->get_gc_before_for_drop_estimation(compaction_time); + auto gc_before2 = j->get_gc_before_for_drop_estimation(compaction_time); + return i->estimate_droppable_tombstone_ratio(gc_before1) < j->estimate_droppable_tombstone_ratio(gc_before2); }); return sstables::compaction_descriptor({ sst }, table_s.get_sstable_set(), service::get_local_compaction_priority(), sst->get_sstable_level()); } diff --git a/compaction/size_tiered_compaction_strategy.cc b/compaction/size_tiered_compaction_strategy.cc index feed3be4fb..d2b319d960 100644 --- a/compaction/size_tiered_compaction_strategy.cc +++ b/compaction/size_tiered_compaction_strategy.cc @@ -161,7 +161,7 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_ // make local copies so they can't be changed out from under us mid-method int min_threshold = table_s.min_compaction_threshold(); int max_threshold = table_s.schema()->max_compaction_threshold(); - auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds(); + auto compaction_time = gc_clock::now(); // TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables). @@ -184,8 +184,8 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_ // tombstone purge, i.e. less likely to shadow even older data. for (auto&& sstables : buckets | boost::adaptors::reversed) { // filter out sstables which droppable tombstone ratio isn't greater than the defined threshold. - auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool { - return !worth_dropping_tombstones(sst, gc_before); + auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool { + return !worth_dropping_tombstones(sst, compaction_time); }); sstables.erase(e, sstables.end()); if (sstables.empty()) { diff --git a/compaction/table_state.hh b/compaction/table_state.hh index afd0fd44ce..19156ef96d 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -42,7 +42,7 @@ public: virtual unsigned min_compaction_threshold() const noexcept = 0; virtual bool compaction_enforce_min_threshold() const noexcept = 0; virtual const sstables::sstable_set& get_sstable_set() const = 0; - virtual std::unordered_set fully_expired_sstables(const std::vector& sstables) const = 0; + virtual std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point compaction_time) const = 0; virtual const std::vector& compacted_undeleted_sstables() const noexcept = 0; virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0; virtual reader_permit make_compaction_reader_permit() const = 0; diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index 2110bc68a1..c8cceff6b9 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -227,7 +227,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector i compaction_descriptor time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) { - auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds(); + auto compaction_time = gc_clock::now(); if (candidates.empty()) { return compaction_descriptor(); @@ -238,7 +238,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_ if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) { clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables"); - expired = table_s.fully_expired_sstables(candidates); + expired = table_s.fully_expired_sstables(candidates, compaction_time); _last_expired_check = db_clock::now(); } else { clogger.debug("TWCS skipping check for fully expired SSTables"); @@ -249,7 +249,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_ return compaction_descriptor(has_only_fully_expired::yes, std::vector(expired.begin(), expired.end()), table_s.get_sstable_set(), service::get_local_compaction_priority()); } - auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), gc_before); + auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time); return compaction_descriptor(std::move(compaction_candidates), table_s.get_sstable_set(), service::get_local_compaction_priority()); } @@ -270,7 +270,7 @@ time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timesta std::vector time_window_compaction_strategy::get_next_non_expired_sstables(table_state& table_s, strategy_control& control, - std::vector non_expiring_sstables, gc_clock::time_point gc_before) { + std::vector non_expiring_sstables, gc_clock::time_point compaction_time) { auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables); if (!most_interesting.empty()) { @@ -279,8 +279,8 @@ time_window_compaction_strategy::get_next_non_expired_sstables(table_state& tabl // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone // ratio is greater than threshold. - auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool { - return !worth_dropping_tombstones(sst, gc_before); + auto e = boost::range::remove_if(non_expiring_sstables, [this, compaction_time] (const shared_sstable& sst) -> bool { + return !worth_dropping_tombstones(sst, compaction_time); }); non_expiring_sstables.erase(e, non_expiring_sstables.end()); if (non_expiring_sstables.empty()) { diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index bb05af7b3d..09c2f9946c 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -139,7 +139,7 @@ private: compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const; std::vector - get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector non_expiring_sstables, gc_clock::time_point gc_before); + get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector non_expiring_sstables, gc_clock::time_point compaction_time); std::vector get_compaction_candidates(table_state& table_s, strategy_control& control, std::vector candidate_sstables); public: diff --git a/configure.py b/configure.py index 8a34d933f3..34a283878f 100755 --- a/configure.py +++ b/configure.py @@ -989,6 +989,8 @@ scylla_core = (['database.cc', 'table_helper.cc', 'range_tombstone.cc', 'range_tombstone_list.cc', + 'tombstone_gc_options.cc', + 'tombstone_gc.cc', 'utils/disk-error-handler.cc', 'duration.cc', 'vint-serialization.cc', diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc index 6182f0ed71..70426df524 100644 --- a/cql3/statements/alter_table_statement.cc +++ b/cql3/statements/alter_table_statement.cc @@ -349,7 +349,7 @@ std::pair> alter_table_statement::prepare_ { auto schema_extensions = _properties->make_schema_extensions(db.extensions()); - _properties->validate(db, schema_extensions); + _properties->validate(db, keyspace(), schema_extensions); if (!cf.views().empty() && _properties->get_gc_grace_seconds() == 0) { throw exceptions::invalid_request_exception( diff --git a/cql3/statements/alter_view_statement.cc b/cql3/statements/alter_view_statement.cc index f7afaf93f9..775df6e714 100644 --- a/cql3/statements/alter_view_statement.cc +++ b/cql3/statements/alter_view_statement.cc @@ -90,7 +90,7 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const } auto schema_extensions = _properties->make_schema_extensions(db.extensions()); - _properties->validate(db, schema_extensions); + _properties->validate(db, keyspace(), schema_extensions); auto builder = schema_builder(schema); _properties->apply_to_builder(builder, std::move(schema_extensions)); diff --git a/cql3/statements/cf_prop_defs.cc b/cql3/statements/cf_prop_defs.cc index 9c0bf01fd7..ba80a97b16 100644 --- a/cql3/statements/cf_prop_defs.cc +++ b/cql3/statements/cf_prop_defs.cc @@ -46,6 +46,8 @@ #include "cdc/cdc_extension.hh" #include "gms/feature.hh" #include "gms/feature_service.hh" +#include "tombstone_gc_extension.hh" +#include "tombstone_gc.hh" #include @@ -94,7 +96,7 @@ schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions return er; } -void cf_prop_defs::validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const { +void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const { // Skip validation if the comapction strategy class is already set as it means we've alreayd // prepared (and redoing it would set strategyClass back to null, which we don't want) if (_compaction_strategy_class) { @@ -156,6 +158,9 @@ void cf_prop_defs::validate(const data_dictionary::database db, const schema::ex throw exceptions::configuration_exception("CDC not supported by the cluster"); } + auto tombstone_gc_options = get_tombstone_gc_options(schema_extensions); + validate_tombstone_gc_options(tombstone_gc_options, db.real_database(), ks_name); + validate_minimum_int(KW_DEFAULT_TIME_TO_LIVE, 0, DEFAULT_DEFAULT_TIME_TO_LIVE); validate_minimum_int(KW_PAXOSGRACESECONDS, 0, DEFAULT_GC_GRACE_SECONDS); @@ -235,6 +240,16 @@ const cdc::options* cf_prop_defs::get_cdc_options(const schema::extensions_map& return &cdc_ext->get_options(); } +const tombstone_gc_options* cf_prop_defs::get_tombstone_gc_options(const schema::extensions_map& schema_exts) const { + auto it = schema_exts.find(tombstone_gc_extension::NAME); + if (it == schema_exts.end()) { + return nullptr; + } + + auto ext = dynamic_pointer_cast(it->second); + return &ext->get_options(); +} + void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions) const { if (has_property(KW_COMMENT)) { builder.set_comment(get_string(KW_COMMENT, "")); diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh index 368bded005..e172ddee98 100644 --- a/cql3/statements/cf_prop_defs.hh +++ b/cql3/statements/cf_prop_defs.hh @@ -51,6 +51,8 @@ namespace data_dictionary { class database; } +class tombstone_gc_options; + namespace db { class extensions; } @@ -101,11 +103,12 @@ public: std::optional get_compaction_strategy_class() const; schema::extensions_map make_schema_extensions(const db::extensions& exts) const; - void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const; + void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const; std::map get_compaction_type_options() const; std::optional> get_compression_options() const; const cdc::options* get_cdc_options(const schema::extensions_map&) const; std::optional get_caching_options() const; + const tombstone_gc_options* get_tombstone_gc_options(const schema::extensions_map&) const; #if 0 public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException { diff --git a/cql3/statements/cf_properties.hh b/cql3/statements/cf_properties.hh index 389be1183f..bd02e0853f 100644 --- a/cql3/statements/cf_properties.hh +++ b/cql3/statements/cf_properties.hh @@ -94,8 +94,8 @@ public: _defined_ordering.emplace_back(alias, reversed); } - void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const { - _properties->validate(db, schema_extensions); + void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const { + _properties->validate(db, std::move(ks_name), schema_extensions); } }; diff --git a/cql3/statements/create_table_statement.cc b/cql3/statements/create_table_statement.cc index 159dcc7509..9cb8f272f6 100644 --- a/cql3/statements/create_table_statement.cc +++ b/cql3/statements/create_table_statement.cc @@ -211,7 +211,7 @@ std::unique_ptr create_table_statement::raw_statement::prepa throw exceptions::invalid_request_exception(format("Multiple definition of identifier {}", (*i)->text())); } - _properties.validate(db, _properties.properties()->make_schema_extensions(db.extensions())); + _properties.validate(db, keyspace(), _properties.properties()->make_schema_extensions(db.extensions())); const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0; auto stmt = ::make_shared(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id()); diff --git a/cql3/statements/create_view_statement.cc b/cql3/statements/create_view_statement.cc index 3d3eb81362..004d9f999d 100644 --- a/cql3/statements/create_view_statement.cc +++ b/cql3/statements/create_view_statement.cc @@ -154,7 +154,7 @@ view_ptr create_view_statement::prepare_view(data_dictionary::database db) const // - make sure base_table gc_grace_seconds > 0 auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions()); - _properties.validate(db, schema_extensions); + _properties.validate(db, keyspace(), schema_extensions); if (_properties.use_compact_storage()) { throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view")); diff --git a/database.cc b/database.cc index 5053d57e28..8823a8a00d 100644 --- a/database.cc +++ b/database.cc @@ -71,6 +71,7 @@ #include "locator/abstract_replication_strategy.hh" #include "timeout_config.hh" +#include "tombstone_gc.hh" #include "data_dictionary/impl.hh" @@ -940,6 +941,7 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_ lw_shared_ptr cf; try { cf = _column_families.at(uuid); + drop_repair_history_map_for_table(uuid); } catch (std::out_of_range&) { on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid)); } @@ -2067,6 +2069,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun // call. auto low_mark = cf.set_low_replay_position_mark(); + const auto uuid = cf.schema()->id(); return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable { future<> f = make_ready_future<>(); @@ -2112,6 +2115,8 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun }); }); }); + }).then([this, uuid] { + drop_repair_history_map_for_table(uuid); }); }); } diff --git a/db/config.cc b/db/config.cc index 015677cd33..1b297c153b 100644 --- a/db/config.cc +++ b/db/config.cc @@ -35,6 +35,7 @@ #include #include "cdc/cdc_extension.hh" +#include "tombstone_gc_extension.hh" #include "config.hh" #include "extensions.hh" #include "log.hh" diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 7864fd1612..2a7f74712c 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -958,7 +958,7 @@ mutation compact_for_schema_digest(const mutation& m) { // See https://issues.apache.org/jira/browse/CASSANDRA-6862. // We achieve similar effect with compact_for_compaction(). mutation m_compacted(m); - m_compacted.partition().compact_for_compaction(*m.schema(), always_gc, gc_clock::time_point::max()); + m_compacted.partition().compact_for_compaction_drop_tombstones_unconditionally(*m.schema(), m.decorated_key()); return m_compacted; } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 63487c8af3..e38bd28feb 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -288,6 +288,26 @@ schema_ptr system_keyspace::raft_config() { return schema; } +schema_ptr system_keyspace::repair_history() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, REPAIR_HISTORY); + return schema_builder(NAME, REPAIR_HISTORY, std::optional(id)) + .with_column("table_uuid", uuid_type, column_kind::partition_key) + // The time is repair start time + .with_column("repair_time", timestamp_type, column_kind::clustering_key) + .with_column("repair_uuid", uuid_type, column_kind::clustering_key) + // The token range is (range_start, range_end] + .with_column("range_start", long_type, column_kind::clustering_key) + .with_column("range_end", long_type, column_kind::clustering_key) + .with_column("keyspace_name", utf8_type, column_kind::static_column) + .with_column("table_name", utf8_type, column_kind::static_column) + .set_comment("Record repair history") + .with_version(generate_schema_version(id)) + .build(); + }(); + return schema; +} + schema_ptr system_keyspace::built_indexes() { static thread_local auto built_indexes = [] { schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES, @@ -2518,6 +2538,7 @@ std::vector system_keyspace::all_tables(const db::config& cfg) { compactions_in_progress(), compaction_history(), sstable_activity(), clients(), size_estimates(), large_partitions(), large_rows(), large_cells(), scylla_local(), db::schema_tables::scylla_table_schema_history(), + repair_history(), v3::views_builds_in_progress(), v3::built_views(), v3::scylla_views_builds_in_progress(), v3::truncated(), diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 701532c817..5b413a7164 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -149,6 +149,7 @@ public: static constexpr auto RAFT = "raft"; static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots"; static constexpr auto RAFT_CONFIG = "raft_config"; + static constexpr auto REPAIR_HISTORY = "repair_history"; static const char *const CLIENTS; struct v3 { @@ -230,6 +231,7 @@ public: static schema_ptr built_indexes(); // TODO (from Cassandra): make private static schema_ptr raft(); static schema_ptr raft_snapshots(); + static schema_ptr repair_history(); static table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0); diff --git a/db/view/view.cc b/db/view/view.cc index 4ddc873767..403cbc2730 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -995,7 +995,8 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional throw std::logic_error("Empty materialized view updated"); } - auto gc_before = _now - _schema->gc_grace_seconds(); + auto dk = dht::decorate_key(*_schema, _key); + auto gc_before = ::get_gc_before_for_key(_schema, dk, _now); // We allow existing to be disengaged, which we treat the same as an empty row. if (existing) { diff --git a/gms/feature.hh b/gms/feature.hh index 2739ed1c20..47b80e7a11 100644 --- a/gms/feature.hh +++ b/gms/feature.hh @@ -151,6 +151,7 @@ extern const std::string_view UDA; extern const std::string_view SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT; extern const std::string_view SUPPORTS_RAFT_CLUSTER_MANAGEMENT; extern const std::string_view USES_RAFT_CLUSTER_MANAGEMENT; +extern const std::string_view TOMBSTONE_GC_OPTIONS; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 8e67e1ce4d..9448407fa6 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -76,6 +76,7 @@ constexpr std::string_view features::UDA = "UDA"; constexpr std::string_view features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT = "SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT"; constexpr std::string_view features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT = "SUPPORTS_RAFT_CLUSTER_MANAGEMENT"; constexpr std::string_view features::USES_RAFT_CLUSTER_MANAGEMENT = "USES_RAFT_CLUSTER_MANAGEMENT"; +constexpr std::string_view features::TOMBSTONE_GC_OPTIONS = "TOMBSTONE_GC_OPTIONS"; static logging::logger logger("features"); @@ -104,6 +105,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg) , _separate_page_size_and_safety_limit(*this, features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT) , _supports_raft_cluster_mgmt(*this, features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT) , _uses_raft_cluster_mgmt(*this, features::USES_RAFT_CLUSTER_MANAGEMENT) + , _tombstone_gc_options(*this, features::TOMBSTONE_GC_OPTIONS) , _raft_support_listener(_supports_raft_cluster_mgmt.when_enabled([this] { // When the cluster fully supports raft-based cluster management, // we can re-enable support for the second gossip feature to trigger @@ -227,6 +229,7 @@ std::set feature_service::known_feature_set() { gms::features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT, gms::features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT, gms::features::USES_RAFT_CLUSTER_MANAGEMENT, + gms::features::TOMBSTONE_GC_OPTIONS, }; for (const sstring& s : _config._disabled_features) { @@ -335,6 +338,7 @@ void feature_service::enable(const std::set& list) { std::ref(_separate_page_size_and_safety_limit), std::ref(_supports_raft_cluster_mgmt), std::ref(_uses_raft_cluster_mgmt), + std::ref(_tombstone_gc_options), }) { if (list.contains(f.name())) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 80e53b7cf3..3a36bb33c3 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -103,6 +103,7 @@ private: gms::feature _separate_page_size_and_safety_limit; gms::feature _supports_raft_cluster_mgmt; gms::feature _uses_raft_cluster_mgmt; + gms::feature _tombstone_gc_options; gms::feature::listener_registration _raft_support_listener; @@ -199,6 +200,10 @@ public: return bool(_separate_page_size_and_safety_limit); } + bool cluster_supports_tombstone_gc_options() const { + return bool(_tombstone_gc_options); + } + static std::set to_feature_set(sstring features_string); // Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key. // The key itself is maintained as an `unordered_set` and serialized via `to_string` diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 9d753ebf93..1b7243de63 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -142,3 +142,28 @@ struct node_ops_cmd_response { // Optional field, set by query_pending_ops cmd std::list pending_ops; }; + +struct repair_update_system_table_request { + utils::UUID repair_uuid; + utils::UUID table_uuid; + sstring keyspace_name; + sstring table_name; + dht::token_range range; + gc_clock::time_point repair_time; +}; + +struct repair_update_system_table_response { +}; + +struct repair_flush_hints_batchlog_request { + utils::UUID repair_uuid; + std::list target_nodes; + std::chrono::seconds hints_timeout; + std::chrono::seconds batchlog_timeout; +}; + +struct repair_flush_hints_batchlog_response { +}; + +verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request) -> repair_update_system_table_response; +verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request) -> repair_flush_hints_batchlog_response; diff --git a/main.cc b/main.cc index f3c6259d58..a8c97be103 100644 --- a/main.cc +++ b/main.cc @@ -90,6 +90,7 @@ #include "cdc/log.hh" #include "cdc/cdc_extension.hh" #include "cdc/generation_service.hh" +#include "tombstone_gc_extension.hh" #include "alternator/tags_extension.hh" #include "db/paxos_grace_seconds_extension.hh" #include "service/qos/standard_service_level_distributed_data_accessor.hh" @@ -438,6 +439,7 @@ For more information about individual apps, run: scylla {app_name} --help ext->add_schema_extension(alternator::tags_extension::NAME); ext->add_schema_extension(cdc::cdc_extension::NAME); ext->add_schema_extension(db::paxos_grace_seconds_extension::NAME); + ext->add_schema_extension(tombstone_gc_extension::NAME); auto cfg = make_lw_shared(ext); auto init = app.get_options_description().add_options(); @@ -1095,7 +1097,7 @@ For more information about individual apps, run: scylla {app_name} --help // both) supervisor::notify("starting messaging service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get(); + repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index e7159a848d..e3394500a3 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -81,7 +81,6 @@ #include "idl/gossip_digest.dist.impl.hh" #include "idl/read_command.dist.impl.hh" #include "idl/range.dist.impl.hh" -#include "idl/partition_checksum.dist.impl.hh" #include "idl/query.dist.impl.hh" #include "idl/cache_temperature.dist.impl.hh" #include "idl/mutation.dist.impl.hh" @@ -103,6 +102,7 @@ #include "locator/snitch_base.hh" #include "message/rpc_protocol_impl.hh" +#include "idl/partition_checksum.dist.impl.hh" namespace netw { @@ -469,6 +469,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: + case messaging_verb::REPAIR_UPDATE_SYSTEM_TABLE: + case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG: case messaging_verb::NODE_OPS_CMD: case messaging_verb::HINT_MUTATION: return 1; diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 22d5c50a3d..70c0278514 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -161,7 +161,9 @@ enum class messaging_verb : int32_t { RAFT_MODIFY_CONFIG = 56, GROUP0_PEER_EXCHANGE = 57, GROUP0_MODIFY_CONFIG = 58, - LAST = 59, + REPAIR_UPDATE_SYSTEM_TABLE = 59, + REPAIR_FLUSH_HINTS_BATCHLOG = 60, + LAST = 61, }; } // namespace netw diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 573deaf521..9b6ff5c3a8 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -23,6 +23,7 @@ #include "compaction/compaction_garbage_collector.hh" #include "mutation_fragment.hh" +#include "tombstone_gc.hh" static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) { // Like PK range, an empty row range, should be considered an "exclude all" restriction @@ -150,10 +151,10 @@ template class compact_mutation_state { const schema& _schema; gc_clock::time_point _query_time; - gc_clock::time_point _gc_before; std::function _get_max_purgeable; can_gc_fn _can_gc; api::timestamp_type _max_purgeable = api::missing_timestamp; + std::optional _gc_before; const query::partition_slice& _slice; uint64_t _row_limit{}; uint32_t _partition_limit{}; @@ -209,13 +210,26 @@ private: } bool can_purge_tombstone(const tombstone& t) { - return t.deletion_time < _gc_before && can_gc(t); + return can_gc(t) && t.deletion_time < get_gc_before(); }; bool can_purge_tombstone(const row_tombstone& t) { - return t.max_deletion_time() < _gc_before && can_gc(t.tomb()); + return can_gc(t.tomb()) && t.max_deletion_time() < get_gc_before(); }; + gc_clock::time_point get_gc_before() { + if (_gc_before) { + return _gc_before.value(); + } else { + if (_dk) { + _gc_before = ::get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time); + return _gc_before.value(); + } else { + return gc_clock::time_point::min(); + } + } + } + bool can_gc(tombstone t) { if (!sstable_compaction()) { return true; @@ -241,7 +255,6 @@ public: uint32_t partition_limit) : _schema(s) , _query_time(query_time) - , _gc_before(saturating_subtract(query_time, s.gc_grace_seconds())) , _can_gc(always_gc) , _slice(slice) , _row_limit(limit) @@ -257,7 +270,6 @@ public: std::function get_max_purgeable) : _schema(s) , _query_time(compaction_time) - , _gc_before(saturating_subtract(_query_time, s.gc_grace_seconds())) , _get_max_purgeable(std::move(get_max_purgeable)) , _can_gc([this] (tombstone t) { return can_gc(t); }) , _slice(s.full_slice()) @@ -282,6 +294,7 @@ public: _range_tombstones.clear(); _current_partition_limit = std::min(_row_limit, _partition_row_limit); _max_purgeable = api::missing_timestamp; + _gc_before = std::nullopt; _last_static_row.reset(); } @@ -306,8 +319,9 @@ public: if constexpr (sstable_compaction()) { _collector->start_collecting_static_row(); } + auto gc_before = get_gc_before(); bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column, row_tombstone(current_tombstone), - _query_time, _can_gc, _gc_before, _collector.get()); + _query_time, _can_gc, gc_before, _collector.get()); _stats.static_rows += is_live; if constexpr (sstable_compaction()) { _collector->consume_static_row([this, &gc_consumer, current_tombstone] (static_row&& sr_garbage) { @@ -350,9 +364,9 @@ public: cr.remove_tombstone(); } } - - bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before, _collector.get()); - is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker(), + auto gc_before = get_gc_before(); + bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, gc_before, _collector.get()); + is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, gc_before, cr.marker(), _collector.get()); _stats.clustering_rows += is_live; @@ -470,7 +484,6 @@ public: _rows_in_current_partition = 0; _current_partition_limit = std::min(_row_limit, _partition_row_limit); _query_time = query_time; - _gc_before = saturating_subtract(query_time, _schema.gc_grace_seconds()); _stats = {}; if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone) @@ -517,7 +530,8 @@ public: } compact_mutation(const schema& s, gc_clock::time_point compaction_time, - std::function get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) + std::function get_max_purgeable, + Consumer consumer, GCConsumer gc_consumer = GCConsumer()) : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { diff --git a/mutation_partition.cc b/mutation_partition.cc index 6f8d7d8aed..dcbd567763 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -41,6 +41,7 @@ #include "utils/exceptions.hh" #include "clustering_key_filter.hh" #include "mutation_partition_view.hh" +#include "tombstone_gc.hh" logging::logger mplog("mutation_partition"); @@ -1294,17 +1295,20 @@ void mutation_partition::trim_rows(const schema& s, } uint32_t mutation_partition::do_compact(const schema& s, + const dht::decorated_key& dk, gc_clock::time_point query_time, const std::vector& row_ranges, bool always_return_static_content, bool reverse, uint64_t row_limit, - can_gc_fn& can_gc) + can_gc_fn& can_gc, + bool drop_tombstones_unconditionally) { check_schema(s); assert(row_limit > 0); - auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds()); + auto gc_before = drop_tombstones_unconditionally ? gc_clock::time_point::max() : + ::get_gc_before_for_key(s.shared_from_this(), dk, query_time); auto should_purge_tombstone = [&] (const tombstone& t) { return t.deletion_time < gc_before && can_gc(t); @@ -1363,6 +1367,7 @@ uint32_t mutation_partition::do_compact(const schema& s, uint64_t mutation_partition::compact_for_query( const schema& s, + const dht::decorated_key& dk, gc_clock::time_point query_time, const std::vector& row_ranges, bool always_return_static_content, @@ -1370,18 +1375,31 @@ mutation_partition::compact_for_query( uint64_t row_limit) { check_schema(s); - return do_compact(s, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc); + bool drop_tombstones_unconditionally = false; + return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally); } void mutation_partition::compact_for_compaction(const schema& s, - can_gc_fn& can_gc, gc_clock::time_point compaction_time) + can_gc_fn& can_gc, const dht::decorated_key& dk, gc_clock::time_point compaction_time) { check_schema(s); static const std::vector all_rows = { query::clustering_range::make_open_ended_both_sides() }; - do_compact(s, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc); + bool drop_tombstones_unconditionally = false; + do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally); +} + +void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(const schema& s, const dht::decorated_key& dk) +{ + check_schema(s); + static const std::vector all_rows = { + query::clustering_range::make_open_ended_both_sides() + }; + bool drop_tombstones_unconditionally = true; + auto compaction_time = gc_clock::time_point::max(); + do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally); } // Returns true if the mutation_partition represents no writes. diff --git a/mutation_partition.hh b/mutation_partition.hh index a26d749c74..abdf11991c 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -1241,12 +1241,14 @@ private: void insert_row(const schema& s, const clustering_key& key, const deletable_row& row); uint32_t do_compact(const schema& s, + const dht::decorated_key& dk, gc_clock::time_point now, const std::vector& row_ranges, bool always_return_static_content, bool reverse, uint64_t row_limit, - can_gc_fn&); + can_gc_fn&, + bool drop_tombstones_unconditionally); // Calls func for each row entry inside row_ranges until func returns stop_iteration::yes. // Removes all entries for which func didn't return stop_iteration::no or wasn't called at all. @@ -1274,7 +1276,7 @@ public: // // The row_limit parameter must be > 0. // - uint64_t compact_for_query(const schema& s, gc_clock::time_point query_time, + uint64_t compact_for_query(const schema& s, const dht::decorated_key& dk, gc_clock::time_point query_time, const std::vector& row_ranges, bool always_return_static_content, bool reversed, uint64_t row_limit); @@ -1283,8 +1285,13 @@ public: // - drops cells covered by higher-level tombstones // - drops expired tombstones which timestamp is before max_purgeable void compact_for_compaction(const schema& s, can_gc_fn&, + const dht::decorated_key& dk, gc_clock::time_point compaction_time); + // Like compact_for_compaction but drop tombstones unconditionally + void compact_for_compaction_drop_tombstones_unconditionally(const schema& s, + const dht::decorated_key& dk); + // Returns the minimal mutation_partition that when applied to "other" will // create a mutation_partition equal to the sum of other and this one. // This and other must both be governed by the same schema s. diff --git a/repair/repair.cc b/repair/repair.cc index 26e2ed5228..887dd734c4 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -55,6 +55,8 @@ #include +#include "idl/partition_checksum.dist.hh" + logging::logger rlogger("repair"); void node_ops_info::check_abort() { @@ -313,7 +315,7 @@ static std::vector get_neighbors(database& db, #endif } -static future> get_hosts_participating_in_repair(database& db, +static future> get_hosts_participating_in_repair(database& db, const sstring& ksname, const dht::token_range_vector& ranges, const std::vector& data_centers, @@ -333,7 +335,7 @@ static future> get_hosts_participating_in_repair( } }); - co_return std::vector(participating_hosts.begin(), participating_hosts.end()); + co_return std::list(participating_hosts.begin(), participating_hosts.end()); } static tracker* _the_tracker = nullptr; @@ -591,8 +593,10 @@ repair_info::repair_info(repair_service& repair, const std::vector& hosts_, const std::unordered_set& ignore_nodes_, streaming::stream_reason reason_, - std::optional ops_uuid) - : db(repair.get_db()) + std::optional ops_uuid, + bool hints_batchlog_flushed) + : rs(repair) + , db(repair.get_db()) , messaging(repair.get_messaging().container()) , sys_dist_ks(repair.get_sys_dist_ks()) , view_update_generator(repair.get_view_update_generator()) @@ -609,8 +613,10 @@ repair_info::repair_info(repair_service& repair, , hosts(hosts_) , ignore_nodes(ignore_nodes_) , reason(reason_) + , total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor()) , nr_ranges_total(ranges.size()) - , _ops_uuid(std::move(ops_uuid)) { + , _ops_uuid(std::move(ops_uuid)) + , _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) { } void repair_info::check_failed_ranges() { @@ -1120,12 +1126,41 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map future<> { + rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", + uuid, node, participants); + try { + auto& ms = get_messaging(); + auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", + uuid, node, participants, std::current_exception()); + throw; + } + }).get(); + hints_batchlog_flushed = true; + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", + uuid, participants); + } + std::vector> repair_results; repair_results.reserve(smp::count); auto table_ids = get_table_ids(db.local(), keyspace, cfs); abort_source as; - auto uuid = id.uuid; auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] { auto tables = std::list(table_ids.begin(), table_ids.end()); auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables)); @@ -1155,13 +1190,21 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_mapcleanup_history(uuid).get(); + } catch (...) { + rlogger.warn("repair[{}]: Failed to cleanup history: {}", uuid, std::current_exception()); + } + }); + for (auto shard : boost::irange(unsigned(0), smp::count)) { - auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, + auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (repair_service& local_repair) mutable { _node_ops_metrics.repair_total_ranges_sum += ranges.size(); auto ri = make_lw_shared(local_repair, std::move(keyspace), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid); + id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid, hints_batchlog_flushed); return repair_ranges(ri); }); repair_results.push_back(std::move(f)); @@ -1263,9 +1306,10 @@ future<> repair_service::do_sync_data_using_repair( auto data_centers = std::vector(); auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); + bool hints_batchlog_flushed = false; auto ri = make_lw_shared(local_repair, std::move(keyspace), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid); + id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid, hints_batchlog_flushed); ri->neighbors = std::move(neighbors); return repair_ranges(ri); }); diff --git a/repair/repair.hh b/repair/repair.hh index 63f866fd19..7aaf1066d4 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -168,6 +168,7 @@ public: class repair_info { public: + repair_service& rs; seastar::sharded& db; seastar::sharded& messaging; sharded& sys_dist_ks; @@ -186,6 +187,7 @@ public: std::unordered_set ignore_nodes; streaming::stream_reason reason; std::unordered_map neighbors; + size_t total_rf; uint64_t nr_ranges_finished = 0; uint64_t nr_ranges_total; size_t nr_failed_ranges = 0; @@ -194,6 +196,7 @@ public: repair_stats _stats; std::unordered_set dropped_tables; std::optional _ops_uuid; + bool _hints_batchlog_flushed = false; public: repair_info(repair_service& repair, const sstring& keyspace_, @@ -204,7 +207,8 @@ public: const std::vector& hosts_, const std::unordered_set& ingore_nodes_, streaming::stream_reason reason_, - std::optional ops_uuid); + std::optional ops_uuid, + bool hints_batchlog_flushed); void check_failed_ranges(); void abort(); void check_in_abort(); @@ -219,6 +223,10 @@ public: return _ops_uuid; }; + bool hints_batchlog_flushed() const { + return _hints_batchlog_flushed; + } + future<> repair_range(const dht::token_range& range); }; @@ -491,6 +499,29 @@ struct node_ops_cmd_response { } }; + +struct repair_update_system_table_request { + utils::UUID repair_uuid; + utils::UUID table_uuid; + sstring keyspace_name; + sstring table_name; + dht::token_range range; + gc_clock::time_point repair_time; +}; + +struct repair_update_system_table_response { +}; + +struct repair_flush_hints_batchlog_request { + utils::UUID repair_uuid; + std::list target_nodes; + std::chrono::seconds hints_timeout; + std::chrono::seconds batchlog_timeout; +}; + +struct repair_flush_hints_batchlog_response { +}; + namespace std { template<> diff --git a/repair/row_level.cc b/repair/row_level.cc index a094bae628..18101e5d6e 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -52,6 +52,13 @@ #include "service/migration_manager.hh" #include "streaming/consumer.hh" #include +#include +#include "db/query_context.hh" +#include "db/system_keyspace.hh" +#include "service/storage_proxy.hh" +#include "db/batchlog_manager.hh" +#include "cql3/untyped_result_set.hh" +#include "idl/partition_checksum.dist.hh" extern logging::logger rlogger; @@ -2266,6 +2273,66 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler( }); } +future repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) { + rlogger.debug("repair[{}]: Got repair_update_system_table_request from node={}, range={}, repair_time={}", req.repair_uuid, from, req.range, req.repair_time); + auto& db = this->get_db(); + bool is_valid_range = true; + if (req.range.start()) { + if (req.range.start()->is_inclusive()) { + is_valid_range = false; + } + } + if (req.range.end()) { + if (!req.range.end()->is_inclusive()) { + is_valid_range = false; + } + } + if (!is_valid_range) { + throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range)); + } + co_await db.invoke_on_all([&req] (database& local_db) { + auto& table = local_db.find_column_family(req.table_uuid); + return ::update_repair_time(table.schema(), req.range, req.repair_time); + }); + sstring cql = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", + db::system_keyspace::REPAIR_HISTORY); + auto range_start = req.range.start() ? req.range.start()->value() : dht::minimum_token(); + auto range_end = req.range.end() ? req.range.end()->value() : dht::maximum_token(); + db_clock::time_point ts = db_clock::from_time_t(gc_clock::to_time_t(req.repair_time)); + co_await db::qctx->execute_cql(cql, req.table_uuid, ts, req.repair_uuid, req.keyspace_name, req.table_name, + dht::token::to_int64(range_start), dht::token::to_int64(range_end)).discard_result(); + co_return repair_update_system_table_response(); +} + +future repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) { + rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={}, target_nodes={}, hints_timeout={}s, batchlog_timeout={}s", + req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count()); + std::vector target_nodes(req.target_nodes.begin(), req.target_nodes.end()); + db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes)); + lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout; + try { + co_await coroutine::all( + [this, &from, &req, &sync_point, &deadline] () -> future<> { + rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline); + rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes); + co_return; + }, + [this, &from, &req] () -> future<> { + rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + co_await _bm.local().do_batch_log_replay(); + rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + } + ); + } catch (...) { + rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}, target_hosts={}, {}", + req.repair_uuid, from, req.target_nodes, std::current_exception()); + throw; + } + rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + co_return repair_flush_hints_batchlog_response(); +} + future<> repair_service::init_ms_handlers() { auto& ms = this->_messaging; @@ -2430,6 +2497,14 @@ future<> repair_service::init_ms_handlers() { ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) { return make_ready_future>(suportted_diff_detect_algorithms()); }); + ser::partition_checksum_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) { + auto from = cinfo.retrieve_auxiliary("baddr"); + return repair_update_system_table_handler(from, std::move(req)); + }); + ser::partition_checksum_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) { + auto from = cinfo.retrieve_auxiliary("baddr"); + return repair_flush_hints_batchlog_handler(from, std::move(req)); + }); return make_ready_future<>(); } @@ -2450,7 +2525,10 @@ future<> repair_service::uninit_ms_handlers() { ms.unregister_repair_row_level_stop(), ms.unregister_repair_get_estimated_partitions(), ms.unregister_repair_set_estimated_partitions(), - ms.unregister_repair_get_diff_algorithms()).discard_result(); + ms.unregister_repair_get_diff_algorithms(), + ser::partition_checksum_rpc_verbs::unregister_repair_update_system_table(&ms), + ser::partition_checksum_rpc_verbs::unregister_repair_flush_hints_batchlog(&ms) + ).discard_result(); } class repair_meta_tracker { @@ -2522,6 +2600,8 @@ class row_level_repair { // the next repair. uint64_t _seed; + gc_clock::time_point _start_time; + public: row_level_repair(repair_info& ri, sstring cf_name, @@ -2534,7 +2614,8 @@ public: , _range(std::move(range)) , _all_live_peer_nodes(std::move(all_live_peer_nodes)) , _cf(_ri.db.local().find_column_family(_table_id)) - , _seed(get_random_seed()) { + , _seed(get_random_seed()) + , _start_time(gc_clock::now()) { } private: @@ -2771,6 +2852,45 @@ private: master.stats().round_nr_slow_path++; } +private: + // Update system.repair_history table + future<> update_system_repair_table() { + // Update repair_history table only if it is a reguar repair. + if (_ri.reason != streaming::stream_reason::repair) { + co_return; + } + // Update repair_history table only if all replicas have been repaired + size_t repaired_replicas = _all_live_peer_nodes.size() + 1; + if (_ri.total_rf != repaired_replicas){ + rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}", + _ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes); + co_return; + } + // Update repair_history table only if both hints and batchlog have been flushed. + if (!_ri.hints_batchlog_flushed()) { + co_return; + } + repair_service& rs = _ri.rs; + std::optional repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time); + if (!repair_time_opt) { + co_return; + } + auto repair_time = repair_time_opt.value(); + repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time}; + auto all_nodes = _all_live_peer_nodes; + all_nodes.push_back(utils::fb_utilities::get_broadcast_address()); + co_await parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> { + try { + auto& ms = _ri.messaging.local(); + repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req); + rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node); + } catch (...) { + rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception()); + } + }); + co_return; + } + public: future<> run() { return seastar::async([this] { @@ -2884,6 +3004,8 @@ public: } else { throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _ri.keyspace, _cf_name, _range)); } + } else { + update_system_repair_table().get(); } rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}", master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr); @@ -2957,6 +3079,8 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc repair_service::repair_service(distributed& gossiper, netw::messaging_service& ms, sharded& db, + sharded& sp, + sharded& bm, sharded& sys_dist_ks, sharded& vug, service::migration_manager& mm, @@ -2964,6 +3088,8 @@ repair_service::repair_service(distributed& gossiper, : _gossiper(gossiper) , _messaging(ms) , _db(db) + , _sp(sp) + , _bm(bm) , _sys_dist_ks(sys_dist_ks) , _view_update_generator(vug) , _mm(mm) @@ -2976,6 +3102,7 @@ repair_service::repair_service(distributed& gossiper, } future<> repair_service::start() { + co_await load_history(); co_await init_metrics(); co_await init_ms_handlers(); } @@ -2991,3 +3118,79 @@ future<> repair_service::stop() { repair_service::~repair_service() { assert(_stopped); } + +static shard_id repair_id_to_shard(utils::UUID& repair_id) { + return shard_id(repair_id.get_most_significant_bits()) % smp::count; +} + +future> +repair_service::update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time) { + auto shard = repair_id_to_shard(repair_id); + return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future> { + repair_history& rh = rs._finished_ranges_history[repair_id]; + if (rh.repair_time > repair_time) { + rh.repair_time = repair_time; + } + auto finished_shards = ++(rh.finished_ranges[table_id][range]); + if (finished_shards == smp::count) { + // All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table + rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}", + repair_id, range, table_id, finished_shards); + co_return rh.repair_time; + } else { + rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_historytable, finished_shards={}", + repair_id, range, table_id, finished_shards); + co_return std::nullopt; + } + }); +} + +future<> repair_service::cleanup_history(utils::UUID repair_id) { + auto shard = repair_id_to_shard(repair_id); + return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable { + rs._finished_ranges_history.erase(repair_id); + rlogger.debug("repair[{}]: Finished cleaning up repair_service history", repair_id); + }); +} + +future<> repair_service::load_history() { + auto tables = get_db().local().get_column_families(); + for (const auto& x : tables) { + auto& table_uuid = x.first; + auto& table = x.second; + auto shard = unsigned(table_uuid.get_most_significant_bits()) % smp::count; + if (shard != this_shard_id()) { + continue; + } + rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}", + table->schema()->ks_name(), table->schema()->cf_name(), table_uuid); + auto req = format("SELECT * from system.{} WHERE table_uuid = {}", db::system_keyspace::REPAIR_HISTORY, table_uuid); + co_await db::qctx->qp().query_internal(req, [this] (const cql3::untyped_result_set::row& row) mutable -> future { + auto table_uuid = row.get_as("table_uuid"); + auto range_start = row.get_as("range_start"); + auto range_end = row.get_as("range_end"); + auto keyspace_name = row.get_as("keyspace_name"); + auto table_name = row.get_as("table_name"); + auto start = range_start == std::numeric_limits::min() ? dht::minimum_token() : dht::token::from_int64(range_start); + auto end = range_end == std::numeric_limits::min() ? dht::maximum_token() : dht::token::from_int64(range_end); + auto repair_time = to_gc_clock(row.get_as("repair_time")); + auto range = dht::token_range(dht::token_range::bound(start, false), dht::token_range::bound(end, true)); + rlogger.debug("Loading repair history for keyspace={}, table={}, table_uuid={}, repair_time={}, range={}", + keyspace_name, table_name, table_uuid, repair_time, range); + co_await get_db().invoke_on_all([table_uuid, range, repair_time, keyspace_name, table_name] (database& local_db) -> future<> { + try { + auto& table = local_db.find_column_family(table_uuid); + ::update_repair_time(table.schema(), range, repair_time); + } catch (no_such_column_family&) { + rlogger.trace("Table {}.{} with {} does not exist", keyspace_name, table_name, table_uuid); + } catch (...) { + rlogger.warn("Failed to load repair history for keyspace={}, table={}, range={}, repair_time={}", + keyspace_name, table_name, range, repair_time); + } + co_return; + }); + co_return stop_iteration::no; + }); + } + co_return; +} diff --git a/repair/row_level.hh b/repair/row_level.hh index 04028bbd8a..95e3ae6d67 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -30,11 +30,13 @@ class row_level_repair_gossip_helper; namespace service { class migration_manager; +class storage_proxy; } namespace db { class system_distributed_keyspace; +class batchlog_manager; } @@ -42,14 +44,25 @@ namespace gms { class gossiper; } +class repair_history { +public: + // The key for the map is the table_id + std::unordered_map> finished_ranges; + gc_clock::time_point repair_time = gc_clock::time_point::max(); +}; + class repair_service : public seastar::peering_sharded_service { distributed& _gossiper; netw::messaging_service& _messaging; sharded& _db; + sharded& _sp; + sharded& _bm; sharded& _sys_dist_ks; sharded& _view_update_generator; service::migration_manager& _mm; + std::unordered_map _finished_ranges_history; + shared_ptr _gossip_helper; std::unique_ptr _tracker; bool _stopped = false; @@ -63,6 +76,8 @@ public: repair_service(distributed& gossiper, netw::messaging_service& ms, sharded& db, + sharded& sp, + sharded& bm, sharded& sys_dist_ks, sharded& vug, service::migration_manager& mm, size_t max_repair_memory); @@ -77,6 +92,10 @@ public: // stop them abruptly). future<> shutdown(); + future> update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time); + future<> cleanup_history(utils::UUID repair_id); + future<> load_history(); + int do_repair_start(sstring keyspace, std::unordered_map options_map); // The tokens are the tokens assigned to the bootstrap node. @@ -101,6 +120,14 @@ private: streaming::stream_reason reason, std::optional ops_uuid); + future repair_update_system_table_handler( + gms::inet_address from, + repair_update_system_table_request req); + + future repair_flush_hints_batchlog_handler( + gms::inet_address from, + repair_flush_hints_batchlog_request req); + public: netw::messaging_service& get_messaging() noexcept { return _messaging; } sharded& get_db() noexcept { return _db; } diff --git a/schema.cc b/schema.cc index 18448c2d0d..dfffd6554b 100644 --- a/schema.cc +++ b/schema.cc @@ -40,8 +40,10 @@ #include "dht/i_partitioner.hh" #include "dht/token-sharding.hh" #include "cdc/cdc_extension.hh" +#include "tombstone_gc_extension.hh" #include "db/paxos_grace_seconds_extension.hh" #include "utils/rjson.hh" +#include "tombstone_gc_options.hh" constexpr int32_t schema::NAME_LENGTH; @@ -529,6 +531,7 @@ bool operator==(const schema& x, const schema& y) && x._raw._compaction_strategy_options == y._raw._compaction_strategy_options && x._raw._compaction_enabled == y._raw._compaction_enabled && x.cdc_options() == y.cdc_options() + && x.tombstone_gc_options() == y.tombstone_gc_options() && x._raw._caching_options == y._raw._caching_options && x._raw._dropped_columns == y._raw._dropped_columns && x._raw._collections == y._raw._collections @@ -1267,11 +1270,26 @@ const cdc::options& schema::cdc_options() const { return default_cdc_options; } +const ::tombstone_gc_options& schema::tombstone_gc_options() const { + static const ::tombstone_gc_options default_tombstone_gc_options; + const auto& schema_extensions = _raw._extensions; + + if (auto it = schema_extensions.find(tombstone_gc_extension::NAME); it != schema_extensions.end()) { + return dynamic_pointer_cast(it->second)->get_options(); + } + return default_tombstone_gc_options; +} + schema_builder& schema_builder::with_cdc_options(const cdc::options& opts) { add_extension(cdc::cdc_extension::NAME, ::make_shared(opts)); return *this; } +schema_builder& schema_builder::with_tombstone_gc_options(const tombstone_gc_options& opts) { + add_extension(tombstone_gc_extension::NAME, ::make_shared(opts)); + return *this; +} + schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) { add_extension(db::paxos_grace_seconds_extension::NAME, ::make_shared(seconds)); return *this; diff --git a/schema.hh b/schema.hh index e98c125372..0fab3a19fd 100644 --- a/schema.hh +++ b/schema.hh @@ -41,6 +41,7 @@ #include "caching_options.hh" #include "column_computation.hh" #include "timestamp.hh" +#include "tombstone_gc_options.hh" namespace dht { @@ -821,6 +822,8 @@ public: const cdc::options& cdc_options() const; + const ::tombstone_gc_options& tombstone_gc_options() const; + const ::speculative_retry& speculative_retry() const { return _raw._speculative_retry; } diff --git a/schema_builder.hh b/schema_builder.hh index e11f0efd3f..79bbe36e68 100644 --- a/schema_builder.hh +++ b/schema_builder.hh @@ -25,6 +25,7 @@ #include "database_fwd.hh" #include "cdc/log.hh" #include "dht/i_partitioner.hh" +#include "tombstone_gc_options.hh" struct schema_builder { public: @@ -291,6 +292,7 @@ public: schema_builder& without_indexes(); schema_builder& with_cdc_options(const cdc::options&); + schema_builder& with_tombstone_gc_options(const tombstone_gc_options& opts); default_names get_default_names() const { return default_names(_raw); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9d5ebcd4c9..0efe34c51d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3145,7 +3145,7 @@ private: auto mp = mutation_partition(s, m.partition()); auto&& ranges = cmd.slice.row_ranges(s, m.key()); bool always_return_static_content = cmd.slice.options.contains(); - mp.compact_for_query(s, cmd.timestamp, ranges, always_return_static_content, is_reversed, limit); + mp.compact_for_query(s, m.decorated_key(), cmd.timestamp, ranges, always_return_static_content, is_reversed, limit); return primary_key{m.decorated_key(), get_last_reconciled_row(s, mp, is_reversed)}; } @@ -3220,7 +3220,7 @@ private: std::vector ranges; ranges.emplace_back(is_reversed ? query::clustering_range::make_starting_with(std::move(*shortest_read->clustering)) : query::clustering_range::make_ending_with(std::move(*shortest_read->clustering))); - it->live_row_count = it->mut.partition().compact_for_query(s, cmd.timestamp, ranges, always_return_static_content, + it->live_row_count = it->mut.partition().compact_for_query(s, it->mut.decorated_key(), cmd.timestamp, ranges, always_return_static_content, is_reversed, query::partition_max_rows); } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bebc286c42..aa98f80da9 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -88,6 +88,7 @@ #include "mx/reader.hh" #include "utils/bit_cast.hh" #include "utils/cached_file.hh" +#include "tombstone_gc.hh" thread_local disk_error_signal_type sstable_read_error; thread_local disk_error_signal_type sstable_write_error; @@ -3139,6 +3140,46 @@ std::optional sstable::get_large_data_stat(large_data_ty return std::make_optional(); } +// The gc_before returned by the function can only be used to estimate if the +// sstable is worth dropping some tombstones. We only return the maximum +// gc_before for all the partitions that have record in repair history map. It +// is fine that some of the partitions inside the sstable does not have a +// record. +gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const { + auto s = get_schema(); + auto start = get_first_decorated_key().token(); + auto end = get_last_decorated_key().token(); + auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true)); + sstlog.trace("sstable={}, ks={}, cf={}, range={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range); + return ::get_gc_before_for_range(s, range, compaction_time).max_gc_before; +} + +// If the sstable contains any regular live cells, we can not drop the sstable. +// We do not even bother to query the gc_before. Return +// gc_clock::time_point::min() as gc_before. +// +// If the token range of the sstable contains tokens that do not have a record +// in the repair history map, we can not drop the sstable, in such case we +// return gc_clock::time_point::min() as gc_before. Otherwise, return the +// gc_before from the repair history map. +gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const { + auto deletion_time = get_max_local_deletion_time(); + auto s = get_schema(); + // No need to query gc_before for the sstable if the max_deletion_time is max() + if (deletion_time == gc_clock::time_point(gc_clock::duration(std::numeric_limits::max()))) { + sstlog.trace("sstable={}, ks={}, cf={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, shortcut", + get_filename(), s->ks_name(), s->cf_name(), deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count()); + return gc_clock::time_point::min(); + } + auto start = get_first_decorated_key().token(); + auto end = get_last_decorated_key().token(); + auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true)); + sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, query", + get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count()); + auto res = ::get_gc_before_for_range(s, range, compaction_time); + return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min(); +} + } namespace seastar { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 0de0557f56..26dd00c819 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -896,6 +896,8 @@ public: friend std::unique_ptr data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&); friend void lw_shared_ptr_deleter::dispose(sstable* s); + gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const; + gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const; }; // When we compact sstables, we have to atomically instantiate the new diff --git a/table.cc b/table.cc index fb199fe95d..5bb9bb0aaf 100644 --- a/table.cc +++ b/table.cc @@ -59,7 +59,6 @@ static logging::logger tlogger("table"); static seastar::metrics::label column_family_label("cf"); static seastar::metrics::label keyspace_label("ks"); - using namespace std::chrono_literals; flat_mutation_reader_v2 @@ -2408,8 +2407,8 @@ public: const sstables::sstable_set& get_sstable_set() const override { return _t.get_sstable_set(); } - std::unordered_set fully_expired_sstables(const std::vector& sstables) const override { - return sstables::get_fully_expired_sstables(*this, sstables, gc_clock::now() - schema()->gc_grace_seconds()); + std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override { + return sstables::get_fully_expired_sstables(*this, sstables, query_time); } const std::vector& compacted_undeleted_sstables() const noexcept override { return _t.compacted_undeleted_sstables(); diff --git a/test/boost/counter_test.cc b/test/boost/counter_test.cc index eef9317cb7..517ab33b40 100644 --- a/test/boost/counter_test.cc +++ b/test/boost/counter_test.cc @@ -266,7 +266,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) { m = m1; m.apply(m4); - m.partition().compact_for_query(*s, gc_clock::now(), { query::clustering_range::make_singular(ck) }, + m.partition().compact_for_query(*s, m.decorated_key(), gc_clock::now(), { query::clustering_range::make_singular(ck) }, false, false, query::max_rows); BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0); BOOST_REQUIRE(m.partition().static_row().empty()); diff --git a/test/boost/memtable_test.cc b/test/boost/memtable_test.cc index d3297a340e..3caa3fb922 100644 --- a/test/boost/memtable_test.cc +++ b/test/boost/memtable_test.cc @@ -158,7 +158,7 @@ SEASTAR_TEST_CASE(test_memtable_flush_reader) { const auto now = gc_clock::now(); auto compacted_muts = muts; for (auto& mut : compacted_muts) { - mut.partition().compact_for_compaction(*mut.schema(), always_gc, now); + mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now); } testlog.info("Simple read"); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 6a6853d525..e2c7f72773 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1029,7 +1029,7 @@ sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, s static mutation compacted(const mutation& m) { auto result = m; - result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now()); + result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now()); return result; } @@ -2710,7 +2710,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) { streamed_mutation::forwarding fwd_sm, mutation_reader::forwarding fwd_mr) mutable { auto source = mt->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); - auto mr = make_compacting_reader(std::move(source), query_time, [] (const dht::decorated_key&) { return api::min_timestamp; }); + auto mr = make_compacting_reader(std::move(source), query_time, + [] (const dht::decorated_key&) { return api::min_timestamp; }); if (single_fragment_buffer) { mr.set_max_buffer_size(1); } @@ -2761,7 +2762,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) { } auto mr = make_compacting_reader(make_flat_mutation_reader_from_fragments(ss.schema(), permit, std::move(mfs)), - gc_clock::now(), [] (const dht::decorated_key&) { return api::min_timestamp; }); + gc_clock::now(), + [] (const dht::decorated_key&) { return api::min_timestamp; }); mr.set_max_buffer_size(buffer_size); return mr; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 937abab644..2574ad8b91 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -457,7 +457,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_collection_allocation) { auto res_mut_opt = read_mutation_from_flat_mutation_reader(rd).get0(); BOOST_REQUIRE(res_mut_opt); - res_mut_opt->partition().compact_for_query(*schema, gc_clock::now(), {query::full_clustering_range}, true, false, + res_mut_opt->partition().compact_for_query(*schema, res_mut_opt->decorated_key(), gc_clock::now(), {query::full_clustering_range}, true, false, std::numeric_limits::max()); const auto stats_after = memory::stats(); @@ -1245,7 +1245,7 @@ SEASTAR_TEST_CASE(test_mutation_hash) { static mutation compacted(const mutation& m) { auto result = m; - result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now()); + result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now()); return result; } @@ -1638,7 +1638,7 @@ SEASTAR_TEST_CASE(test_tombstone_purge) { tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1)); m.partition().apply(tomb); BOOST_REQUIRE(!m.partition().empty()); - m.partition().compact_for_compaction(*s, always_gc, gc_clock::now()); + m.partition().compact_for_compaction(*s, always_gc, m.decorated_key(), gc_clock::now()); // Check that row was covered by tombstone. BOOST_REQUIRE(m.partition().empty()); // Check that tombstone was purged after compact_for_compaction(). @@ -1744,11 +1744,11 @@ SEASTAR_TEST_CASE(test_trim_rows) { auto compact_and_expect_empty = [&] (mutation m, std::vector ranges) { mutation m2 = m; - m.partition().compact_for_query(*s, now, ranges, false, false, query::max_rows); + m.partition().compact_for_query(*s, m.decorated_key(), now, ranges, false, false, query::max_rows); BOOST_REQUIRE(m.partition().clustered_rows().empty()); std::reverse(ranges.begin(), ranges.end()); - m2.partition().compact_for_query(*s, now, ranges, false, true, query::max_rows); + m2.partition().compact_for_query(*s, m2.decorated_key(), now, ranges, false, true, query::max_rows); BOOST_REQUIRE(m2.partition().clustered_rows().empty()); }; @@ -1830,8 +1830,8 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) { if (s != m2.schema()) { return; } - m1.partition().compact_for_compaction(*s, never_gc, now); - m2.partition().compact_for_compaction(*s, never_gc, now); + m1.partition().compact_for_compaction(*s, never_gc, m1.decorated_key(), now); + m2.partition().compact_for_compaction(*s, never_gc, m2.decorated_key(), now); auto m12 = m1; m12.apply(m2); auto m12_with_diff = m1; @@ -2949,6 +2949,7 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p auto get_max_purgeable = [] (const dht::decorated_key&) { return api::max_timestamp; }; + auto gc_grace_seconds = schema.gc_grace_seconds(); auto consumer = make_stable_flattened_mutations_consumer>( schema, query_time, diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index bc81d33c2e..30d26a589e 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -371,12 +371,12 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) { const auto now = gc_clock::now(); for (auto& m : muts) { - m.partition().compact_for_compaction(*random_schema.schema(), always_gc, now); + m.partition().compact_for_compaction(*random_schema.schema(), always_gc, m.decorated_key(), now); } std::vector combined_mutations; while (auto m = read_mutation_from_flat_mutation_reader(reader).get0()) { - m->partition().compact_for_compaction(*random_schema.schema(), always_gc, now); + m->partition().compact_for_compaction(*random_schema.schema(), always_gc, m->decorated_key(), now); combined_mutations.emplace_back(std::move(*m)); } diff --git a/test/boost/mvcc_test.cc b/test/boost/mvcc_test.cc index 2fcda87ed6..3937e205b8 100644 --- a/test/boost/mvcc_test.cc +++ b/test/boost/mvcc_test.cc @@ -695,8 +695,8 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) { // Drop empty rows can_gc_fn never_gc = [] (tombstone) { return false; }; - actual.compact_for_compaction(*s, never_gc, gc_clock::now()); - expected.compact_for_compaction(*s, never_gc, gc_clock::now()); + actual.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now()); + expected.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now()); assert_that(s, actual).is_equal_to(expected); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index e47c34980b..8254ae2967 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -150,8 +150,8 @@ public: const sstables::sstable_set& get_sstable_set() const override { return _t->get_sstable_set(); } - std::unordered_set fully_expired_sstables(const std::vector& sstables) const override { - return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, gc_clock::now() - schema()->gc_grace_seconds()); + std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override { + return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, query_time); } const std::vector& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted; @@ -1394,7 +1394,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) { auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t0, t1, std::numeric_limits::max())); auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits::max())); std::vector compacting = { sst1, sst2 }; - auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15)); + auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15) + cf->schema()->gc_grace_seconds()); BOOST_REQUIRE(expired.size() == 0); } @@ -1406,7 +1406,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) { auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t2, t3, std::numeric_limits::max())); auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits::max())); std::vector compacting = { sst1, sst2 }; - auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25)); + auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25) + cf->schema()->gc_grace_seconds()); BOOST_REQUIRE(expired.size() == 1); auto expired_sst = *expired.begin(); BOOST_REQUIRE(expired_sst->generation() == 1); @@ -3370,6 +3370,7 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { auto gc_now = gc_clock::now(); gc_before = gc_now - s->gc_grace_seconds(); + auto gc_grace_seconds = s->gc_grace_seconds(); auto cfc = make_stable_flattened_mutations_consumer>( *s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr)); diff --git a/test/lib/flat_mutation_reader_assertions.hh b/test/lib/flat_mutation_reader_assertions.hh index 043a45414a..1cfa893a2a 100644 --- a/test/lib/flat_mutation_reader_assertions.hh +++ b/test/lib/flat_mutation_reader_assertions.hh @@ -501,7 +501,7 @@ public: BOOST_REQUIRE(bool(mo)); memory::scoped_critical_alloc_section dfg; mutation got = *mo; - got.partition().compact_for_compaction(*m.schema(), always_gc, query_time); + got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time); assert_that(got).is_equal_to(m, ck_ranges); return *this; } @@ -912,7 +912,7 @@ public: BOOST_REQUIRE(bool(mo)); memory::scoped_critical_alloc_section dfg; mutation got = *mo; - got.partition().compact_for_compaction(*m.schema(), always_gc, query_time); + got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time); assert_that(got).is_equal_to(m, ck_ranges); return *this; } diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 364bf36d10..324cc67a03 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -945,7 +945,7 @@ void test_all_data_is_read_back(tests::reader_concurrency_semaphore_wrapper& sem for_each_mutation([&semaphore, &populate, query_time] (const mutation& m) mutable { auto ms = populate(m.schema(), {m}, query_time); mutation copy(m); - copy.partition().compact_for_compaction(*copy.schema(), always_gc, query_time); + copy.partition().compact_for_compaction(*copy.schema(), always_gc, copy.decorated_key(), query_time); assert_that(ms.make_reader(m.schema(), semaphore.make_permit())).produces_compacted(copy, query_time); }); } @@ -1623,7 +1623,7 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph const auto query_time = gc_clock::now(); mutation m_compacted(m); - m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, query_time); + m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, m_compacted.decorated_key(), query_time); { auto rd = ms.make_reader_v2(m.schema(), semaphore.make_permit()); diff --git a/tombstone_gc.cc b/tombstone_gc.cc new file mode 100644 index 0000000000..2e72483e2b --- /dev/null +++ b/tombstone_gc.cc @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include +#include +#include "schema.hh" +#include "dht/i_partitioner.hh" +#include "gc_clock.hh" +#include "tombstone_gc.hh" +#include "locator/token_metadata.hh" +#include "exceptions/exceptions.hh" +#include "locator/abstract_replication_strategy.hh" +#include "database.hh" +#include "gms/feature_service.hh" + +extern logging::logger dblog; + +class repair_history_map { +public: + boost::icl::interval_map map; +}; + +thread_local std::unordered_map> repair_history_maps; + +static seastar::lw_shared_ptr get_or_create_repair_history_map_for_table(const utils::UUID& id) { + auto it = repair_history_maps.find(id); + if (it != repair_history_maps.end()) { + return it->second; + } else { + repair_history_maps[id] = seastar::make_lw_shared(); + return repair_history_maps[id]; + } +} + +seastar::lw_shared_ptr get_repair_history_map_for_table(const utils::UUID& id) { + auto it = repair_history_maps.find(id); + if (it != repair_history_maps.end()) { + return it->second; + } else { + return {}; + } +} + +void drop_repair_history_map_for_table(const utils::UUID& id) { + repair_history_maps.erase(id); +} + +// This is useful for a sstable to query a gc_before for a range. The range is +// defined by the first and last key in the sstable. +// +// The min_gc_before and max_gc_before returned are the min and max gc_before for all the keys in the range. +// +// The knows_entire_range is set to true: +// 1) if the tombstone_gc_mode is not repair, since we have the same value for all the keys in the ranges. +// 2) if the tombstone_gc_mode is repair, and the range is a sub range of a range in the repair history map. +get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) { + bool knows_entire_range = true; + const auto& options = s->tombstone_gc_options(); + switch (options.mode()) { + case tombstone_gc_mode::timeout: { + dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=timeout", s->ks_name(), s->cf_name(), range); + auto gc_before = saturating_subtract(query_time, s->gc_grace_seconds()); + return {gc_before, gc_before, knows_entire_range}; + } + case tombstone_gc_mode::disabled: { + dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=disabled", s->ks_name(), s->cf_name(), range); + return {gc_clock::time_point::min(), gc_clock::time_point::min(), knows_entire_range}; + } + case tombstone_gc_mode::immediate: { + dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=immediate", s->ks_name(), s->cf_name(), range); + return {gc_clock::time_point::max(), gc_clock::time_point::max(), knows_entire_range}; + } + case tombstone_gc_mode::repair: { + const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); + auto min_gc_before = gc_clock::time_point::min(); + auto max_gc_before = gc_clock::time_point::min(); + auto min_repair_timestamp = gc_clock::time_point::min(); + auto max_repair_timestamp = gc_clock::time_point::min(); + int hits = 0; + knows_entire_range = false; + auto m = get_repair_history_map_for_table(s->id()); + if (m) { + auto interval = locator::token_metadata::range_to_interval(range); + auto min = gc_clock::time_point::max(); + auto max = gc_clock::time_point::min(); + bool contains_all = false; + for (auto& x : boost::make_iterator_range(m->map.equal_range(interval))) { + auto r = locator::token_metadata::interval_to_range(x.first); + min = std::min(x.second, min); + max = std::max(x.second, max); + if (++hits == 1 && r.contains(range, dht::tri_compare)) { + contains_all = true; + } + } + if (hits == 0) { + min_repair_timestamp = gc_clock::time_point::min(); + max_repair_timestamp = gc_clock::time_point::min(); + } else { + knows_entire_range = hits == 1 && contains_all; + min_repair_timestamp = min; + max_repair_timestamp = max; + } + min_gc_before = saturating_subtract(min_repair_timestamp, propagation_delay); + max_gc_before = saturating_subtract(max_repair_timestamp, propagation_delay); + }; + dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}", + s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range); + return {min_gc_before, max_gc_before, knows_entire_range}; + } + } +} + +gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) { + // if mode = timeout // default option, if user does not specify tombstone_gc options + // if mode = disabled // never gc tombstone + // if mode = immediate // can gc tombstone immediately + // if mode = repair // gc after repair + const auto& options = s->tombstone_gc_options(); + switch (options.mode()) { + case tombstone_gc_mode::timeout: + dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=timeout", s->ks_name(), s->cf_name(), dk); + return saturating_subtract(query_time, s->gc_grace_seconds()); + case tombstone_gc_mode::disabled: + dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=disabled", s->ks_name(), s->cf_name(), dk); + return gc_clock::time_point::min(); + case tombstone_gc_mode::immediate: + dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=immediate", s->ks_name(), s->cf_name(), dk); + return gc_clock::time_point::max(); + case tombstone_gc_mode::repair: + const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); + auto gc_before = gc_clock::time_point::min(); + auto repair_timestamp = gc_clock::time_point::min(); + auto m = get_repair_history_map_for_table(s->id()); + if (m) { + const auto it = m->map.find(dk.token()); + if (it == m->map.end()) { + gc_before = gc_clock::time_point::min(); + } else { + repair_timestamp = it->second; + gc_before = saturating_subtract(repair_timestamp, propagation_delay); + } + } + dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}", + s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before); + return gc_before; + } +} + +void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time) { + auto m = get_or_create_repair_history_map_for_table(s->id()); + m->map += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); +} + +static bool needs_repair_before_gc(const database& db, sstring ks_name) { + // If a table uses local replication strategy or rf one, there is no + // need to run repair even if tombstone_gc mode = repair. + auto& ks = db.find_keyspace(ks_name); + auto& rs = ks.get_replication_strategy(); + auto erm = ks.get_effective_replication_map(); + bool needs_repair = rs.get_type() != locator::replication_strategy_type::local + && erm->get_replication_factor() != 1; + return needs_repair; +} + +void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name) { + if (!options) { + return; + } + if (!db.features().cluster_supports_tombstone_gc_options()) { + throw exceptions::configuration_exception("tombstone_gc option not supported by the cluster"); + } + + if (options->mode() == tombstone_gc_mode::repair && !needs_repair_before_gc(db, ks_name)) { + throw exceptions::configuration_exception("tombstone_gc option with mode = repair not supported for table with RF one or local replication strategy"); + } +} diff --git a/tombstone_gc.hh b/tombstone_gc.hh new file mode 100644 index 0000000000..fe10fb450c --- /dev/null +++ b/tombstone_gc.hh @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include "gc_clock.hh" +#include "dht/token.hh" +#include "schema_fwd.hh" + +namespace dht { + +class decorated_key; + +using token_range = nonwrapping_range; + +} + +struct get_gc_before_for_range_result { + gc_clock::time_point min_gc_before; + gc_clock::time_point max_gc_before; + bool knows_entire_range; +}; + +void drop_repair_history_map_for_table(const utils::UUID& id); + +get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time); + +gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time); + +void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time); + +void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name); diff --git a/tombstone_gc_extension.hh b/tombstone_gc_extension.hh new file mode 100644 index 0000000000..60f70e2344 --- /dev/null +++ b/tombstone_gc_extension.hh @@ -0,0 +1,56 @@ +/* + * Copyright 2021-present ScyllaDB + */ +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include + +#include "bytes.hh" +#include "serializer.hh" +#include "db/extensions.hh" +#include "schema.hh" +#include "serializer_impl.hh" +#include "tombstone_gc_options.hh" + +class tombstone_gc_extension : public schema_extension { + tombstone_gc_options _tombstone_gc_options; +public: + static constexpr auto NAME = "tombstone_gc"; + + tombstone_gc_extension() = default; + tombstone_gc_extension(const tombstone_gc_options& opts) : _tombstone_gc_options(opts) {} + explicit tombstone_gc_extension(std::map tags) : _tombstone_gc_options(std::move(tags)) {} + explicit tombstone_gc_extension(const bytes& b) : _tombstone_gc_options(tombstone_gc_extension::deserialize(b)) {} + explicit tombstone_gc_extension(const seastar::sstring& s) { + throw std::logic_error("Cannot create tombstone_gc_extension info from string"); + } + bytes serialize() const override { + return ser::serialize_to_buffer(_tombstone_gc_options.to_map()); + } + static std::map deserialize(const bytes_view& buffer) { + return ser::deserialize_from_buffer(buffer, boost::type>()); + } + const tombstone_gc_options& get_options() const { + return _tombstone_gc_options; + } +}; + diff --git a/tombstone_gc_options.cc b/tombstone_gc_options.cc new file mode 100644 index 0000000000..2ef6787e07 --- /dev/null +++ b/tombstone_gc_options.cc @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + +#include "tombstone_gc_options.hh" +#include "exceptions/exceptions.hh" +#include +#include +#include +#include "utils/rjson.hh" + +tombstone_gc_options::tombstone_gc_options(const std::map& map) { + for (const auto& x : map) { + if (x.first == "mode") { + if (x.second == "disabled") { + _mode = tombstone_gc_mode::disabled; + } else if (x.second == "repair") { + _mode = tombstone_gc_mode::repair; + } else if (x.second == "timeout") { + _mode = tombstone_gc_mode::timeout; + } else if (x.second == "immediate") { + _mode = tombstone_gc_mode::immediate; + } else { + throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option mode: {}", x.second)); + } + } else if (x.first == "propagation_delay_in_seconds") { + try { + auto seconds = boost::lexical_cast(x.second); + if (seconds < 0) { + throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second)); + } + _propagation_delay_in_seconds = std::chrono::seconds(seconds); + } catch (...) { + throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second)); + } + } else { + throw exceptions::configuration_exception(format("Invalid tombstone_gc option: {}", x.first)); + } + } +} + +std::map tombstone_gc_options::to_map() const { + std::map res = { + {"mode", format("{}", _mode)}, + {"propagation_delay_in_seconds", format("{}", _propagation_delay_in_seconds.count())}, + }; + return res; +} + +seastar::sstring tombstone_gc_options::to_sstring() const { + return rjson::print(rjson::from_string_map(to_map())); +} + +bool +tombstone_gc_options::operator==(const tombstone_gc_options& other) const { + return _mode == other._mode && _propagation_delay_in_seconds == other._propagation_delay_in_seconds; +} + +bool +tombstone_gc_options::operator!=(const tombstone_gc_options& other) const { + return !(*this == other); +} + +std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& mode) { + switch (mode) { + case tombstone_gc_mode::timeout: return os << "timeout"; + case tombstone_gc_mode::disabled: return os << "disabled"; + case tombstone_gc_mode::immediate: return os << "immediate"; + case tombstone_gc_mode::repair: return os << "repair"; + } + return os << "unknown"; +} diff --git a/tombstone_gc_options.hh b/tombstone_gc_options.hh new file mode 100644 index 0000000000..f42d4d304a --- /dev/null +++ b/tombstone_gc_options.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include + +enum class tombstone_gc_mode : uint8_t { timeout, disabled, immediate, repair }; + +class tombstone_gc_options { +private: + tombstone_gc_mode _mode = tombstone_gc_mode::timeout; + std::chrono::seconds _propagation_delay_in_seconds = std::chrono::seconds(3600); +public: + tombstone_gc_options() = default; + const tombstone_gc_mode& mode() const { return _mode; } + explicit tombstone_gc_options(const std::map& map); + const std::chrono::seconds& propagation_delay_in_seconds() const { + return _propagation_delay_in_seconds; + } + std::map to_map() const; + seastar::sstring to_sstring() const; + bool operator==(const tombstone_gc_options& other) const; + bool operator!=(const tombstone_gc_options& other) const; +}; + +std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& m);