diff --git a/db/view/view.cc b/db/view/view.cc index f9e1ee2c26..f70fa1a2a3 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1472,7 +1472,16 @@ private: built_views _built_views; std::vector _views_to_build; std::deque _fragments; - + // The compact_for_query<> that feeds this consumer is already configured + // to feed us up to view_builder::batchsize (128) rows and not an entire + // partition. Still, if rows contain large blobs, saving 128 of them in + // _fragments may be too much. So we want to track _fragment's memory + // usage, and flush the _fragments if it has grown too large. + // Additionally, limiting _fragment's size also solves issue #4213: + // A single view mutation can be as large as the size of the base rows + // used to build it, and we cannot allow its serialized size to grow + // beyond our limit on mutation size (by default 32 MB). + size_t _fragments_memory_usage = 0; public: consumer(view_builder& builder, build_step& step) : _builder(builder) @@ -1535,7 +1544,15 @@ public: return stop_iteration::yes; } + _fragments_memory_usage += cr.memory_usage(*_step.base->schema()); _fragments.push_back(std::move(cr)); + if (_fragments_memory_usage > 1024*1024) { + // Although we have not yet completed the batch of base rows that + // compact_for_query<> planned for us (view_builder::batchsize), + // we've still collected enough rows to reach sizeable memory use, + // so let's flush these rows now. + flush_fragments(); + } return stop_iteration::no; } @@ -1543,7 +1560,7 @@ public: return stop_iteration::no; } - stop_iteration consume_end_of_partition() { + void flush_fragments() { _builder._as.check(); if (!_fragments.empty()) { _fragments.push_front(partition_start(_step.current_key, tombstone())); @@ -1552,7 +1569,12 @@ public: _step.current_token(), make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get(); _fragments.clear(); + _fragments_memory_usage = 0; } + } + + stop_iteration consume_end_of_partition() { + flush_fragments(); return stop_iteration(_step.build_status.empty()); } diff --git a/tests/view_build_test.cc b/tests/view_build_test.cc index 480329a397..a959d56e45 100644 --- a/tests/view_build_test.cc +++ b/tests/view_build_test.cc @@ -81,6 +81,42 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition) { }); } +// This test reproduces issue #4213. We have a large base partition with +// many rows, and the view has the *same* partition key as the base, so all +// the generated view rows will go to the same view partition. The view +// builder used to batch up to 128 (view_builder::batch_size) of these view +// rows into one view mutation, and if the individual rows are big (i.e., +// contain very long strings or blobs), this 128-row mutation can be too +// large to be applied because of our limitation on commit-log segment size +// (commitlog_segment_size_in_mb, by default 32 MB). When applying the +// mutation fails, view building retries indefinitely and this test hung. +SEASTAR_TEST_CASE(test_builder_with_large_partition_2) { + return do_with_cql_env_thread([] (cql_test_env& e) { + const int nrows = 64; // meant to be lower than view_builder::batch_size + const int target_size = 33*1024*1024; // meant to be higher than commitlog_segment_size_in_mb + e.execute_cql("create table cf (p int, c int, s ascii, primary key (p, c))").get(); + const sstring longstring = sstring(target_size / nrows, 'x'); + for (auto i = 0; i < nrows; ++i) { + e.execute_cql(format("insert into cf (p, c, s) values (0, {:d}, '{}')", i, longstring)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf"); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null " + "primary key (p, c)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(long(nrows))}}}); + }); +} + + SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) { return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();