Merge "Fixes for skipping in sstable reader" from Tomasz

* 'tgrabiec/fix-fast-forwarding' of github.com:scylladb/seastar-dev:
  tests: mutation_source_test: Add more tests for fast forwarding across partitions
  sstables: Fix abort in mutation reader for certain skip pattern
  sstables: Fix reader returning partition past the query range in some cases
  sstables: Introduce data_consume_context::eof()
This commit is contained in:
Avi Kivity
2017-08-28 12:48:02 +03:00
5 changed files with 112 additions and 1 deletions

View File

@@ -316,6 +316,10 @@ public:
return _stream_position;
}
bool eof() const {
return _remain == 0;
}
future<> close() {
return _input.close();
}

View File

@@ -1149,6 +1149,10 @@ future<> sstable_data_source::advance_to_next_partition() {
future<streamed_mutation_opt> sstable_data_source::read_next_partition() {
sstlog.trace("reader {}: read next partition", this);
if (!_read_enabled) {
sstlog.trace("reader {}: eof", this);
return make_ready_future<streamed_mutation_opt>();
}
return advance_to_next_partition().then([this] {
return read_partition();
});
@@ -1173,7 +1177,7 @@ future<streamed_mutation_opt> sstable_data_source::read_partition() {
// need to use the index anyway soon.
//
if (_index_in_current_partition) {
if (_lh_index->eof()) {
if (_context.eof()) {
sstlog.trace("reader {}: eof", this);
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
}

View File

@@ -422,6 +422,9 @@ public:
_ctx->reset(el);
return _ctx->skip_to(begin);
}
bool eof() const {
return _ctx->eof();
}
};
data_consume_context::~data_consume_context() = default;
@@ -442,6 +445,9 @@ future<> data_consume_context::fast_forward_to(uint64_t begin, uint64_t end) {
future<> data_consume_context::skip_to(indexable_element el, uint64_t begin) {
return _pimpl->skip_to(el, begin);
}
bool data_consume_context::eof() const {
return _pimpl->eof();
}
data_consume_context sstable::data_consume_rows(
row_consumer& consumer, sstable::disk_read_range toread, uint64_t last_end) {

View File

@@ -87,6 +87,7 @@ public:
future<> fast_forward_to(uint64_t begin, uint64_t end);
future<> skip_to(indexable_element, uint64_t begin);
uint64_t position() const;
bool eof() const;
// Define (as defaults) the destructor and move operations in the source
// file, so here we don't need to know the incomplete impl type.
~data_consume_context();

View File

@@ -227,6 +227,101 @@ static void test_streamed_mutation_forwarding_guarantees(populate_fn populate) {
}
}
// Reproduces https://github.com/scylladb/scylla/issues/2733
static void test_fast_forwarding_across_partitions_to_empty_range(populate_fn populate) {
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
simple_schema table;
schema_ptr s = table.schema();
std::vector<mutation> partitions;
const unsigned ckeys_per_part = 100;
auto keys = table.make_pkeys(10);
auto missing_key = keys[3];
keys.erase(keys.begin() + 3);
auto key_after_all = keys.back();
keys.erase(keys.begin() + (keys.size() - 1));
unsigned next_ckey = 0;
for (auto&& key : keys) {
mutation m(key, s);
sstring val(sstring::initialized_later(), 1024);
for (auto i : boost::irange(0u, ckeys_per_part)) {
table.add_row(m, table.make_ckey(next_ckey + i), val);
}
next_ckey += ckeys_per_part;
partitions.push_back(m);
}
mutation_source ms = populate(s, partitions);
mutation_reader rd = ms(s,
dht::partition_range::make({keys[0]}, {keys[1]}),
query::full_slice,
default_priority_class(),
nullptr,
streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes);
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(smo);
//smo->fast_forward_to(position_range::all_clustered_rows()).get();
smo->fill_buffer().get();
BOOST_REQUIRE(smo->is_buffer_full()); // if not, increase n_ckeys
BOOST_REQUIRE(smo->decorated_key().equal(*s, keys[0]));
assert_that_stream(std::move(*smo))
.produces_row_with_key(table.make_ckey(0))
.produces_row_with_key(table.make_ckey(1));
// ...don't finish consumption to leave the reader in the middle of partition
}
rd.fast_forward_to(dht::partition_range::make({missing_key}, {missing_key})).get();
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(!smo);
}
rd.fast_forward_to(dht::partition_range::make({keys[3]}, {keys[3]})).get();
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(smo);
BOOST_REQUIRE(smo->decorated_key().equal(*s, keys[3]));
assert_that_stream(std::move(*smo))
.produces_row_with_key(table.make_ckey(ckeys_per_part * 3))
.produces_row_with_key(table.make_ckey(ckeys_per_part * 3 + 1));
}
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(!smo);
}
rd.fast_forward_to(dht::partition_range::make_starting_with({keys[keys.size() - 1]})).get();
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(smo);
BOOST_REQUIRE(smo->decorated_key().equal(*s, keys.back()));
assert_that_stream(std::move(*smo))
.produces_row_with_key(table.make_ckey(ckeys_per_part * (keys.size() - 1)));
// ...don't finish consumption to leave the reader in the middle of partition
}
rd.fast_forward_to(dht::partition_range::make({key_after_all}, {key_after_all})).get();
{
streamed_mutation_opt smo = rd().get0();
BOOST_REQUIRE(!smo);
}
}
static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(populate_fn populate) {
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
@@ -744,6 +839,7 @@ static void test_query_only_static_row(populate_fn populate) {
}
void run_mutation_source_tests(populate_fn populate) {
test_fast_forwarding_across_partitions_to_empty_range(populate);
test_clustering_slices(populate);
test_streamed_mutation_fragments_have_monotonic_positions(populate);
test_streamed_mutation_forwarding_across_range_tombstones(populate);