Materialized views: limit size of row batching during bulk view building
The bulk materialized-view building processes (when adding a materialized view to a table with existing data) currently reads the base table in batches of 128 (view_builder::batch_size) rows. This is clearly better than reading entire partitions (which may be huge), but still, 128 rows may grow pretty large when we have rows with large strings or blobs, and there is no real reason to buffer 128 rows when they are large. Instead, when the rows we read so far exceed some size threshold (in this patch, 1MB), we can operate on them immediately instead of waiting for 128. As a side-effect, this patch also solves another bug: At worst case, all the base rows of one batch may be written into one output view partition, in one mutation. But there is a hard limit on the size of one mutation (commitlog_segment_size_in_mb, by default 32MB), so we cannot allow the batch size to exceed this limit. By not batching further after 1MB, we avoid reaching this limit when individual rows do not reach it but 128 of them did. Fixes #4213. This patch also includes a unit test reproducing #4213, and demonstrating that it is now solved. Signed-off-by: Nadav Har'El <nyh@scylladb.com> Message-Id: <20190214093424.7172-1-nyh@scylladb.com>
This commit is contained in:
@@ -1472,7 +1472,16 @@ private:
|
||||
built_views _built_views;
|
||||
std::vector<view_ptr> _views_to_build;
|
||||
std::deque<mutation_fragment> _fragments;
|
||||
|
||||
// The compact_for_query<> that feeds this consumer is already configured
|
||||
// to feed us up to view_builder::batchsize (128) rows and not an entire
|
||||
// partition. Still, if rows contain large blobs, saving 128 of them in
|
||||
// _fragments may be too much. So we want to track _fragment's memory
|
||||
// usage, and flush the _fragments if it has grown too large.
|
||||
// Additionally, limiting _fragment's size also solves issue #4213:
|
||||
// A single view mutation can be as large as the size of the base rows
|
||||
// used to build it, and we cannot allow its serialized size to grow
|
||||
// beyond our limit on mutation size (by default 32 MB).
|
||||
size_t _fragments_memory_usage = 0;
|
||||
public:
|
||||
consumer(view_builder& builder, build_step& step)
|
||||
: _builder(builder)
|
||||
@@ -1535,7 +1544,15 @@ public:
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
_fragments_memory_usage += cr.memory_usage(*_step.base->schema());
|
||||
_fragments.push_back(std::move(cr));
|
||||
if (_fragments_memory_usage > 1024*1024) {
|
||||
// Although we have not yet completed the batch of base rows that
|
||||
// compact_for_query<> planned for us (view_builder::batchsize),
|
||||
// we've still collected enough rows to reach sizeable memory use,
|
||||
// so let's flush these rows now.
|
||||
flush_fragments();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -1543,7 +1560,7 @@ public:
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume_end_of_partition() {
|
||||
void flush_fragments() {
|
||||
_builder._as.check();
|
||||
if (!_fragments.empty()) {
|
||||
_fragments.push_front(partition_start(_step.current_key, tombstone()));
|
||||
@@ -1552,7 +1569,12 @@ public:
|
||||
_step.current_token(),
|
||||
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
|
||||
_fragments.clear();
|
||||
_fragments_memory_usage = 0;
|
||||
}
|
||||
}
|
||||
|
||||
stop_iteration consume_end_of_partition() {
|
||||
flush_fragments();
|
||||
return stop_iteration(_step.build_status.empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,42 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition) {
|
||||
});
|
||||
}
|
||||
|
||||
// This test reproduces issue #4213. We have a large base partition with
|
||||
// many rows, and the view has the *same* partition key as the base, so all
|
||||
// the generated view rows will go to the same view partition. The view
|
||||
// builder used to batch up to 128 (view_builder::batch_size) of these view
|
||||
// rows into one view mutation, and if the individual rows are big (i.e.,
|
||||
// contain very long strings or blobs), this 128-row mutation can be too
|
||||
// large to be applied because of our limitation on commit-log segment size
|
||||
// (commitlog_segment_size_in_mb, by default 32 MB). When applying the
|
||||
// mutation fails, view building retries indefinitely and this test hung.
|
||||
SEASTAR_TEST_CASE(test_builder_with_large_partition_2) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
const int nrows = 64; // meant to be lower than view_builder::batch_size
|
||||
const int target_size = 33*1024*1024; // meant to be higher than commitlog_segment_size_in_mb
|
||||
e.execute_cql("create table cf (p int, c int, s ascii, primary key (p, c))").get();
|
||||
const sstring longstring = sstring(target_size / nrows, 'x');
|
||||
for (auto i = 0; i < nrows; ++i) {
|
||||
e.execute_cql(format("insert into cf (p, c, s) values (0, {:d}, '{}')", i, longstring)).get();
|
||||
}
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf");
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null "
|
||||
"primary key (p, c)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(long(nrows))}}});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();
|
||||
|
||||
Reference in New Issue
Block a user