cache_streamed_mutation: use consumer based read_context reader

This commit is contained in:
Paweł Dziepak
2017-07-20 13:25:47 +01:00
parent 2066354de3
commit c2ec43f70b
2 changed files with 30 additions and 25 deletions

View File

@@ -216,35 +216,33 @@ future<> cache_streamed_mutation::do_fill_buffer() {
inline
future<> cache_streamed_mutation::read_from_underlying() {
return do_until([this] { return !_reading_underlying || is_buffer_full(); }, [this] {
return _read_context->get_next_fragment().then([this] (auto&& mfopt) {
if (!mfopt) {
_reading_underlying = false;
return _lsa_manager.run_in_update_section([this] {
auto same_pos = _next_row.maybe_refresh();
assert(same_pos); // FIXME: handle eviction
if (_next_row_in_range) {
return consume_mutation_fragments_until(_read_context->get_streamed_mutation(),
[this] { return !_reading_underlying || is_buffer_full(); },
[this] (mutation_fragment mf) {
_read_context->cache().on_row_miss();
maybe_add_to_cache(mf);
add_to_buffer(std::move(mf));
},
[this] {
_reading_underlying = false;
return _lsa_manager.run_in_update_section([this] {
auto same_pos = _next_row.maybe_refresh();
assert(same_pos); // FIXME: handle eviction
if (_next_row_in_range) {
maybe_update_continuity();
add_to_buffer(_next_row);
return move_to_next_entry();
} else {
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
this->maybe_update_continuity();
this->add_to_buffer(_next_row);
return this->move_to_next_entry();
} else {
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
this->maybe_update_continuity();
} else {
// FIXME: Insert dummy entry at _upper_bound.
_read_context->cache().on_mispopulate();
}
return this->move_to_next_range();
// FIXME: Insert dummy entry at _upper_bound.
_read_context->cache().on_mispopulate();
}
});
} else {
_read_context->cache().on_row_miss();
this->maybe_add_to_cache(*mfopt);
this->add_to_buffer(std::move(*mfopt));
return make_ready_future<>();
}
return move_to_next_range();
}
});
});
});
}
inline

View File

@@ -223,6 +223,13 @@ public:
return _sm->fast_forward_to(std::move(range));
});
}
// Returns the underlying streamed_mutation.
// The caller has to ensure that the streamed mutation was already created
// (e.g. the most recent call to enter_partition(const dht::decorated_key&, ...)
// was followed by a call to fast_forward_to()).
streamed_mutation& get_streamed_mutation() noexcept {
return *_sm;
}
// Gets the next fragment from the underlying streamed_mutation
future<mutation_fragment_opt> get_next_fragment() {
return ensure_sm_created().then([this] {