repair: switch from queue+generating_reader to queue_reader

The queue_reader was inveted exactly to replace this construct and is
more efficient than it.

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200520155618.369873-1-bdenes@scylladb.com>
This commit is contained in:
Botond Dénes
2020-05-20 18:56:18 +03:00
committed by Avi Kivity
parent 7ce4a8b458
commit c29ccdea7e

View File

@@ -442,7 +442,7 @@ class repair_writer {
size_t _nr_peer_nodes;
// Needs more than one for repair master
std::vector<std::optional<future<>>> _writer_done;
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
std::vector<std::optional<queue_reader_handle>> _mq;
// Current partition written to disk
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
// Is current partition still open. A partition is opened when a
@@ -467,14 +467,14 @@ public:
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
_current_dk_written_to_sstable[node_idx] = dk;
if (mf.is_partition_start()) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
return _mq[node_idx]->push(std::move(mf)).then([this, node_idx] {
_partition_opened[node_idx] = true;
});
} else {
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
return _mq[node_idx]->push(std::move(start)).then([this, node_idx, mf = std::move(mf)] () mutable {
_partition_opened[node_idx] = true;
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
return _mq[node_idx]->push(std::move(mf));
});
}
};
@@ -490,13 +490,10 @@ public:
if (_writer_done[node_idx]) {
return;
}
_mq[node_idx] = seastar::queue<mutation_fragment_opt>(16);
auto get_next_mutation_fragment = [this, node_idx] () mutable {
return _mq[node_idx]->pop_eventually();
};
auto [queue_reader, queue_handle] = make_queue_reader(_schema);
_mq[node_idx] = std::move(queue_handle);
table& t = db.local().find_column_family(_schema->id());
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema,
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
@@ -538,7 +535,7 @@ public:
future<> write_partition_end(unsigned node_idx) {
if (_partition_opened[node_idx]) {
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
return _mq[node_idx]->push(partition_end()).then([this, node_idx] {
_partition_opened[node_idx] = false;
});
}
@@ -548,7 +545,7 @@ public:
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
if (_current_dk_written_to_sstable[node_idx]) {
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
return _mq[node_idx]->push(std::move(mf));
} else {
return write_partition_end(node_idx).then([this,
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
@@ -565,8 +562,7 @@ public:
return with_semaphore(_sem, 1, [this, node_idx] {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
_mq[node_idx]->push_end_of_stream();
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
_mq[node_idx]->abort(ep);
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",