mutation_writer: drop now unused v1 variants of bucket_writer feed_writer()
Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <20220302145945.189607-2-bdenes@scylladb.com>
This commit is contained in:
@@ -10,46 +10,6 @@
|
||||
|
||||
namespace mutation_writer {
|
||||
|
||||
bucket_writer::bucket_writer(schema_ptr schema, std::pair<flat_mutation_reader, queue_reader_handle> queue_reader, reader_consumer& consumer)
|
||||
: _schema(schema)
|
||||
, _handle(std::move(queue_reader.second))
|
||||
, _consume_fut(consumer(std::move(queue_reader.first)))
|
||||
{ }
|
||||
|
||||
bucket_writer::bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer)
|
||||
: bucket_writer(schema, make_queue_reader(schema, std::move(permit)), consumer)
|
||||
{ }
|
||||
|
||||
future<> bucket_writer::consume(mutation_fragment mf) {
|
||||
if (_handle.is_terminated()) {
|
||||
// When the handle is terminated, it was aborted
|
||||
// or associated reader was closed prematurely.
|
||||
// In this case return _consume_fut that will propagate
|
||||
// the root-cause error.
|
||||
auto ex = _handle.get_exception();
|
||||
if (!ex) {
|
||||
// shouldn't really happen
|
||||
ex = make_exception_ptr(std::runtime_error("queue_reader_handle is terminated"));
|
||||
}
|
||||
return std::exchange(_consume_fut, make_exception_future<>(ex)).then([ex = std::move(ex)] () mutable {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
});
|
||||
}
|
||||
return _handle.push(std::move(mf));
|
||||
}
|
||||
|
||||
void bucket_writer::consume_end_of_stream() {
|
||||
_handle.push_end_of_stream();
|
||||
}
|
||||
|
||||
void bucket_writer::abort(std::exception_ptr ep) noexcept {
|
||||
_handle.abort(std::move(ep));
|
||||
}
|
||||
|
||||
future<> bucket_writer::close() noexcept {
|
||||
return std::move(_consume_fut);
|
||||
}
|
||||
|
||||
bucket_writer_v2::bucket_writer_v2(schema_ptr schema, std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> queue_reader, reader_consumer_v2& consumer)
|
||||
: _schema(schema)
|
||||
, _handle(std::move(queue_reader.second))
|
||||
|
||||
@@ -14,69 +14,6 @@
|
||||
|
||||
namespace mutation_writer {
|
||||
|
||||
class bucket_writer {
|
||||
schema_ptr _schema;
|
||||
queue_reader_handle _handle;
|
||||
future<> _consume_fut;
|
||||
|
||||
private:
|
||||
bucket_writer(schema_ptr schema, std::pair<flat_mutation_reader, queue_reader_handle> queue_reader, reader_consumer& consumer);
|
||||
|
||||
public:
|
||||
bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer);
|
||||
|
||||
future<> consume(mutation_fragment mf);
|
||||
|
||||
void consume_end_of_stream();
|
||||
|
||||
void abort(std::exception_ptr ep) noexcept;
|
||||
|
||||
future<> close() noexcept;
|
||||
};
|
||||
|
||||
template <typename Writer>
|
||||
requires MutationFragmentConsumer<Writer, future<>>
|
||||
future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
|
||||
// Only move in reader if stack was successfully allocated, so caller can close reader otherwise.
|
||||
auto rd = std::move(rd_ref);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
|
||||
co_await rd.fill_buffer();
|
||||
while (!rd.is_buffer_empty()) {
|
||||
co_await rd.pop_mutation_fragment().consume(wr);
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
co_await rd.close();
|
||||
|
||||
try {
|
||||
if (ex) {
|
||||
wr.abort(ex);
|
||||
} else {
|
||||
wr.consume_end_of_stream();
|
||||
}
|
||||
} catch (...) {
|
||||
if (!ex) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
co_await wr.close();
|
||||
} catch (...) {
|
||||
if (!ex) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
}
|
||||
if (ex) {
|
||||
std::rethrow_exception(ex);
|
||||
}
|
||||
}
|
||||
|
||||
class bucket_writer_v2 {
|
||||
schema_ptr _schema;
|
||||
queue_reader_handle_v2 _handle;
|
||||
|
||||
Reference in New Issue
Block a user