treewide: use make_tracking_only_permit()
For all those reads that don't (won't or can't) pass through admission currently.
This commit is contained in:
@@ -539,7 +539,7 @@ protected:
|
||||
: _cf(cf)
|
||||
, _sstable_creator(std::move(descriptor.creator))
|
||||
, _schema(cf.schema())
|
||||
, _permit(_cf.compaction_concurrency_semaphore().make_permit(_cf.schema().get(), "compaction"))
|
||||
, _permit(_cf.compaction_concurrency_semaphore().make_tracking_only_permit(_cf.schema().get(), "compaction"))
|
||||
, _sstables(std::move(descriptor.sstables))
|
||||
, _max_sstable_size(descriptor.max_sstable_bytes)
|
||||
, _sstable_level(descriptor.level)
|
||||
@@ -1704,7 +1704,7 @@ static future<compaction_info> validate_sstables(sstables::compaction_descriptor
|
||||
|
||||
clogger.info("Validating {}", sstables_list_msg);
|
||||
|
||||
auto permit = cf.compaction_concurrency_semaphore().make_permit(schema.get(), "Validation");
|
||||
auto permit = cf.compaction_concurrency_semaphore().make_tracking_only_permit(schema.get(), "Validation");
|
||||
auto reader = sstables->make_local_shard_sstable_reader(schema, permit, query::full_partition_range, schema->full_slice(), descriptor.io_priority,
|
||||
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no, default_read_monitor_generator());
|
||||
|
||||
|
||||
@@ -1658,7 +1658,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
auto permit = get_reader_concurrency_semaphore().make_permit(m_schema.get(), "counter-read-before-write");
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state, timeout)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
|
||||
@@ -1388,7 +1388,7 @@ view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_di
|
||||
: _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _mnotifier(mn)
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_permit(nullptr, "view_builder")) {
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder")) {
|
||||
setup_metrics();
|
||||
}
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state)
|
||||
: _db(db)
|
||||
, _schema(std::move(s))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_permit(_schema.get(), "multishard-mutation-query"))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query"))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state))
|
||||
|
||||
@@ -124,7 +124,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
reader = make_foreign(std::make_unique<flat_mutation_reader>(std::move(reader)))] () mutable {
|
||||
auto s = gs.get();
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer");
|
||||
auto permit = semaphore->make_permit(s.get(), "multishard-writer");
|
||||
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer");
|
||||
auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader));
|
||||
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer));
|
||||
}).then([this, shard] (foreign_ptr<std::unique_ptr<shard_writer>> writer) {
|
||||
|
||||
@@ -757,7 +757,7 @@ public:
|
||||
, _enc_stats(enc_stats)
|
||||
, _shard(shard)
|
||||
, _semaphore(reader_concurrency_semaphore::no_limits{}, "mx writer")
|
||||
, _range_tombstones(range_tombstone_stream(_schema, _semaphore.make_permit(&s, "mx-writer")))
|
||||
, _range_tombstones(range_tombstone_stream(_schema, _semaphore.make_tracking_only_permit(&s, "mx-writer")))
|
||||
, _tmp_bufs(_sst.sstable_buffer_size)
|
||||
, _sst_schema(make_sstable_schema(s, _enc_stats, _cfg))
|
||||
, _run_identifier(cfg.run_identifier)
|
||||
|
||||
@@ -1790,7 +1790,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
|
||||
auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio());
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
sem.make_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size,
|
||||
sem.make_tracking_only_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size,
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}));
|
||||
@@ -2701,7 +2701,7 @@ future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::
|
||||
std::exception_ptr ex;
|
||||
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()");
|
||||
try {
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_permit(_schema.get(), s->get_filename()), default_priority_class(), tracing::trace_state_ptr());
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename()), default_priority_class(), tracing::trace_state_ptr());
|
||||
present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
|
||||
8
table.cc
8
table.cc
@@ -644,7 +644,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
|
||||
auto f = consumer(old->make_flush_reader(
|
||||
old->schema(),
|
||||
compaction_concurrency_semaphore().make_permit(old->schema().get(), "try_flush_memtable_to_sstable()"),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()"),
|
||||
service::get_local_memtable_flush_priority()));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
@@ -1940,7 +1940,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::
|
||||
std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable"),
|
||||
cfg,
|
||||
[&mt, sst] (auto& monitor, auto& semaphore, auto& cfg) {
|
||||
return write_memtable_to_sstable(semaphore->make_permit(mt.schema().get(), "mt_to_sst"), mt, std::move(sst), monitor, cfg)
|
||||
return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst"), mt, std::move(sst), monitor, cfg)
|
||||
.finally([&semaphore] {
|
||||
return semaphore->stop();
|
||||
});
|
||||
@@ -2272,7 +2272,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
|
||||
auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
|
||||
if (cr_ranges.empty()) {
|
||||
tracing::trace(tr_state, "View updates do not require read-before-write");
|
||||
co_await generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now);
|
||||
co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now);
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
co_return row_locker::lock_holder();
|
||||
@@ -2297,7 +2297,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
|
||||
co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout);
|
||||
auto lock = co_await std::move(lockf);
|
||||
auto pk = dht::partition_range::make_singular(m.decorated_key());
|
||||
auto permit = sem.make_permit(base.get(), "push-view-updates-2");
|
||||
auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2");
|
||||
auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
|
||||
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
|
||||
|
||||
@@ -177,7 +177,7 @@ Note: UDT is not supported for now.
|
||||
sst->load().get();
|
||||
|
||||
{
|
||||
sstables::index_reader idx_reader(sst, rcs_sem.make_permit(primary_key_schema.get(), "idx"), default_priority_class(), {});
|
||||
sstables::index_reader idx_reader(sst, rcs_sem.make_tracking_only_permit(primary_key_schema.get(), "idx"), default_priority_class(), {});
|
||||
|
||||
list_partitions(*primary_key_schema, idx_reader);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user