diff --git a/repair/row_level.cc b/repair/row_level.cc index 2b631ea323..29320cda40 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -442,7 +442,7 @@ class repair_writer { size_t _nr_peer_nodes; // Needs more than one for repair master std::vector>> _writer_done; - std::vector>> _mq; + std::vector> _mq; // Current partition written to disk std::vector> _current_dk_written_to_sstable; // Is current partition still open. A partition is opened when a @@ -467,14 +467,14 @@ public: future<> write_start_and_mf(lw_shared_ptr dk, mutation_fragment mf, unsigned node_idx) { _current_dk_written_to_sstable[node_idx] = dk; if (mf.is_partition_start()) { - return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] { + return _mq[node_idx]->push(std::move(mf)).then([this, node_idx] { _partition_opened[node_idx] = true; }); } else { auto start = mutation_fragment(partition_start(dk->dk, tombstone())); - return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable { + return _mq[node_idx]->push(std::move(start)).then([this, node_idx, mf = std::move(mf)] () mutable { _partition_opened[node_idx] = true; - return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))); + return _mq[node_idx]->push(std::move(mf)); }); } }; @@ -490,13 +490,10 @@ public: if (_writer_done[node_idx]) { return; } - _mq[node_idx] = seastar::queue(16); - auto get_next_mutation_fragment = [this, node_idx] () mutable { - return _mq[node_idx]->pop_eventually(); - }; + auto [queue_reader, queue_handle] = make_queue_reader(_schema); + _mq[node_idx] = std::move(queue_handle); table& t = db.local().find_column_family(_schema->id()); - _writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, - make_generating_reader(_schema, std::move(get_next_mutation_fragment)), + _writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader), [&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) { auto& t = db.local().find_column_family(reader.schema()); return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable { @@ -538,7 +535,7 @@ public: future<> write_partition_end(unsigned node_idx) { if (_partition_opened[node_idx]) { - return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] { + return _mq[node_idx]->push(partition_end()).then([this, node_idx] { _partition_opened[node_idx] = false; }); } @@ -548,7 +545,7 @@ public: future<> do_write(unsigned node_idx, lw_shared_ptr dk, mutation_fragment mf) { if (_current_dk_written_to_sstable[node_idx]) { if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) { - return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))); + return _mq[node_idx]->push(std::move(mf)); } else { return write_partition_end(node_idx).then([this, node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable { @@ -565,8 +562,7 @@ public: return with_semaphore(_sem, 1, [this, node_idx] { // Partition_end is never sent on wire, so we have to write one ourselves. return write_partition_end(node_idx).then([this, node_idx] () mutable { - // Empty mutation_fragment_opt means no more data, so the writer can seal the sstables. - return _mq[node_idx]->push_eventually(mutation_fragment_opt()); + _mq[node_idx]->push_end_of_stream(); }).handle_exception([this, node_idx] (std::exception_ptr ep) { _mq[node_idx]->abort(ep); rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",