mutation_reader: convert foreign_reader to v2
This commit is contained in:
@@ -859,13 +859,13 @@ struct remote_fill_buffer_result_v2 {
|
||||
}
|
||||
|
||||
/// See make_foreign_reader() for description.
|
||||
class foreign_reader : public flat_mutation_reader::impl {
|
||||
class foreign_reader : public flat_mutation_reader_v2::impl {
|
||||
template <typename T>
|
||||
using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;
|
||||
|
||||
using fragment_buffer = flat_mutation_reader::tracked_buffer;
|
||||
using fragment_buffer = flat_mutation_reader_v2::tracked_buffer;
|
||||
|
||||
foreign_unique_ptr<flat_mutation_reader> _reader;
|
||||
foreign_unique_ptr<flat_mutation_reader_v2> _reader;
|
||||
foreign_unique_ptr<future<>> _read_ahead_future;
|
||||
streamed_mutation::forwarding _fwd_sm;
|
||||
|
||||
@@ -909,7 +909,7 @@ class foreign_reader : public flat_mutation_reader::impl {
|
||||
public:
|
||||
foreign_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
foreign_unique_ptr<flat_mutation_reader> reader,
|
||||
foreign_unique_ptr<flat_mutation_reader_v2> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
// this is captured.
|
||||
@@ -927,7 +927,7 @@ public:
|
||||
|
||||
foreign_reader::foreign_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
foreign_unique_ptr<flat_mutation_reader> reader,
|
||||
foreign_unique_ptr<flat_mutation_reader_v2> reader,
|
||||
streamed_mutation::forwarding fwd_sm)
|
||||
: impl(std::move(schema), std::move(permit))
|
||||
, _reader(std::move(reader))
|
||||
@@ -942,13 +942,13 @@ future<> foreign_reader::fill_buffer() {
|
||||
return forward_operation([reader = _reader.get()] () {
|
||||
auto f = reader->is_buffer_empty() ? reader->fill_buffer() : make_ready_future<>();
|
||||
return f.then([=] {
|
||||
return make_ready_future<remote_fill_buffer_result>(remote_fill_buffer_result(reader->detach_buffer(), reader->is_end_of_stream()));
|
||||
return make_ready_future<remote_fill_buffer_result_v2>(remote_fill_buffer_result_v2(reader->detach_buffer(), reader->is_end_of_stream()));
|
||||
});
|
||||
}).then([this] (remote_fill_buffer_result res) mutable {
|
||||
}).then([this] (remote_fill_buffer_result_v2 res) mutable {
|
||||
_end_of_stream = res.end_of_stream;
|
||||
for (const auto& mf : *res.buffer) {
|
||||
// Need a copy since the mf is on the remote shard.
|
||||
push_mutation_fragment(mutation_fragment(*_schema, _permit, mf));
|
||||
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1005,14 +1005,14 @@ future<> foreign_reader::close() noexcept {
|
||||
});
|
||||
}
|
||||
|
||||
flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader_v2>> reader,
|
||||
streamed_mutation::forwarding fwd_sm) {
|
||||
if (reader.get_owner_shard() == this_shard_id()) {
|
||||
return std::move(*reader);
|
||||
}
|
||||
return make_flat_mutation_reader<foreign_reader>(std::move(schema), std::move(permit), std::move(reader), fwd_sm);
|
||||
return make_flat_mutation_reader_v2<foreign_reader>(std::move(schema), std::move(permit), std::move(reader), fwd_sm);
|
||||
}
|
||||
|
||||
template <typename... Arg>
|
||||
|
||||
@@ -359,9 +359,9 @@ using mutation_source_opt = optimized_optional<mutation_source>;
|
||||
/// If the reader resides on this shard (the shard where make_foreign_reader()
|
||||
/// is called) there is no need to wrap it in foreign_reader, just return it as
|
||||
/// is.
|
||||
flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader_v2>> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
/// Make an auto-paused evictable reader.
|
||||
|
||||
@@ -108,11 +108,11 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
_queue_reader_handles[shard] = std::move(handle);
|
||||
return smp::submit_to(shard, [gs = global_schema_ptr(_s),
|
||||
consumer = _consumer,
|
||||
reader = make_foreign(std::make_unique<flat_mutation_reader>(std::move(reader)))] () mutable {
|
||||
reader = make_foreign(std::make_unique<flat_mutation_reader_v2>(upgrade_to_v2(std::move(reader))))] () mutable {
|
||||
auto s = gs.get();
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer");
|
||||
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout);
|
||||
auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader));
|
||||
auto this_shard_reader = downgrade_to_v1(make_foreign_reader(s, std::move(permit), std::move(reader)));
|
||||
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer));
|
||||
}).then([this, shard] (foreign_ptr<std::unique_ptr<shard_writer>> writer) {
|
||||
_shard_writers[shard] = std::move(writer);
|
||||
|
||||
@@ -1260,14 +1260,14 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto remote_reader = env.db().invoke_on(remote_shard,
|
||||
[&, s = global_schema_ptr(s), fwd_sm, fwd_mr, trace_state = tracing::global_trace_state_ptr(trace_state)] (replica::database& db) {
|
||||
return make_foreign(std::make_unique<flat_mutation_reader>(remote_mt->make_flat_reader(s.get(),
|
||||
return make_foreign(std::make_unique<flat_mutation_reader_v2>(upgrade_to_v2(remote_mt->make_flat_reader(s.get(),
|
||||
make_reader_permit(env),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
trace_state.get(),
|
||||
fwd_sm,
|
||||
fwd_mr)));
|
||||
fwd_mr))));
|
||||
}).get0();
|
||||
return make_foreign_reader(s, std::move(permit), std::move(remote_reader), fwd_sm);
|
||||
};
|
||||
@@ -1900,14 +1900,14 @@ SEASTAR_THREAD_TEST_CASE(test_stopping_reader_with_pending_read_ahead) {
|
||||
const auto shard_of_interest = (this_shard_id() + 1) % smp::count;
|
||||
auto s = simple_schema();
|
||||
auto remote_control_remote_reader = smp::submit_to(shard_of_interest, [&env, gs = global_simple_schema(s)] {
|
||||
using control_type = foreign_ptr<std::unique_ptr<puppet_reader::control>>;
|
||||
using reader_type = foreign_ptr<std::unique_ptr<flat_mutation_reader>>;
|
||||
using control_type = foreign_ptr<std::unique_ptr<puppet_reader_v2::control>>;
|
||||
using reader_type = foreign_ptr<std::unique_ptr<flat_mutation_reader_v2>>;
|
||||
|
||||
auto control = make_foreign(std::make_unique<puppet_reader::control>());
|
||||
auto reader = make_foreign(std::make_unique<flat_mutation_reader>(make_flat_mutation_reader<puppet_reader>(gs.get(),
|
||||
auto control = make_foreign(std::make_unique<puppet_reader_v2::control>());
|
||||
auto reader = make_foreign(std::make_unique<flat_mutation_reader_v2>(make_flat_mutation_reader_v2<puppet_reader_v2>(gs.get(),
|
||||
make_reader_permit(env),
|
||||
*control,
|
||||
std::vector{puppet_reader::fill_buffer_action::fill, puppet_reader::fill_buffer_action::block},
|
||||
std::vector{puppet_reader_v2::fill_buffer_action::fill, puppet_reader_v2::fill_buffer_action::block},
|
||||
std::vector<uint32_t>{0, 1})));
|
||||
|
||||
return make_ready_future<std::tuple<control_type, reader_type>>(std::tuple(std::move(control), std::move(reader)));
|
||||
|
||||
Reference in New Issue
Block a user