database: use valid permit for counter read-before-write
Counter writes involve a read-before-write, which will soon require a valid permit to be passed to it, so make sure we create and pass a valid permit to this read. We use `database::make_query_class_config()` to obtain the semaphore for the read which selects the appropriate user/system semaphore based on the scheduling group the counter write is running in.
This commit is contained in:
@@ -1341,7 +1341,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state)
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), make_query_class_config().semaphore.make_permit(), m.decorated_key(), slice, trace_state)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
// cells we can look for our shard in each of them, increment
|
||||
|
||||
@@ -2501,7 +2501,7 @@ mutation_partition::fully_discontinuous(const schema& s, const position_range& r
|
||||
return check_continuity(s, r, is_continuous::no);
|
||||
}
|
||||
|
||||
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source,
|
||||
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source, reader_permit permit,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
@@ -2513,19 +2513,19 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
range_and_reader(range_and_reader&&) = delete;
|
||||
range_and_reader(const range_and_reader&) = delete;
|
||||
|
||||
range_and_reader(schema_ptr s, const mutation_source& source,
|
||||
range_and_reader(schema_ptr s, const mutation_source& source, reader_permit permit,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
: range(dht::partition_range::make_singular(dk))
|
||||
, reader(source.make_reader(s, no_reader_permit(), range, slice, service::get_local_sstable_query_read_priority(),
|
||||
, reader(source.make_reader(s, std::move(permit), range, slice, service::get_local_sstable_query_read_priority(),
|
||||
std::move(trace_ptr), streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding::no))
|
||||
{ }
|
||||
};
|
||||
|
||||
// do_with() doesn't support immovable objects
|
||||
auto r_a_r = std::make_unique<range_and_reader>(s, source, dk, slice, std::move(trace_ptr));
|
||||
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
|
||||
|
||||
@@ -204,7 +204,7 @@ public:
|
||||
};
|
||||
|
||||
// Performs a query for counter updates.
|
||||
future<mutation_opt> counter_write_query(schema_ptr, const mutation_source&,
|
||||
future<mutation_opt> counter_write_query(schema_ptr, const mutation_source&, reader_permit permit,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
Reference in New Issue
Block a user