Make queue_reader public

Extract it from `mutlishard_writer.cc` and move it to
`mutation_reader.{hh,cc}` so other code can start using it too.
This commit is contained in:
Botond Dénes
2019-05-20 09:43:09 +03:00
parent 7a0c6cd583
commit a597e46792
3 changed files with 36 additions and 30 deletions

View File

@@ -27,35 +27,6 @@
#include <seastar/core/future-util.hh>
#include <seastar/core/queue.hh>
class queue_reader final : public flat_mutation_reader::impl {
seastar::queue<mutation_fragment_opt>& _mq;
public:
queue_reader(schema_ptr s, seastar::queue<mutation_fragment_opt>& mq)
: impl(std::move(s))
, _mq(mq) {
}
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _mq.pop_eventually().then([this] (mutation_fragment_opt mopt) {
if (!mopt) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(*mopt));
}
});
});
}
virtual void next_partition() override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
};
class shard_writer {
private:
schema_ptr _s;
@@ -138,7 +109,7 @@ multishard_writer::multishard_writer(
}
future<> multishard_writer::make_shard_writer(unsigned shard) {
auto this_shard_reader = make_foreign(std::make_unique<flat_mutation_reader>(make_flat_mutation_reader<queue_reader>(_s, _queues[shard])));
auto this_shard_reader = make_foreign(std::make_unique<flat_mutation_reader>(make_queue_reader(_s, _queues[shard])));
return smp::submit_to(shard, [gs = global_schema_ptr(_s),
consumer = _consumer,
reader = std::move(this_shard_reader)] () mutable {

View File

@@ -1565,3 +1565,36 @@ flat_mutation_reader make_multishard_combining_reader(
return make_flat_mutation_reader<multishard_combining_reader>(std::move(lifecycle_policy), partitioner, std::move(schema), pr, ps, pc,
std::move(trace_state), fwd_mr);
}
class queue_reader final : public flat_mutation_reader::impl {
seastar::queue<mutation_fragment_opt>& _mq;
public:
queue_reader(schema_ptr s, seastar::queue<mutation_fragment_opt>& mq)
: impl(std::move(s))
, _mq(mq) {
}
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _mq.pop_eventually().then([this] (mutation_fragment_opt mopt) {
if (!mopt) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(*mopt));
}
});
});
}
virtual void next_partition() override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
throw std::bad_function_call();
}
};
flat_mutation_reader make_queue_reader(schema_ptr s, queue<mutation_fragment_opt>& mq) {
return make_flat_mutation_reader<queue_reader>(std::move(s), mq);
}

View File

@@ -507,3 +507,5 @@ flat_mutation_reader make_multishard_combining_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_state = nullptr,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
flat_mutation_reader make_queue_reader(schema_ptr s, queue<mutation_fragment_opt>& mq);