Merge "Fix cache reader skipping rows in some cases" from Tomasz
"Fixes the problem of concurrent populations of clustering row ranges
leading to some readers skipping over some of the rows.
Spotted during code review.
Fixes #2834."
* tag 'tgrabiec/fix-cache-reader-skipping-rows-v2' of github.com:scylladb/seastar-dev:
tests: mvcc: Add test for partition_snapshot_row_cursor
tests: row_cache: Add test for concurrent population
tests: row_cache: Make populate_range() accept partition_range
tests: Add simple_schema::make_ckey_range()
cache_streamed_mutation: Add missing _next_row.maybe_refresh() call
mvcc: partition_snapshot_row_cursor: Fix cursor skipping over rows added after its position
mvcc: partition_snapshot_row_cursor: Rename up_to_date() to iterators_valid()
mvcc: Keep track of all iterators in partition_snapshot_row_cursor
mvcc: Make partition_snapshot_row_cursor printable
(cherry picked from commit af1976bc30)
[tgrabiec: resolved conflicts]
This commit is contained in:
committed by
Tomasz Grabiec
parent
454b90980a
commit
f9864686d2
@@ -327,7 +327,7 @@ void cache_streamed_mutation::maybe_add_to_cache(const clustering_row& cr) {
|
||||
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(cr.key(), cr.tomb(), cr.marker(), cr.cells()));
|
||||
new_entry->set_continuous(false);
|
||||
auto it = _next_row.has_up_to_date_row_from_latest_version()
|
||||
auto it = _next_row.has_valid_row_from_latest_version()
|
||||
? _next_row.get_iterator_in_latest_version() : mp.clustered_rows().lower_bound(cr.key(), less);
|
||||
auto insert_result = mp.clustered_rows().insert_check(it, *new_entry, less);
|
||||
if (insert_result.second) {
|
||||
|
||||
@@ -34,6 +34,8 @@
|
||||
// When the cursor is invalidated, it still maintains its previous position. It can be brought
|
||||
// back to validity by calling maybe_refresh(), or advance_to().
|
||||
//
|
||||
// Insertion of row entries after cursor's position invalidates the cursor.
|
||||
//
|
||||
class partition_snapshot_row_cursor final {
|
||||
struct position_in_version {
|
||||
mutation_partition::rows_type::iterator it;
|
||||
@@ -55,6 +57,7 @@ class partition_snapshot_row_cursor final {
|
||||
logalloc::region& _region;
|
||||
partition_snapshot& _snp;
|
||||
std::vector<position_in_version> _heap;
|
||||
std::vector<mutation_partition::rows_type::iterator> _iterators;
|
||||
std::vector<position_in_version> _current_row;
|
||||
position_in_partition _position;
|
||||
uint64_t _last_reclaim_count = 0;
|
||||
@@ -78,13 +81,16 @@ public:
|
||||
, _snp(snp)
|
||||
, _position(position_in_partition::static_row_tag_t{})
|
||||
{ }
|
||||
bool has_up_to_date_row_from_latest_version() const {
|
||||
return up_to_date() && _current_row[0].version_no == 0;
|
||||
bool has_valid_row_from_latest_version() const {
|
||||
return iterators_valid() && _current_row[0].version_no == 0;
|
||||
}
|
||||
mutation_partition::rows_type::iterator get_iterator_in_latest_version() const {
|
||||
return _current_row[0].it;
|
||||
return _iterators[0];
|
||||
}
|
||||
bool up_to_date() const {
|
||||
|
||||
// Returns true iff the iterators obtained since the cursor was last made valid
|
||||
// are still valid. Note that this doesn't mean that the cursor itself is valid.
|
||||
bool iterators_valid() const {
|
||||
return _region.reclaim_counter() == _last_reclaim_count && _last_versions_count == _snp.version_count();
|
||||
}
|
||||
|
||||
@@ -97,9 +103,40 @@ public:
|
||||
//
|
||||
// but avoids work if not necessary.
|
||||
bool maybe_refresh() {
|
||||
if (!up_to_date()) {
|
||||
if (!iterators_valid()) {
|
||||
return advance_to(_position);
|
||||
}
|
||||
// Refresh latest version's iterator in case there was an insertion
|
||||
// before it and after cursor's position. There cannot be any
|
||||
// insertions for non-latest versions, so we don't have to update them.
|
||||
if (_current_row[0].version_no != 0) {
|
||||
rows_entry::compare less(_schema);
|
||||
position_in_partition::equal_compare eq(_schema);
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
auto& rows = _snp.version()->partition().clustered_rows();
|
||||
auto it = _iterators[0] = rows.lower_bound(_position, less);
|
||||
auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
|
||||
if (it == rows.end()) {
|
||||
if (heap_i != _heap.end()) {
|
||||
_heap.erase(heap_i);
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
}
|
||||
} else if (eq(_position, it->position())) {
|
||||
_current_row.insert(_current_row.begin(), position_in_version{it, rows.end(), 0});
|
||||
if (heap_i != _heap.end()) {
|
||||
_heap.erase(heap_i);
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
}
|
||||
} else {
|
||||
if (heap_i != _heap.end()) {
|
||||
heap_i->it = it;
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
} else {
|
||||
_heap.push_back({it, rows.end(), 0});
|
||||
boost::range::push_heap(_heap, heap_less);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -119,11 +156,13 @@ public:
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
_heap.clear();
|
||||
_current_row.clear();
|
||||
_iterators.clear();
|
||||
int version_no = 0;
|
||||
for (auto&& v : _snp.versions()) {
|
||||
auto& rows = v.partition().clustered_rows();
|
||||
auto pos = rows.lower_bound(lower_bound, less);
|
||||
auto end = rows.end();
|
||||
_iterators.push_back(pos);
|
||||
if (pos != end) {
|
||||
_heap.push_back({pos, end, version_no});
|
||||
}
|
||||
@@ -142,9 +181,10 @@ public:
|
||||
// Can be only called on a valid cursor pointing at a row.
|
||||
bool next() {
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
assert(up_to_date());
|
||||
assert(iterators_valid());
|
||||
for (auto&& curr : _current_row) {
|
||||
++curr.it;
|
||||
_iterators[curr.version_no] = curr.it;
|
||||
if (curr.it != curr.end) {
|
||||
_heap.push_back(curr);
|
||||
boost::range::push_heap(_heap, heap_less);
|
||||
@@ -186,6 +226,32 @@ public:
|
||||
bool is_in_latest_version() const;
|
||||
bool previous_row_in_latest_version_has_key(const clustering_key_prefix& key) const;
|
||||
void set_continuous(bool val);
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const partition_snapshot_row_cursor& cur) {
|
||||
out << "{cursor: position=" << cur._position << ", ";
|
||||
if (!cur.iterators_valid()) {
|
||||
return out << " iterators invalid}";
|
||||
}
|
||||
out << "current=[";
|
||||
bool first = true;
|
||||
for (auto&& v : cur._current_row) {
|
||||
if (!first) {
|
||||
out << ", ";
|
||||
}
|
||||
first = false;
|
||||
out << v.version_no;
|
||||
}
|
||||
out << "], heap=[";
|
||||
first = true;
|
||||
for (auto&& v : cur._heap) {
|
||||
if (!first) {
|
||||
out << ", ";
|
||||
}
|
||||
first = false;
|
||||
out << "{v=" << v.version_no << ", pos=" << v.it->position() << "}";
|
||||
}
|
||||
return out << "]}";
|
||||
};
|
||||
};
|
||||
|
||||
inline
|
||||
|
||||
@@ -1743,6 +1743,29 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) {
|
||||
});
|
||||
}
|
||||
|
||||
static void consume_all(mutation_reader& rd) {
|
||||
while (streamed_mutation_opt smo = rd().get0()) {
|
||||
auto&& sm = *smo;
|
||||
while (sm().get0()) ;
|
||||
}
|
||||
}
|
||||
|
||||
static void populate_range(row_cache& cache,
|
||||
const dht::partition_range& pr = query::full_partition_range,
|
||||
const query::clustering_range& r = query::full_clustering_range)
|
||||
{
|
||||
auto slice = partition_slice_builder(*cache.schema()).with_range(r).build();
|
||||
auto rd = cache.make_reader(cache.schema(), pr, slice);
|
||||
consume_all(rd);
|
||||
}
|
||||
|
||||
static void apply(row_cache& cache, memtable_snapshot_source& underlying, const mutation& m) {
|
||||
auto mt = make_lw_shared<memtable>(m.schema());
|
||||
mt->apply(m);
|
||||
underlying.apply(m);
|
||||
cache.update(*mt, make_default_partition_presence_checker()).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_only_static_row) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
@@ -1779,3 +1802,87 @@ SEASTAR_TEST_CASE(test_query_only_static_row) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_concurrent_population_before_latest_version_iterator) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
cache_tracker tracker;
|
||||
memtable_snapshot_source underlying(s.schema());
|
||||
|
||||
auto pk = s.make_pkey(0);
|
||||
auto pr = dht::partition_range::make_singular(pk);
|
||||
|
||||
mutation m1(pk, s.schema());
|
||||
s.add_row(m1, s.make_ckey(0), "v");
|
||||
s.add_row(m1, s.make_ckey(1), "v");
|
||||
underlying.apply(m1);
|
||||
|
||||
row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker);
|
||||
|
||||
auto make_sm = [&] (const query::partition_slice& slice = query::full_slice) {
|
||||
auto rd = cache.make_reader(s.schema(), pr, slice);
|
||||
auto smo = rd().get0();
|
||||
BOOST_REQUIRE(smo);
|
||||
streamed_mutation& sm = *smo;
|
||||
sm.set_max_buffer_size(1);
|
||||
return assert_that_stream(std::move(sm));
|
||||
};
|
||||
|
||||
{
|
||||
populate_range(cache, pr, s.make_ckey_range(0, 1));
|
||||
auto rd = make_sm(); // to keep current version alive
|
||||
|
||||
mutation m2(pk, s.schema());
|
||||
s.add_row(m2, s.make_ckey(2), "v");
|
||||
s.add_row(m2, s.make_ckey(3), "v");
|
||||
s.add_row(m2, s.make_ckey(4), "v");
|
||||
apply(cache, underlying, m2);
|
||||
|
||||
auto slice1 = partition_slice_builder(*s.schema())
|
||||
.with_range(s.make_ckey_range(0, 5))
|
||||
.build();
|
||||
|
||||
auto sma1 = make_sm(slice1);
|
||||
sma1.produces_row_with_key(s.make_ckey(0));
|
||||
|
||||
populate_range(cache, pr, s.make_ckey_range(3, 3));
|
||||
|
||||
auto sma2 = make_sm(slice1);
|
||||
|
||||
sma2.produces_row_with_key(s.make_ckey(0));
|
||||
|
||||
populate_range(cache, pr, s.make_ckey_range(2, 3));
|
||||
|
||||
sma2.produces_row_with_key(s.make_ckey(1));
|
||||
sma2.produces_row_with_key(s.make_ckey(2));
|
||||
sma2.produces_row_with_key(s.make_ckey(3));
|
||||
sma2.produces_row_with_key(s.make_ckey(4));
|
||||
sma2.produces_end_of_stream();
|
||||
|
||||
sma1.produces_row_with_key(s.make_ckey(1));
|
||||
sma1.produces_row_with_key(s.make_ckey(2));
|
||||
sma1.produces_row_with_key(s.make_ckey(3));
|
||||
sma1.produces_row_with_key(s.make_ckey(4));
|
||||
sma1.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
cache.evict();
|
||||
populate_range(cache, pr, s.make_ckey_range(4, 4));
|
||||
|
||||
auto slice1 = partition_slice_builder(*s.schema())
|
||||
.with_range(s.make_ckey_range(0, 1))
|
||||
.with_range(s.make_ckey_range(3, 3))
|
||||
.build();
|
||||
auto sma1 = make_sm(slice1);
|
||||
|
||||
sma1.produces_row_with_key(s.make_ckey(0));
|
||||
|
||||
populate_range(cache, pr, s.make_ckey_range(2, 4));
|
||||
|
||||
sma1.produces_row_with_key(s.make_ckey(1));
|
||||
sma1.produces_row_with_key(s.make_ckey(3));
|
||||
sma1.produces_end_of_stream();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -58,6 +58,10 @@ public:
|
||||
return clustering_key::from_single_value(*_s, data_value(ck).serialize());
|
||||
}
|
||||
|
||||
query::clustering_range make_ckey_range(uint32_t start_inclusive, uint32_t end_inclusive) {
|
||||
return query::clustering_range::make({make_ckey(start_inclusive)}, {make_ckey(end_inclusive)});
|
||||
}
|
||||
|
||||
// Make a clustering_key which is n-th in some arbitrary sequence of keys
|
||||
clustering_key make_ckey(uint32_t n) {
|
||||
return make_ckey(sprint("ck%010d", n));
|
||||
|
||||
Reference in New Issue
Block a user