diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 09eefc69cd..ab8fd9df84 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -148,25 +148,6 @@ std::ostream& operator<<(std::ostream& os, compaction_type_options::scrub::quara return os << to_string(quarantine_mode); } -std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { - static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" }; - - unsigned exp = 0; - while ((data._size >= 1000) && (exp < sizeof(suffixes))) { - exp++; - data._size /= 1000; - } - - os << data._size << suffixes[exp]; - return os; -} - -std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) { - uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0; - os << pretty_printed_data_size(throughput) << "/s"; - return os; -} - static api::timestamp_type get_max_purgeable_timestamp(const table_state& table_s, sstable_set::incremental_selector& selector, const std::unordered_set& compacting_set, const dht::decorated_key& dk, uint64_t& bloom_filter_checks) { if (!table_s.tombstone_gc_enabled()) [[unlikely]] { @@ -806,8 +787,8 @@ protected: // By the time being, using estimated key count. log_info("{} {} sstables to {}. {} to {} (~{}% of original) in {}ms = {}. ~{} total partitions merged to {}.", report_finish_desc(), - _input_sstable_generations.size(), new_sstables_msg, pretty_printed_data_size(_start_size), pretty_printed_data_size(_end_size), int(ratio * 100), - std::chrono::duration_cast(duration).count(), pretty_printed_throughput(_end_size, duration), + _input_sstable_generations.size(), new_sstables_msg, utils::pretty_printed_data_size(_start_size), utils::pretty_printed_data_size(_end_size), int(ratio * 100), + std::chrono::duration_cast(duration).count(), utils::pretty_printed_throughput(_end_size, duration), _cdata.total_partitions, _cdata.total_keys_written); return ret; @@ -944,7 +925,7 @@ void compacted_fragments_writer::split_large_partition() { _c.log_debug("Closing active tombstone {} with {} for partition {}", _current_partition.current_emitted_tombstone, rtc, *_current_partition.dk); _compaction_writer->writer.consume(std::move(rtc)); } - _c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, pretty_printed_data_size(_c._max_sstable_size)); + _c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, utils::pretty_printed_data_size(_c._max_sstable_size)); // Close partition in current writer, and open it again in a new writer. do_consume_end_of_partition(); stop_current_writer(); diff --git a/compaction/compaction.hh b/compaction/compaction.hh index 4db5442216..652c70fb56 100644 --- a/compaction/compaction.hh +++ b/compaction/compaction.hh @@ -14,6 +14,7 @@ #include "gc_clock.hh" #include "compaction_weight_registration.hh" #include "utils/UUID.hh" +#include "utils/pretty_printers.hh" #include "table_state.hh" #include #include @@ -24,21 +25,6 @@ namespace sstables { bool is_eligible_for_compaction(const sstables::shared_sstable& sst) noexcept; -class pretty_printed_data_size { - uint64_t _size; -public: - pretty_printed_data_size(uint64_t size) : _size(size) {} - friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size); -}; - -class pretty_printed_throughput { - uint64_t _size; - std::chrono::duration _duration; -public: - pretty_printed_throughput(uint64_t size, std::chrono::duration dur) : _size(size), _duration(std::move(dur)) {} - friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput); -}; - // Return the name of the compaction type // as used over the REST api, e.g. "COMPACTION" or "CLEANUP". sstring compaction_name(compaction_type type); diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 84ca9e4635..d8988454ec 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -11,6 +11,7 @@ #include "replica/database.hh" #include "sstables/sstables.hh" #include "sstables/sstable_directory.hh" +#include "utils/pretty_printers.hh" namespace compaction { @@ -295,7 +296,7 @@ future<> table_reshaping_compaction_task_impl::run() { if (total_size > 0) { auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); - dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)); + dblog.info("Reshaped {} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), duration.count(), utils::pretty_printed_throughput(total_size, duration)); } } diff --git a/configure.py b/configure.py index f20f8f5f33..37a8704873 100755 --- a/configure.py +++ b/configure.py @@ -762,6 +762,7 @@ scylla_core = (['message/messaging_service.cc', 'utils/rjson.cc', 'utils/human_readable.cc', 'utils/histogram_metrics_helper.cc', + 'utils/pretty_printers.cc', 'converting_mutation_partition_applier.cc', 'readers/combined.cc', 'readers/multishard.cc', diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 4ba4c58fb0..bbb703200d 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -16,6 +16,7 @@ #include "sstables/progress_monitor.hh" #include "readers/evictable.hh" #include "dht/partition_filter.hh" +#include "utils/pretty_printers.hh" static logging::logger vug_logger("view_update_generator"); @@ -127,9 +128,9 @@ future<> view_update_generator::start() { auto& [t, sstables] = *table_it; schema_ptr s = t->schema(); - vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), sstables.size()); - const auto num_sstables = sstables.size(); + auto start_time = db_clock::now(); + uint64_t input_size = 0; try { // Exploit the fact that sstables in the staging directory @@ -138,8 +139,12 @@ future<> view_update_generator::start() { auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); for (auto& sst : sstables) { ssts->insert(sst); + input_size += sst->data_size(); } + vug_logger.info("Processing {}.{}: {} in {} sstables", + s->ks_name(), s->cf_name(), utils::pretty_printed_data_size(input_size), sstables.size()); + auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout, {}).get0(); auto ms = mutation_source([this, ssts] ( schema_ptr s, @@ -184,6 +189,12 @@ future<> view_update_generator::start() { vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception()); } _registration_sem.signal(num_sstables); + + auto end_time = db_clock::now(); + auto duration = std::chrono::duration(end_time - start_time); + vug_logger.info("Processed {}.{}: {} sstables in {}ms = {}", s->ks_name(), s->cf_name(), sstables.size(), + std::chrono::duration_cast(duration).count(), + utils::pretty_printed_throughput(input_size, duration)); } // For each table, move the processed staging sstables into the table's base dir. for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) { diff --git a/replica/database.hh b/replica/database.hh index d8f29a9ac4..4ee69903a0 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -590,7 +590,8 @@ private: const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const; lw_shared_ptr make_maintenance_sstable_set() const; lw_shared_ptr make_compound_sstable_set(); @@ -667,9 +668,8 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema, + flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema, reader_permit permit, - std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state = nullptr, @@ -706,7 +706,7 @@ public: sstables::shared_sstable make_streaming_staging_sstable(); mutation_source as_mutation_source() const; - mutation_source as_mutation_source_excluding(std::vector& sst) const; + mutation_source as_mutation_source_excluding_staging() const; void set_virtual_reader(mutation_source virtual_reader) { _virtual_reader = std::move(virtual_reader); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index af0eb012c1..14944e2f77 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -278,7 +278,7 @@ future<> run_resharding_jobs(sharded& dir, std::vec } auto start = std::chrono::steady_clock::now(); - dblog.info("Resharding {} for {}.{}", sstables::pretty_printed_data_size(total_size), ks_name, table_name); + dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name); co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> { auto& table = db.local().find_column_family(ks_name, table_name); @@ -293,7 +293,7 @@ future<> run_resharding_jobs(sharded& dir, std::vec })); auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); - dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), sstables::pretty_printed_throughput(total_size, duration)); + dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration)); } // Global resharding function. Done in two parts: diff --git a/replica/table.cc b/replica/table.cc index be7d435b9e..cd793c75b3 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -97,7 +97,8 @@ table::make_sstable_reader(schema_ptr s, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& predicate) const { // CAVEAT: if make_sstable_reader() is called on a single partition // we want to optimize and read exactly this partition. As a // consequence, fast_forward_to() will *NOT* work on the result, @@ -109,10 +110,10 @@ table::make_sstable_reader(schema_ptr s, } return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr); + _stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate); } else { return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, - std::move(trace_state), fwd, fwd_mr); + std::move(trace_state), fwd, fwd_mr, default_read_monitor_generator(), predicate); } } @@ -2551,9 +2552,8 @@ void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept { } flat_mutation_reader_v2 -table::make_reader_v2_excluding_sstables(schema_ptr s, +table::make_reader_v2_excluding_staging(schema_ptr s, reader_permit permit, - std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, @@ -2565,16 +2565,11 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, readers.reserve(memtable_count + 1); }); - auto excluded_ssts = boost::copy_range>(excluded); - auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema)); - _sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable { - if (excluded_ssts.contains(sst)) { - return; - } - effective_sstables->insert(sst); - }); + static std::predicate auto excl_staging_predicate = [] (const sstable& sst) { + return !sst.requires_view_building(); + }; - readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate)); return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } @@ -2714,22 +2709,22 @@ table::stream_view_replica_updates(shared_ptr g s, std::move(m), timeout, - as_mutation_source_excluding(excluded_sstables), + as_mutation_source_excluding_staging(), tracing::trace_state_ptr(), *_config.streaming_read_concurrency_semaphore, query::partition_slice::option_set::of()); } mutation_source -table::as_mutation_source_excluding(std::vector& ssts) const { - return mutation_source([this, &ssts] (schema_ptr s, +table::as_mutation_source_excluding_staging() const { + return mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 2b07bacb71..bacd457ae7 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -830,10 +830,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) { }; } -// Filter out sstables for reader using bloom filter +const sstable_predicate& default_sstable_predicate() { + static const sstable_predicate predicate = [] (const sstable&) { return true; }; + return predicate; +} + +static std::predicate auto +make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) { + return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) { + return predicate(sst) && pk_filter(sst); + }; +} + +// Filter out sstables for reader using bloom filter and supplied predicate static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { - auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; +filter_sstable_for_reader(std::vector&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) { + auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); }; sstables.erase(boost::remove_if(sstables, filter), sstables.end()); return std::move(sstables); } @@ -887,10 +899,11 @@ sstable_set_impl::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); - auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos); + auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader_v2(schema, permit); @@ -929,7 +942,8 @@ time_series_sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); // First check if the optimized algorithm for TWCS single partition queries can be applied. // Multiple conditions must be satisfied: @@ -951,11 +965,11 @@ time_series_sstable_set::create_single_key_sstable_reader( // Some of the conditions were not satisfied so we use the standard query path. return sstable_set_impl::create_single_key_sstable_reader( cf, std::move(schema), std::move(permit), sstable_histogram, - pr, slice, std::move(trace_state), fwd_sm, fwd_mr); + pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate); } - auto pk_filter = make_pk_filter(pos, *schema); - auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); }); + auto sst_filter = make_sstable_filter(pos, *schema, predicate); + auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); }); if (it == _sstables->end()) { // No sstables contain data for the queried partition. return make_empty_flat_reader_v2(std::move(schema), std::move(permit)); @@ -968,6 +982,7 @@ time_series_sstable_set::create_single_key_sstable_reader( return sst.make_reader(schema, permit, pr, slice, trace_state, fwd_sm); }; + auto pk_filter = make_pk_filter(pos, *schema); auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); }; // We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that @@ -1168,7 +1183,8 @@ compound_sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { auto sets = _sets; auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; }); auto non_empty_set_count = std::distance(sets.begin(), it); @@ -1179,13 +1195,13 @@ compound_sstable_set::create_single_key_sstable_reader( // optimize for common case where only 1 set is populated, avoiding the expensive combined reader if (non_empty_set_count == 1) { const auto& non_empty_set = *std::begin(sets); - return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate); } auto readers = boost::copy_range>( boost::make_iterator_range(sets.begin(), it) | boost::adaptors::transformed([&] (const lw_shared_ptr& non_empty_set) { - return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate); }) ); return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr); @@ -1201,10 +1217,11 @@ sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { assert(pr.is_singular() && pr.start()->value().has_key()); return _impl->create_single_key_sstable_reader(cf, std::move(schema), - std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr); + std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate); } flat_mutation_reader_v2 @@ -1240,11 +1257,15 @@ sstable_set::make_local_shard_sstable_reader( tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const + read_monitor_generator& monitor_generator, + const sstable_predicate& predicate) const { - auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate] (shared_sstable& sst, const dht::partition_range& pr) mutable { assert(!sst->is_shared()); + if (!predicate(*sst)) { + return make_empty_flat_reader_v2(s, permit); + } return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst)); }; if (_impl->size() == 1) [[unlikely]] { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 33b23319db..e3176fba28 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -55,6 +55,10 @@ public: virtual std::tuple, dht::ring_position_ext> select(const dht::ring_position_view&) = 0; }; +using sstable_predicate = noncopyable_function; +// Default predicate includes everything +const sstable_predicate& default_sstable_predicate(); + class sstable_set_impl { public: virtual ~sstable_set_impl() {} @@ -78,7 +82,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate&) const; }; class sstable_set : public enable_lw_shared_from_this { @@ -167,7 +172,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate& p = default_sstable_predicate()) const; /// Read a range from the sstable set. /// @@ -192,7 +198,8 @@ public: tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; + read_monitor_generator& rmg = default_read_monitor_generator(), + const sstable_predicate& p = default_sstable_predicate()) const; flat_mutation_reader_v2 make_crawling_reader( schema_ptr, diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index e236567c78..b52f285f8d 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -114,7 +114,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; friend class sstable_position_reader_queue; }; @@ -147,7 +148,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; class incremental_selector; }; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 5f06243bac..cfae56cf31 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4518,7 +4518,7 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) { auto backlog_before = t.as_table_state().get_backlog_tracker().backlog(); t->add_sstable_and_update_cache(sst).get(); testlog.debug("\tNew sstable of size={} level={}; Backlog diff={};", - sstables::pretty_printed_data_size(data_size), level, + utils::pretty_printed_data_size(data_size), level, t.as_table_state().get_backlog_tracker().backlog() - backlog_before); }; @@ -4556,7 +4556,7 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) { for (auto target_table_count : target_table_count_s) { const uint64_t per_table_max_disk_usage = std::ceil(all_tables_disk_usage / target_table_count); - testlog.info("Creating tables, with max size={}", sstables::pretty_printed_data_size(per_table_max_disk_usage)); + testlog.info("Creating tables, with max size={}", utils::pretty_printed_data_size(per_table_max_disk_usage)); std::vector tables; uint64_t tables_total_size = 0; @@ -4577,18 +4577,18 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) { } auto table_size = t->get_stats().live_disk_space_used; - testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, sstables::pretty_printed_data_size(table_size)); + testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, utils::pretty_printed_data_size(table_size)); tables.push_back(t); tables_total_size += table_size; } - testlog.debug("Created {} tables, with total size={}", tables.size(), sstables::pretty_printed_data_size(tables_total_size)); + testlog.debug("Created {} tables, with total size={}", tables.size(), utils::pretty_printed_data_size(tables_total_size)); results.push_back(result{ tables.size(), per_table_max_disk_usage, normalize_backlog(manager.backlog()) }); for (auto& t : tables) { t.stop().get(); } } for (auto& r : results) { - testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, sstables::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog); + testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, utils::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog); // Expect 0 backlog as tiers are all perfectly compacted // With LCS, the size of levels *set up by the test* can slightly exceed their target size, // so let's account for the microscopical amount of backlog returned. diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index f96ee272fa..81ae7928f9 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -3108,3 +3108,87 @@ SEASTAR_TEST_CASE(test_sstable_bytes_on_disk_correctness) { SEASTAR_TEST_CASE(test_sstable_bytes_on_s3_correctness) { return test_sstable_bytes_correctness(get_name() + "_s3", test_env_config{ .storage = make_test_object_storage_options() }); } + +SEASTAR_TEST_CASE(test_sstable_set_predicate) { + return test_env::do_with_async([] (test_env& env) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + auto s = random_schema.schema(); + + testlog.info("Random schema:\n{}", random_schema.cql()); + + const auto muts = tests::generate_random_mutations(random_schema, 20).get(); + + auto sst = make_sstable_containing(env.make_sstable(s), muts); + + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + sstable_set set = cs.make_sstable_set(s); + set.insert(sst); + + auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key()); + + auto make_point_query_reader = [&] (std::predicate auto& pred) { + auto t = env.make_table_for_tests(s); + auto close_t = deferred_stop(t); + utils::estimated_histogram eh; + return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh, + first_key_pr, + s->full_slice(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + pred); + }; + + auto make_full_scan_reader = [&] (std::predicate auto& pred) { + return set.make_local_shard_sstable_reader(s, env.make_reader_permit(), + query::full_partition_range, + s->full_slice(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator(), + pred); + }; + + auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) { + auto close_mr = deferred_close(sst_mr); + auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0(); + + if (expect_eos) { + BOOST_REQUIRE(sst_mr.is_buffer_empty()); + BOOST_REQUIRE(sst_mr.is_end_of_stream()); + BOOST_REQUIRE(!sst_mut); + } else { + BOOST_REQUIRE(sst_mut); + } + }; + + { + static std::predicate auto excluding_pred = [] (const sstable&) { + return false; + }; + + testlog.info("excluding_pred: point query"); + verify_reader_result(make_point_query_reader(excluding_pred), true); + testlog.info("excluding_pred: range query"); + verify_reader_result(make_full_scan_reader(excluding_pred), true); + } + + { + static std::predicate auto inclusive_pred = [] (const sstable&) { + return true; + }; + + testlog.info("inclusive_pred: point query"); + verify_reader_result(make_point_query_reader(inclusive_pred), false); + testlog.info("inclusive_pred: range query"); + verify_reader_result(make_full_scan_reader(inclusive_pred), false); + } + }); +} diff --git a/utils/pretty_printers.cc b/utils/pretty_printers.cc new file mode 100644 index 0000000000..c4d51ea317 --- /dev/null +++ b/utils/pretty_printers.cc @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "pretty_printers.hh" + +namespace utils { + +std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { + static constexpr const char *suffixes[] = {" bytes", "kB", "MB", "GB", "TB", "PB"}; + + unsigned exp = 0; + while ((data._size >= 1000) && (exp < sizeof(suffixes))) { + exp++; + data._size /= 1000; + } + + os << data._size << suffixes[exp]; + return os; +} + +std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) { + uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0; + os << pretty_printed_data_size(throughput) << "/s"; + return os; +} + +} diff --git a/utils/pretty_printers.hh b/utils/pretty_printers.hh new file mode 100644 index 0000000000..e90e1c5164 --- /dev/null +++ b/utils/pretty_printers.hh @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include +#include + +namespace utils { + +class pretty_printed_data_size { + uint64_t _size; +public: + pretty_printed_data_size(uint64_t size) : _size(size) {} + + friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size); +}; + +class pretty_printed_throughput { + uint64_t _size; + std::chrono::duration _duration; +public: + pretty_printed_throughput(uint64_t size, std::chrono::duration dur) : _size(size), _duration(std::move(dur)) {} + + friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput); +}; + +}