db/view: Consume updated rows regardless of static row

Using Materialized Views, if the base table has static columns,
and the update in base table mutates static and non static rows,
the streamed_mutation is stopped before process non static row.
The patch avoids stopping the stream_mutation and adds a test case.

Message-Id: <20171220173434.25091-1-tavares.george@gmail.com>
This commit is contained in:
George Tavares
2017-12-20 15:34:34 -02:00
committed by Duarte Nunes
parent dfe48bbbc7
commit ceecd542cd
2 changed files with 39 additions and 2 deletions

View File

@@ -636,8 +636,10 @@ future<stop_iteration> view_update_builder::on_results() {
}
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
if (_update && _update->is_clustering_row()) {
generate_update(std::move(*_update).as_clustering_row(), { });
if (_update) {
if (_update->is_clustering_row()) {
generate_update(std::move(*_update).as_clustering_row(), { });
}
return advance_updates();
}

View File

@@ -893,6 +893,41 @@ SEASTAR_TEST_CASE(test_static_table) {
});
}
SEASTAR_TEST_CASE(test_static_data) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("create table ab ( a int, b int , c int static , primary key(a,b)) with clustering order by (b asc);").get();
e.execute_cql("create materialized view ba as select a ,b from ab "
"where a is not null and b is not null primary key (b,a) with clustering order by (b asc);").get();
e.execute_cql("insert into ab (a, b) values (1, 2);").get();
auto msg = e.execute_cql("select a, b from ab where a = 1;").get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({ {int32_type->decompose(1)}, {int32_type->decompose(2)} });
eventually([&] {
auto msg = e.execute_cql("select a, b from ba where b = 2;").get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({ {int32_type->decompose(1)}, {int32_type->decompose(2)} });
});
e.execute_cql("insert into ab (a , b , c) values (3, 4, 5);").get();
auto msg2 = e.execute_cql("select a, b from ab where a = 3;").get0();
assert_that(msg2).is_rows()
.with_size(1)
.with_row({ {int32_type->decompose(3)}, {int32_type->decompose(4)} });
eventually([&] {
auto msg = e.execute_cql("select a, b from ba where b = 4;").get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({ {int32_type->decompose(3)}, {int32_type->decompose(4)} });
});
});
}
SEASTAR_TEST_CASE(test_old_timestamps) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();