From ab440e1a07c3f4aa9e32b3f66776d8844003a87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 2 Mar 2022 16:59:45 +0200 Subject: [PATCH] mutation_writer: drop now unused v1 variants of bucket_writer feed_writer() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Botond Dénes Message-Id: <20220302145945.189607-2-bdenes@scylladb.com> --- mutation_writer/feed_writers.cc | 40 --------------------- mutation_writer/feed_writers.hh | 63 --------------------------------- 2 files changed, 103 deletions(-) diff --git a/mutation_writer/feed_writers.cc b/mutation_writer/feed_writers.cc index d065690237..aa164a40c4 100644 --- a/mutation_writer/feed_writers.cc +++ b/mutation_writer/feed_writers.cc @@ -10,46 +10,6 @@ namespace mutation_writer { -bucket_writer::bucket_writer(schema_ptr schema, std::pair queue_reader, reader_consumer& consumer) - : _schema(schema) - , _handle(std::move(queue_reader.second)) - , _consume_fut(consumer(std::move(queue_reader.first))) -{ } - -bucket_writer::bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer) - : bucket_writer(schema, make_queue_reader(schema, std::move(permit)), consumer) -{ } - -future<> bucket_writer::consume(mutation_fragment mf) { - if (_handle.is_terminated()) { - // When the handle is terminated, it was aborted - // or associated reader was closed prematurely. - // In this case return _consume_fut that will propagate - // the root-cause error. - auto ex = _handle.get_exception(); - if (!ex) { - // shouldn't really happen - ex = make_exception_ptr(std::runtime_error("queue_reader_handle is terminated")); - } - return std::exchange(_consume_fut, make_exception_future<>(ex)).then([ex = std::move(ex)] () mutable { - return make_exception_future<>(std::move(ex)); - }); - } - return _handle.push(std::move(mf)); -} - -void bucket_writer::consume_end_of_stream() { - _handle.push_end_of_stream(); -} - -void bucket_writer::abort(std::exception_ptr ep) noexcept { - _handle.abort(std::move(ep)); -} - -future<> bucket_writer::close() noexcept { - return std::move(_consume_fut); -} - bucket_writer_v2::bucket_writer_v2(schema_ptr schema, std::pair queue_reader, reader_consumer_v2& consumer) : _schema(schema) , _handle(std::move(queue_reader.second)) diff --git a/mutation_writer/feed_writers.hh b/mutation_writer/feed_writers.hh index ad4acbfde1..beffbb2086 100644 --- a/mutation_writer/feed_writers.hh +++ b/mutation_writer/feed_writers.hh @@ -14,69 +14,6 @@ namespace mutation_writer { -class bucket_writer { - schema_ptr _schema; - queue_reader_handle _handle; - future<> _consume_fut; - -private: - bucket_writer(schema_ptr schema, std::pair queue_reader, reader_consumer& consumer); - -public: - bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer); - - future<> consume(mutation_fragment mf); - - void consume_end_of_stream(); - - void abort(std::exception_ptr ep) noexcept; - - future<> close() noexcept; -}; - -template -requires MutationFragmentConsumer> -future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) { - // Only move in reader if stack was successfully allocated, so caller can close reader otherwise. - auto rd = std::move(rd_ref); - std::exception_ptr ex; - try { - while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) { - co_await rd.fill_buffer(); - while (!rd.is_buffer_empty()) { - co_await rd.pop_mutation_fragment().consume(wr); - } - } - } catch (...) { - ex = std::current_exception(); - } - - co_await rd.close(); - - try { - if (ex) { - wr.abort(ex); - } else { - wr.consume_end_of_stream(); - } - } catch (...) { - if (!ex) { - ex = std::current_exception(); - } - } - - try { - co_await wr.close(); - } catch (...) { - if (!ex) { - ex = std::current_exception(); - } - } - if (ex) { - std::rethrow_exception(ex); - } -} - class bucket_writer_v2 { schema_ptr _schema; queue_reader_handle_v2 _handle;