add a priority class to mutation readers
SSTables already have a priority argument wired to their read path. However, most of our reads do not call that interface directly, but employ the services of a mutation reader instead. Some of those readers will be used to read through a mutation_source, and those have to patched as well. Right now, whenever we need to pass a class, we pass Seastar's default priority class. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
37
database.cc
37
database.cc
@@ -128,8 +128,8 @@ column_family::make_partition_presence_checker(lw_shared_ptr<sstable_list> old_s
|
||||
|
||||
mutation_source
|
||||
column_family::sstables_as_mutation_source() {
|
||||
return mutation_source([this] (schema_ptr s, const query::partition_range& r) {
|
||||
return make_sstable_reader(std::move(s), r);
|
||||
return mutation_source([this] (schema_ptr s, const query::partition_range& r, const io_priority_class& pc) {
|
||||
return make_sstable_reader(std::move(s), r, pc);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -155,10 +155,14 @@ class range_sstable_reader final : public mutation_reader::impl {
|
||||
const query::partition_range& _pr;
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
mutation_reader _reader;
|
||||
// Use a pointer instead of copying, so we don't need to regenerate the reader if
|
||||
// the priority changes.
|
||||
const io_priority_class* _pc;
|
||||
public:
|
||||
range_sstable_reader(schema_ptr s, lw_shared_ptr<sstable_list> sstables, const query::partition_range& pr)
|
||||
range_sstable_reader(schema_ptr s, lw_shared_ptr<sstable_list> sstables, const query::partition_range& pr, const io_priority_class& pc)
|
||||
: _pr(pr)
|
||||
, _sstables(std::move(sstables))
|
||||
, _pc(&pc)
|
||||
{
|
||||
std::vector<mutation_reader> readers;
|
||||
for (const lw_shared_ptr<sstables::sstable>& sst : *_sstables | boost::adaptors::map_values) {
|
||||
@@ -185,11 +189,15 @@ class single_key_sstable_reader final : public mutation_reader::impl {
|
||||
mutation_opt _m;
|
||||
bool _done = false;
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
// Use a pointer instead of copying, so we don't need to regenerate the reader if
|
||||
// the priority changes.
|
||||
const io_priority_class* _pc;
|
||||
public:
|
||||
single_key_sstable_reader(schema_ptr schema, lw_shared_ptr<sstable_list> sstables, const partition_key& key)
|
||||
single_key_sstable_reader(schema_ptr schema, lw_shared_ptr<sstable_list> sstables, const partition_key& key, const io_priority_class& pc)
|
||||
: _schema(std::move(schema))
|
||||
, _key(sstables::key::from_partition_key(*_schema, key))
|
||||
, _sstables(std::move(sstables))
|
||||
, _pc(&pc)
|
||||
{ }
|
||||
|
||||
virtual future<mutation_opt> operator()() override {
|
||||
@@ -208,26 +216,26 @@ public:
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr) const {
|
||||
column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr, const io_priority_class& pc) const {
|
||||
if (pr.is_singular() && pr.start()->value().has_key()) {
|
||||
const dht::ring_position& pos = pr.start()->value();
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
}
|
||||
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key());
|
||||
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), pc);
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr);
|
||||
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, pc);
|
||||
}
|
||||
}
|
||||
|
||||
key_source column_family::sstables_as_key_source() const {
|
||||
return key_source([this] (const query::partition_range& range) {
|
||||
return key_source([this] (const query::partition_range& range, const io_priority_class& pc) {
|
||||
std::vector<key_reader> readers;
|
||||
readers.reserve(_sstables->size());
|
||||
std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& entry) {
|
||||
auto& sst = entry.second;
|
||||
auto rd = sstables::make_key_reader(_schema, sst, range);
|
||||
auto rd = sstables::make_key_reader(_schema, sst, range, pc);
|
||||
if (sst->is_shared()) {
|
||||
rd = make_filtering_reader(std::move(rd), [] (const dht::decorated_key& dk) {
|
||||
return dht::shard_of(dk.token()) == engine().cpu_id();
|
||||
@@ -276,7 +284,7 @@ column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, c
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
column_family::make_reader(schema_ptr s, const query::partition_range& range) const {
|
||||
column_family::make_reader(schema_ptr s, const query::partition_range& range, const io_priority_class& pc) const {
|
||||
if (query::is_wrap_around(range, *s)) {
|
||||
// make_combined_reader() can't handle streams that wrap around yet.
|
||||
fail(unimplemented::cause::WRAP_AROUND);
|
||||
@@ -310,14 +318,15 @@ column_family::make_reader(schema_ptr s, const query::partition_range& range) co
|
||||
}
|
||||
|
||||
if (_config.enable_cache) {
|
||||
readers.emplace_back(_cache.make_reader(s, range));
|
||||
readers.emplace_back(_cache.make_reader(s, range, pc));
|
||||
} else {
|
||||
readers.emplace_back(make_sstable_reader(s, range));
|
||||
readers.emplace_back(make_sstable_reader(s, range, pc));
|
||||
}
|
||||
|
||||
return make_combined_reader(std::move(readers));
|
||||
}
|
||||
|
||||
// Not performance critical. Currently used for testing only.
|
||||
template <typename Func>
|
||||
future<bool>
|
||||
column_family::for_all_partitions(schema_ptr s, Func&& func) const {
|
||||
@@ -1563,8 +1572,8 @@ column_family::query(schema_ptr s, const query::read_command& cmd, const std::ve
|
||||
|
||||
mutation_source
|
||||
column_family::as_mutation_source() const {
|
||||
return mutation_source([this] (schema_ptr s, const query::partition_range& range) {
|
||||
return this->make_reader(std::move(s), range);
|
||||
return mutation_source([this] (schema_ptr s, const query::partition_range& range, const io_priority_class& pc) {
|
||||
return this->make_reader(std::move(s), range, pc);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -201,7 +201,7 @@ private:
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
// The 'range' parameter must be live as long as the reader is used.
|
||||
// Mutations returned by the reader will all have given schema.
|
||||
mutation_reader make_sstable_reader(schema_ptr schema, const query::partition_range& range) const;
|
||||
mutation_reader make_sstable_reader(schema_ptr schema, const query::partition_range& range, const io_priority_class& pc) const;
|
||||
|
||||
mutation_source sstables_as_mutation_source();
|
||||
key_source sstables_as_key_source() const;
|
||||
@@ -213,7 +213,11 @@ public:
|
||||
// Note: for data queries use query() instead.
|
||||
// The 'range' parameter must be live as long as the reader is used.
|
||||
// Mutations returned by the reader will all have given schema.
|
||||
mutation_reader make_reader(schema_ptr schema, const query::partition_range& range = query::full_partition_range) const;
|
||||
// If I/O needs to be issued to read anything in the specified range, the operations
|
||||
// will be scheduled under the priority class given by pc.
|
||||
mutation_reader make_reader(schema_ptr schema,
|
||||
const query::partition_range& range = query::full_partition_range,
|
||||
const io_priority_class& pc = default_priority_class()) const;
|
||||
|
||||
mutation_source as_mutation_source() const;
|
||||
|
||||
|
||||
@@ -83,10 +83,17 @@ key_reader make_filtering_reader(key_reader&& reader, Filter&& filter) {
|
||||
}
|
||||
|
||||
class key_source {
|
||||
std::function<key_reader(const query::partition_range& range)> _fn;
|
||||
std::function<key_reader(const query::partition_range& range, const io_priority_class& pc)> _fn;
|
||||
public:
|
||||
key_source(std::function<key_reader(const query::partition_range& range)> fn) : _fn(std::move(fn)) {}
|
||||
key_source(std::function<key_reader(const query::partition_range& range, const io_priority_class& pc)> fn) : _fn(std::move(fn)) {}
|
||||
key_source(std::function<key_reader(const query::partition_range& range)> fn)
|
||||
: _fn([fn = std::move(fn)](const query::partition_range& range, const io_priority_class& pc) {
|
||||
return fn(range);
|
||||
}) {}
|
||||
key_reader operator()(const query::partition_range& range, const io_priority_class& pc) {
|
||||
return _fn(range, pc);
|
||||
}
|
||||
key_reader operator()(const query::partition_range& range) {
|
||||
return _fn(range);
|
||||
return _fn(range, default_priority_class());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -153,11 +153,19 @@ future<> consume(mutation_reader& reader, Consumer consumer) {
|
||||
// The reader returns mutations having all the same schema, the one passed
|
||||
// when invoking the source.
|
||||
class mutation_source {
|
||||
std::function<mutation_reader(schema_ptr, const query::partition_range& range)> _fn;
|
||||
std::function<mutation_reader(schema_ptr, const query::partition_range& range, const io_priority_class& pc)> _fn;
|
||||
public:
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, const query::partition_range& range)> fn) : _fn(std::move(fn)) {}
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, const query::partition_range& range, const io_priority_class& pc)> fn) : _fn(std::move(fn)) {}
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, const query::partition_range& range)> fn)
|
||||
: _fn([fn = std::move(fn)] (schema_ptr s, const query::partition_range& range, const io_priority_class& pc) {
|
||||
return fn(s, range);
|
||||
}) {}
|
||||
|
||||
mutation_reader operator()(schema_ptr s, const query::partition_range& range, const io_priority_class& pc) const {
|
||||
return _fn(std::move(s), range, pc);
|
||||
}
|
||||
mutation_reader operator()(schema_ptr s, const query::partition_range& range) const {
|
||||
return _fn(std::move(s), range);
|
||||
return _fn(std::move(s), range, default_priority_class());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
24
row_cache.cc
24
row_cache.cc
@@ -264,12 +264,14 @@ class scanning_and_populating_reader final : public mutation_reader::impl {
|
||||
key_reader _keys;
|
||||
dht::decorated_key_opt _next_key;
|
||||
dht::decorated_key_opt _last_secondary_key;
|
||||
const io_priority_class _pc;
|
||||
public:
|
||||
scanning_and_populating_reader(schema_ptr s, row_cache& cache, const query::partition_range& range)
|
||||
scanning_and_populating_reader(schema_ptr s, row_cache& cache, const query::partition_range& range, const io_priority_class& pc)
|
||||
: _cache(cache), _schema(s),
|
||||
_primary(make_mutation_reader<just_cache_scanning_reader>(s, cache, range)),
|
||||
_underlying(cache._underlying), _original_range(range), _underlying_keys(cache._underlying_keys),
|
||||
_keys(_underlying_keys(range))
|
||||
_keys(_underlying_keys(range, pc)),
|
||||
_pc(pc)
|
||||
{ }
|
||||
virtual future<mutation_opt> operator()() override {
|
||||
// FIXME: store in cache information whether the immediate successor
|
||||
@@ -304,7 +306,7 @@ public:
|
||||
_range = query::partition_range(query::partition_range::bound { std::move(*dk), true }, std::move(end));
|
||||
_last_secondary_key = {};
|
||||
_secondary_phase = _cache._populate_phaser.phase();
|
||||
_secondary = _underlying(_cache._schema, _range);
|
||||
_secondary = _underlying(_cache._schema, _range, _pc);
|
||||
_secondary_only = true;
|
||||
return next_secondary();
|
||||
});
|
||||
@@ -317,13 +319,13 @@ private:
|
||||
auto cmp = dht::ring_position_comparator(*_schema);
|
||||
_range = _range.split_after(*_last_secondary_key, cmp);
|
||||
_secondary_phase = _cache._populate_phaser.phase();
|
||||
_secondary = _underlying(_cache._schema, _range);
|
||||
_secondary = _underlying(_cache._schema, _range, _pc);
|
||||
}
|
||||
return _secondary().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) {
|
||||
if (!mo && _next_primary) {
|
||||
auto cmp = dht::ring_position_comparator(*_schema);
|
||||
_range = _original_range.split_after(_next_primary->decorated_key(), cmp);
|
||||
_keys = _underlying_keys(_range);
|
||||
_keys = _underlying_keys(_range, _pc);
|
||||
_secondary_only = false;
|
||||
_cache.on_hit();
|
||||
return std::move(_next_primary);
|
||||
@@ -346,21 +348,21 @@ private:
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
row_cache::make_scanning_reader(schema_ptr s, const query::partition_range& range) {
|
||||
row_cache::make_scanning_reader(schema_ptr s, const query::partition_range& range, const io_priority_class& pc) {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*s))) {
|
||||
warn(unimplemented::cause::WRAP_AROUND);
|
||||
throw std::runtime_error("row_cache doesn't support wrap-around ranges");
|
||||
}
|
||||
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range);
|
||||
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range, pc);
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
row_cache::make_reader(schema_ptr s, const query::partition_range& range) {
|
||||
row_cache::make_reader(schema_ptr s, const query::partition_range& range, const io_priority_class& pc) {
|
||||
if (range.is_singular()) {
|
||||
const query::ring_position& pos = range.start()->value();
|
||||
|
||||
if (!pos.has_key()) {
|
||||
return make_scanning_reader(std::move(s), range);
|
||||
return make_scanning_reader(std::move(s), range, pc);
|
||||
}
|
||||
|
||||
return _read_section(_tracker.region(), [&] {
|
||||
@@ -374,12 +376,12 @@ row_cache::make_reader(schema_ptr s, const query::partition_range& range) {
|
||||
return make_reader_returning(e.read(s));
|
||||
} else {
|
||||
on_miss();
|
||||
return make_mutation_reader<populating_reader>(s, *this, _underlying(_schema, range));
|
||||
return make_mutation_reader<populating_reader>(s, *this, _underlying(_schema, range, pc));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return make_scanning_reader(std::move(s), range);
|
||||
return make_scanning_reader(std::move(s), range, pc);
|
||||
}
|
||||
|
||||
row_cache::~row_cache() {
|
||||
|
||||
@@ -195,7 +195,7 @@ private:
|
||||
logalloc::allocating_section _update_section;
|
||||
logalloc::allocating_section _populate_section;
|
||||
logalloc::allocating_section _read_section;
|
||||
mutation_reader make_scanning_reader(schema_ptr, const query::partition_range&);
|
||||
mutation_reader make_scanning_reader(schema_ptr, const query::partition_range&, const io_priority_class& pc);
|
||||
void on_hit();
|
||||
void on_miss();
|
||||
void upgrade_entry(cache_entry&);
|
||||
@@ -211,7 +211,8 @@ public:
|
||||
// User needs to ensure that the row_cache object stays alive
|
||||
// as long as the reader is used.
|
||||
// The range must not wrap around.
|
||||
mutation_reader make_reader(schema_ptr, const query::partition_range& = query::full_partition_range);
|
||||
mutation_reader make_reader(schema_ptr, const query::partition_range& = query::full_partition_range, const io_priority_class& = default_priority_class());
|
||||
|
||||
const stats& stats() const { return _stats; }
|
||||
public:
|
||||
// Populate cache from given mutation. The mutation must contain all
|
||||
|
||||
@@ -2906,6 +2906,7 @@ class shard_reader final : public mutation_reader::impl {
|
||||
const query::partition_range _range;
|
||||
global_schema_ptr _schema;
|
||||
schema_ptr _local_schema;
|
||||
const io_priority_class *_pc;
|
||||
struct remote_state {
|
||||
mutation_reader reader;
|
||||
std::experimental::optional<frozen_mutation> _m;
|
||||
@@ -2916,7 +2917,7 @@ private:
|
||||
return _db.invoke_on(_shard, [this] (database& db) {
|
||||
schema_ptr s = _schema;
|
||||
column_family& cf = db.find_column_family(s->id());
|
||||
return make_foreign(std::make_unique<remote_state>(remote_state{cf.make_reader(std::move(s), _range)}));
|
||||
return make_foreign(std::make_unique<remote_state>(remote_state{cf.make_reader(std::move(s), _range, *_pc)}));
|
||||
}).then([this] (auto&& ptr) {
|
||||
_remote = std::move(ptr);
|
||||
});
|
||||
@@ -2925,12 +2926,14 @@ public:
|
||||
shard_reader(schema_ptr s,
|
||||
distributed<database>& db,
|
||||
unsigned shard,
|
||||
const query::partition_range& range)
|
||||
const query::partition_range& range,
|
||||
const io_priority_class& pc)
|
||||
: _db(db)
|
||||
, _shard(shard)
|
||||
, _range(range)
|
||||
, _schema(s)
|
||||
, _local_schema(std::move(s))
|
||||
, _pc(&pc)
|
||||
{ }
|
||||
|
||||
virtual future<mutation_opt> operator()() override {
|
||||
@@ -2959,7 +2962,8 @@ public:
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range& range) {
|
||||
storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range& range,
|
||||
const io_priority_class& pc) {
|
||||
// Split ranges which wrap around, because the individual readers created
|
||||
// by shard_reader do not support them:
|
||||
auto schema = _db.local().find_column_family(cf_id).schema();
|
||||
@@ -2967,8 +2971,8 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range
|
||||
auto unwrapped = range.unwrap();
|
||||
std::vector<mutation_reader> both;
|
||||
both.reserve(2);
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.first));
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.second));
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.first, pc));
|
||||
both.push_back(make_local_reader(cf_id, unwrapped.second, pc));
|
||||
return make_joining_reader(std::move(both));
|
||||
}
|
||||
|
||||
@@ -2976,7 +2980,7 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range
|
||||
unsigned last_shard = range.end() ? dht::shard_of(range.end()->value().token()) : smp::count - 1;
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto cpu = first_shard; cpu <= last_shard; ++cpu) {
|
||||
readers.emplace_back(make_mutation_reader<shard_reader>(schema, _db, cpu, range));
|
||||
readers.emplace_back(make_mutation_reader<shard_reader>(schema, _db, cpu, range, pc));
|
||||
}
|
||||
return make_joining_reader(std::move(readers));
|
||||
}
|
||||
|
||||
@@ -229,7 +229,8 @@ public:
|
||||
* which combines data from all shards.
|
||||
* Uses schema current at the time of invocation.
|
||||
*/
|
||||
mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&);
|
||||
mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&,
|
||||
const io_priority_class& pc = default_priority_class());
|
||||
|
||||
future<> stop();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user