utils: redesign reusable_buffer

Large contiguous buffers put heavy pressure on the allocator
and are a common source of reactor stalls. Therefore, Scylla avoids
their use, replacing them with fragmented buffers whenever possible.
However, the use of large contiguous buffers is impossible to avoid
when dealing with some external libraries (e.g. some compression
libraries, like LZ4).

Fortunately, calls to external libraries are synchronous, so we can
minimize the allocator impact by reusing a single buffer between calls.

An implementation of such a reusable buffer has two conflicting goals:
to allocate as rarely as possible, and to waste as little memory as
possible. The bigger the buffer, the more likely that it will be able
to handle future requests without reallocation, but also the more
memory it ties up.

If request sizes are repetitive, the near-optimal solution is to
simply resize the buffer up to match the biggest seen request,
and never resize down.

However, if we anticipate pathologically large requests, which are
caused by an application/configuration bug and are never repeated
again after they are fixed, we might want to resize down after such
pathological requests stop, so that the memory they took isn't tied
up forever.

The current implementation of reusable buffers handles this by
resizing down to 0 every 100'000 requests.

This patch attempts to solve a few shortcomings of the current
implementation.
1. Resizing to 0 is too aggressive. During regular operation, we will
surely need to resize it back to the previous size again. If something
is allocated in the hole left by the old buffer, this might cause
a stall. We prefer to resize down only after pathological requests.
2. When resizing, the current implementation allocates the new buffer
before freeing the old one. This increases allocator pressure for no
reason.
3. When resizing up, the buffer is resized to exactly the requested
size. That is, if the current size is 1MiB, following requests
of 1MiB+1B and 1MiB+2B will both cause a resize.
It's preferable to limit the set of possible sizes so that every
reset doesn't tend to cause multiple resizes of almost the same size.
The natural set of sizes is powers of 2, because that's what the
underlying buddy allocator uses. No waste is caused by rounding up
the allocation to a power of 2.
4. The interval of 100'000 uses is both too low and too arbitrary.
This is up for discussion, but I think that it's preferable to base
the dynamics of the buffer on time, rather than the number of uses.
It's more predictable to humans.

The implementation proposed in this patch addresses these as follows:
1. Instead of resizing down to 0, we resize to the biggest size
   seen in the last period.
   As long as at least one maximal (up to a power of 2) "normal" request
   appears each period, the buffer will never have to be resized.
2. The capacity of the buffer is always rounded up to the nearest
   power of 2.
3. The resize-down period is no longer measured in number of requests
   but in real time.
4. When resizing, the old buffer is freed before the new one is
   allocated.

Additionally, since a shared buffer in asynchronous code is quite a
footgun, some rudimentary refcounting is added to assert that only
one reference to the buffer exists at a time, and that the buffer isn't
downsized while a reference to it exists.
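
The refcounting idea can be reduced to a small RAII sketch. The names
`shared_buffer` and `single_use_guard` are hypothetical and stand in for
the buffer and its guard; the counter exists only to back the asserts.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the shared buffer: only the counter is modelled.
struct shared_buffer {
    std::size_t refcount = 0;
};

// At most one guard may exist per buffer, and it may be used at most once.
class single_use_guard {
    shared_buffer& _buf;
    bool _used = false;
public:
    explicit single_use_guard(shared_buffer& buf) : _buf(buf) {
        assert(_buf.refcount == 0);  // no other guard may be alive
        ++_buf.refcount;
    }
    ~single_use_guard() {
        --_buf.refcount;
    }
    single_use_guard(const single_use_guard&) = delete;
    single_use_guard& operator=(const single_use_guard&) = delete;

    // Call from every accessor; a second call trips the assert.
    void mark_used() {
        assert(!_used);
        _used = true;
    }
};
```

Creating a second guard while the first is alive, or calling
`mark_used()` twice on one guard, aborts in builds with asserts enabled.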

Fixes #13437
Michał Chojnowski
2023-03-16 08:50:23 +01:00
parent e8fb718e4a
commit bf26a8c467
6 changed files with 441 additions and 240 deletions


@@ -6,64 +6,72 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#define BOOST_TEST_MODULE reusable_buffer
#include <boost/test/unit_test.hpp>
#include "utils/reusable_buffer.hh"
#include <boost/range/algorithm/copy.hpp>
#include "test/lib/random_utils.hh"
#include "test/lib/log.hh"
BOOST_AUTO_TEST_CASE(test_get_linearized_view) {
utils::reusable_buffer buffer;
#include <boost/range/algorithm/copy.hpp>
auto test = [&buffer] (size_t n) {
#include "utils/reusable_buffer.hh"
#include <seastar/core/manual_clock.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/util/later.hh>
#include <seastar/core/coroutine.hh>
#include <bit>
using namespace seastar;
SEASTAR_TEST_CASE(test_get_linearized_view) {
auto test = [] (size_t n, utils::reusable_buffer<manual_clock>& buffer) {
testlog.info("Testing buffer size {}", n);
auto original = tests::random::get_bytes(n);
bytes_ostream bo;
bo.write(original);
auto view = buffer.get_linearized_view(bo);
BOOST_REQUIRE_EQUAL(view.size(), n);
BOOST_REQUIRE(view == original);
BOOST_REQUIRE(bo.linearize() == original);
std::vector<temporary_buffer<char>> tbufs;
bytes_view left = original;
while (!left.empty()) {
auto this_size = std::min<size_t>(left.size(), fragmented_temporary_buffer::default_fragment_size);
tbufs.emplace_back(reinterpret_cast<const char*>(left.data()), this_size);
left.remove_prefix(this_size);
{
auto bufguard = utils::reusable_buffer_guard(buffer);
auto view = bufguard.get_linearized_view(bo);
BOOST_REQUIRE_EQUAL(view.size(), n);
BOOST_REQUIRE(view == original);
BOOST_REQUIRE(bo.linearize() == original);
}
auto fbuf = fragmented_temporary_buffer(std::move(tbufs), original.size());
view = buffer.get_linearized_view(fragmented_temporary_buffer::view(fbuf));
BOOST_REQUIRE_EQUAL(view.size(), n);
BOOST_REQUIRE(view == original);
BOOST_REQUIRE(linearized(fragmented_temporary_buffer::view(fbuf)) == original);
{
std::vector<temporary_buffer<char>> tbufs;
bytes_view left = original;
while (!left.empty()) {
auto this_size = std::min<size_t>(left.size(), fragmented_temporary_buffer::default_fragment_size);
tbufs.emplace_back(reinterpret_cast<const char*>(left.data()), this_size);
left.remove_prefix(this_size);
}
auto bufguard = utils::reusable_buffer_guard(buffer);
auto fbuf = fragmented_temporary_buffer(std::move(tbufs), original.size());
auto view = bufguard.get_linearized_view(fragmented_temporary_buffer::view(fbuf));
BOOST_REQUIRE_EQUAL(view.size(), n);
BOOST_REQUIRE(view == original);
BOOST_REQUIRE(linearized(fragmented_temporary_buffer::view(fbuf)) == original);
}
};
for (auto j = 0; j < 2; j++) {
buffer.clear();
utils::reusable_buffer<manual_clock> buffer(std::chrono::milliseconds(1));
test(0);
test(1'000'000);
test(1'000);
test(100'000);
test(0, buffer);
test(1'000'000, buffer);
test(1'000, buffer);
test(100'000, buffer);
for (auto i = 0; i < 25; i++) {
test(tests::random::get_int(512 * 1024));
test(tests::random::get_int(512 * 1024), buffer);
}
}
return make_ready_future<>();
}
BOOST_AUTO_TEST_CASE(test_make_buffer) {
utils::reusable_buffer buffer;
auto test = [&buffer] (size_t maximum, size_t actual) {
SEASTAR_TEST_CASE(test_make_buffer) {
auto test = [] (size_t maximum, size_t actual, utils::reusable_buffer<manual_clock>& buffer) {
testlog.info("Testing maximum buffer size {}, actual: {} ", maximum, actual);
bytes original;
@@ -75,30 +83,71 @@ BOOST_AUTO_TEST_CASE(test_make_buffer) {
return actual;
};
auto bo = buffer.make_buffer(maximum, make_buffer_fn);
{
auto bufguard = utils::reusable_buffer_guard(buffer);
auto bo = bufguard.make_bytes_ostream(maximum, make_buffer_fn);
BOOST_REQUIRE_EQUAL(bo.size(), actual);
BOOST_REQUIRE(bo.linearize() == original);
BOOST_REQUIRE_EQUAL(bo.size(), actual);
BOOST_REQUIRE(bo.linearize() == original);
}
auto fbuf = buffer.make_fragmented_temporary_buffer(maximum, fragmented_temporary_buffer::default_fragment_size, make_buffer_fn);
auto view = fragmented_temporary_buffer::view(fbuf);
{
auto bufguard = utils::reusable_buffer_guard(buffer);
auto fbuf = bufguard.make_fragmented_temporary_buffer(maximum, make_buffer_fn);
auto view = fragmented_temporary_buffer::view(fbuf);
BOOST_REQUIRE_EQUAL(view.size_bytes(), actual);
BOOST_REQUIRE(linearized(view) == original);
BOOST_REQUIRE_EQUAL(view.size_bytes(), actual);
BOOST_REQUIRE(linearized(view) == original);
}
};
for (auto j = 0; j < 2; j++) {
buffer.clear();
utils::reusable_buffer<manual_clock> buffer(std::chrono::milliseconds(1));
test(0, 0);
test(100'000, 0);
test(200'000, 200'000);
test(400'000, 100'000);
test(0, 0, buffer);
test(100'000, 0, buffer);
test(200'000, 200'000, buffer);
test(400'000, 100'000, buffer);
for (auto i = 0; i < 25; i++) {
auto a = tests::random::get_int(512 * 1024);
auto b = tests::random::get_int(512 * 1024);
test(std::max(a, b), std::min(a, b));
test(std::max(a, b), std::min(a, b), buffer);
}
}
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_decay) {
using namespace std::chrono_literals;
utils::reusable_buffer<manual_clock> buffer(1s);
auto get_buffer = [&buffer] (size_t size) {
auto bufguard = utils::reusable_buffer_guard(buffer);
bufguard.get_temporary_buffer(size);
};
auto advance_clock = [] (manual_clock::duration d) {
manual_clock::advance(d);
return yield();
};
BOOST_REQUIRE(buffer.reallocs() == 0);
get_buffer(1'000'000);
get_buffer(1'000'001);
get_buffer(1'000'000);
get_buffer(1'000);
BOOST_REQUIRE_EQUAL(buffer.reallocs(), 1);
// It isn't strictly required from the implementation to use
// power-of-2 sizes, just sizes coarse enough to limit the number
// of allocations.
// If the implementation is modified, this assert can be freely changed.
BOOST_REQUIRE_EQUAL(buffer.size(), std::bit_ceil(size_t(1'000'001)));
co_await advance_clock(1500ms);
get_buffer(1'000);
BOOST_REQUIRE_EQUAL(buffer.reallocs(), 1);
co_await advance_clock(1000ms);
BOOST_REQUIRE_EQUAL(buffer.reallocs(), 2);
BOOST_REQUIRE_EQUAL(buffer.size(), std::bit_ceil(size_t(1'000)));
co_await advance_clock(1000ms);
BOOST_REQUIRE_EQUAL(buffer.reallocs(), 3);
BOOST_REQUIRE_EQUAL(buffer.size(), 0);
}


@@ -10,7 +10,6 @@
#include "server.hh"
#include "utils/utf8.hh"
#include "utils/reusable_buffer.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace cql_transport {


@@ -9,7 +9,6 @@
#pragma once
#include "server.hh"
#include "utils/reusable_buffer.hh"
namespace cql_transport {


@@ -62,6 +62,7 @@
#include "transport/cql_protocol_extension.hh"
#include "utils/bit_cast.hh"
#include "db/config.hh"
#include "utils/reusable_buffer.hh"
template<typename T = void>
using coordinator_result = exceptions::coordinator_result<T>;
@@ -790,71 +791,67 @@ future<> cql_server::connection::process_request() {
});
}
namespace compression_buffers {
// Reusable buffers for compression and decompression. Cleared every
// clear_buffers_trigger uses.
static constexpr size_t clear_buffers_trigger = 100'000;
static thread_local size_t buffer_use_count = 0;
static thread_local utils::reusable_buffer input_buffer;
static thread_local utils::reusable_buffer output_buffer;
void on_compression_buffer_use() {
if (++buffer_use_count == clear_buffers_trigger) {
input_buffer.clear();
output_buffer.clear();
buffer_use_count = 0;
}
// Contiguous buffers for use with compression primitives.
// Be careful when dealing with them, because they are shared and
// can be modified on preemption points.
// See the comments on reusable_buffer for a discussion.
static utils::reusable_buffer_guard input_buffer_guard() {
using namespace std::chrono_literals;
static thread_local utils::reusable_buffer<lowres_clock> buf(600s);
return buf;
}
static utils::reusable_buffer_guard output_buffer_guard() {
using namespace std::chrono_literals;
static thread_local utils::reusable_buffer<lowres_clock> buf(600s);
return buf;
}
future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_frame(size_t length, uint8_t flags)
{
using namespace compression_buffers;
if (flags & cql_frame_flags::compression) {
if (_compression == cql_compression::lz4) {
if (length < 4) {
throw std::runtime_error(fmt::format("CQL frame truncated: expected to have at least 4 bytes, got {}", length));
}
return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer buf) {
auto linearization_buffer = bytes_ostream();
int32_t uncomp_len = request_reader(buf.get_istream(), linearization_buffer).read_int();
auto input_buffer = input_buffer_guard();
auto output_buffer = output_buffer_guard();
auto v = fragmented_temporary_buffer::view(buf);
int32_t uncomp_len = read_simple<int32_t>(v);
if (uncomp_len < 0) {
throw std::runtime_error("CQL frame uncompressed length is negative: " + std::to_string(uncomp_len));
}
buf.remove_prefix(4);
auto in = input_buffer.get_linearized_view(fragmented_temporary_buffer::view(buf));
auto uncomp = output_buffer.make_fragmented_temporary_buffer(uncomp_len, fragmented_temporary_buffer::default_fragment_size, [&] (bytes_mutable_view out) {
auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data()),
in.size(), out.size());
auto in = input_buffer.get_linearized_view(v);
return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data()), in.size(), out.size());
if (ret < 0) {
throw std::runtime_error("CQL frame LZ4 uncompression failure");
}
if (size_t(ret) != out.size()) { // ret is known to be positive here
if (static_cast<size_t>(ret) != out.size()) { // ret is known to be positive here
throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
}
return static_cast<size_t>(ret);
});
on_compression_buffer_use();
return uncomp;
});
} else if (_compression == cql_compression::snappy) {
return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer buf) {
auto input_buffer = input_buffer_guard();
auto output_buffer = output_buffer_guard();
auto in = input_buffer.get_linearized_view(fragmented_temporary_buffer::view(buf));
size_t uncomp_len;
if (snappy_uncompressed_length(reinterpret_cast<const char*>(in.data()), in.size(), &uncomp_len) != SNAPPY_OK) {
throw std::runtime_error("CQL frame Snappy uncompressed size is unknown");
}
auto uncomp = output_buffer.make_fragmented_temporary_buffer(uncomp_len, fragmented_temporary_buffer::default_fragment_size, [&] (bytes_mutable_view out) {
return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
size_t output_len = out.size();
if (snappy_uncompress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &output_len) != SNAPPY_OK) {
throw std::runtime_error("CQL frame Snappy uncompression failure");
}
if (output_len != out.size()) {
throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
}
return output_len;
});
on_compression_buffer_use();
return uncomp;
});
} else {
throw exceptions::protocol_exception(format("Unknown compression algorithm"));
@@ -1597,43 +1594,38 @@ void cql_server::response::compress(cql_compression compression)
void cql_server::response::compress_lz4()
{
using namespace compression_buffers;
auto view = input_buffer.get_linearized_view(_body);
const char* input = reinterpret_cast<const char*>(view.data());
size_t input_len = view.size();
auto input_buffer = input_buffer_guard();
auto output_buffer = output_buffer_guard();
size_t output_len = LZ4_COMPRESSBOUND(input_len) + 4;
_body = output_buffer.make_buffer(output_len, [&] (bytes_mutable_view output_view) {
char* output = reinterpret_cast<char*>(output_view.data());
output[0] = (input_len >> 24) & 0xFF;
output[1] = (input_len >> 16) & 0xFF;
output[2] = (input_len >> 8) & 0xFF;
output[3] = input_len & 0xFF;
auto ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
auto in = input_buffer.get_linearized_view(_body);
size_t output_len = LZ4_COMPRESSBOUND(in.size()) + 4;
_body = output_buffer.make_bytes_ostream(output_len, [&in] (bytes_mutable_view out) {
out.data()[0] = (in.size() >> 24) & 0xFF;
out.data()[1] = (in.size() >> 16) & 0xFF;
out.data()[2] = (in.size() >> 8) & 0xFF;
out.data()[3] = in.size() & 0xFF;
auto ret = LZ4_compress_default(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data() + 4), in.size(), out.size() - 4);
if (ret == 0) {
throw std::runtime_error("CQL frame LZ4 compression failure");
}
return ret + 4;
return static_cast<size_t>(ret) + 4;
});
on_compression_buffer_use();
}
void cql_server::response::compress_snappy()
{
using namespace compression_buffers;
auto view = input_buffer.get_linearized_view(_body);
const char* input = reinterpret_cast<const char*>(view.data());
size_t input_len = view.size();
auto input_buffer = input_buffer_guard();
auto output_buffer = output_buffer_guard();
size_t output_len = snappy_max_compressed_length(input_len);
_body = output_buffer.make_buffer(output_len, [&] (bytes_mutable_view output_view) {
char* output = reinterpret_cast<char*>(output_view.data());
if (snappy_compress(input, input_len, output, &output_len) != SNAPPY_OK) {
auto in = input_buffer.get_linearized_view(_body);
size_t output_len = snappy_max_compressed_length(in.size());
_body = output_buffer.make_bytes_ostream(output_len, [&in] (bytes_mutable_view out) {
size_t actual_len = out.size();
if (snappy_compress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &actual_len) != SNAPPY_OK) {
throw std::runtime_error("CQL frame Snappy compression failure");
}
return output_len;
return actual_len;
});
on_compression_buffer_use();
}
void cql_server::response::serialize(const event::schema_change& event, uint8_t version)


@@ -38,6 +38,16 @@ public:
: _fragments(std::move(fragments)), _size_bytes(size_bytes)
{ }
fragmented_temporary_buffer(const char* str, size_t size)
{
*this = allocate_to_fit(size);
size_t pos = 0;
for (auto& frag : _fragments) {
std::memcpy(frag.get_write(), str + pos, frag.size());
pos += frag.size();
}
}
explicit operator view() const noexcept;
istream get_istream() const noexcept;


@@ -8,161 +8,313 @@
#pragma once
#include <seastar/core/memory.hh>
#include "bytes.hh"
#include "bytes_ostream.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include <boost/range/algorithm/for_each.hpp>
#include <seastar/core/timer.hh>
#include <seastar/core/memory.hh>
#include <chrono>
#include <bit>
namespace utils {
/// A reusable buffer, for temporary linearisation of bytes_ostream
///
/// This class provides helpers for temporary linearisation of bytes_ostream.
/// Both cases when bytes_ostream holds input data as well as when it is
/// an output are supported. Copies are avoided if the buffers are not
/// fragmented.
///
/// Example of reading a possibly fragmented bytes_ostream:
///
/// ```c++
/// thread_local reusable_buffer rb;
/// bytes_ostream potentially_fragmented_buffer;
/// bytes_view view = rb.get_linearized_view(potentially_fragmented_buffer);
/// // view is a view of a buffer that holds the same data as potentially_fragmented buffer
/// ```
///
/// Example of writing to a bytes_ostream:
///
/// ```c++
/// thread_local reusable_buffer rb;
/// size_t maximum_size = compute_maximum_size();
/// bytes_ostream destination = rb.make_buffer([&] (bytes_mutable_view view) {
/// // view is a mutable view of some buffer which content will be in
/// // the bytes_ostream returned by make_buffer.
/// return actual_length_of_the_buffer;
/// });
/// ```
class reusable_buffer { // extract to utils
// FIXME: We should start using std::byte for these things.
std::unique_ptr<int8_t[]> _buffer;
size_t _size = 0;
private:
bytes_mutable_view reserve(size_t n) {
if (_size < n) {
// Reusable buffers are expected to be used when large contiguous
// allocations are unavoidable. There is not much point in warning
// about them since there isn't much that can be done.
seastar::memory::scoped_large_allocation_warning_disable g;
// std::make_unique would zero-initialise the buffer which is
// just a waste of cycles. We can, however, summon an ancient
// entity from the elder days of C++ to help us.
_buffer.reset(new int8_t[n]);
_size = n;
}
return bytes_mutable_view(_buffer.get(), n);
}
// Users of reusable_buffer don't need the templated parts,
// so the non-templated implementation parts are extracted
// to a separate class.
// This should prevent some unnecessary template instantiations.
class reusable_buffer_impl {
protected:
friend class reusable_buffer_guard;
// Max size observed since the last decay(), rounded up.
// Currently we round to the smallest power of 2 greater than or equal
// to the observed size.
size_t _high_watermark = 0;
std::unique_ptr<bytes::value_type[]> _buf; // The underlying contiguous buffer.
size_t _buf_size = 0;
size_t _refcount = 0; // Used purely for anti-misuse checks.
size_t _reallocs = 0; // Number of size changes.
public:
/// Returns a linearised view of the provided bytes_ostream
///
/// This function returns a linearised view of the data stored in the
/// provided bytes_ostream. If it is fragmented the linearisation uses
/// a buffer owned by this.
/// The returned view remains valid as long as the original bytes_ostream
/// is not modifed and no other member functions of this are called.
bytes_view get_linearized_view(const bytes_ostream& data) {
if (data.is_linearized()) {
return data.view();
}
auto mutable_view = reserve(data.size());
auto dst = mutable_view.begin();
for (bytes_view fragment : data) {
dst = std::copy(fragment.begin(), fragment.end(), dst);
}
return bytes_view(mutable_view);
size_t size() const noexcept { return _buf_size; }
size_t reallocs() const noexcept { return _reallocs; }
protected:
// The guard keeps a reference to the buffer, so it must have a stable address.
reusable_buffer_impl(const reusable_buffer_impl&) = delete;
reusable_buffer_impl& operator=(const reusable_buffer_impl&) = delete;
reusable_buffer_impl() = default;
~reusable_buffer_impl() {
assert(_refcount == 0);
}
void resize(size_t new_size) & {
// Reusable buffers are expected to be used when large contiguous
// allocations are unavoidable. There is no point in warning
// about them since there isn't much that can be done.
seastar::memory::scoped_large_allocation_warning_disable g;
bytes_view get_linearized_view(const fragmented_temporary_buffer::view& data) {
if (data.empty()) {
return { };
} else if (std::next(data.begin()) == data.end()) {
return *data.begin();
}
auto mutable_view = reserve(data.size_bytes());
auto dst = mutable_view.begin();
using boost::range::for_each;
for_each(data, [&] (bytes_view fragment) {
dst = std::copy(fragment.begin(), fragment.end(), dst);
});
return bytes_view(mutable_view);
// Clear before shrinking so that the old buffer
// is freed before allocating the new buffer.
// This lessens the pressure on the allocator and should guarantee
// a success if the new size is smaller than the old size.
_buf = nullptr;
_buf_size = 0;
_reallocs += 1;
_buf.reset(new bytes::value_type[new_size]);
_buf_size = new_size;
}
/// Creates a bytes_ostream
///
/// make_buffer calls provided function object and gives it a mutable
/// view of some buffer. Data written to that view will be in the returned
/// bytes_ostream. The function object is expected to return the actual
/// size of the buffer (less than or equal the previously specified maximum
/// length).
// The below methods can be considered "public", but they are only accessible
// through their guard wrappers, to make misuse harder.
bytes_mutable_view get_temporary_buffer(size_t size) & {
_high_watermark = std::max(_high_watermark, std::bit_ceil(size));
if (_high_watermark > _buf_size) {
resize(_high_watermark);
}
return {_buf.get(), size};
}
// The helpers below only interact with the rest of the class through
// get_temporary_buffer().
// They could as well be free functions.
// Returns a linearized view onto the provided data.
// If the input view is already linearized, it is returned as-is.
// Otherwise the contents are copied to the reusable buffer
// and a view into the buffer is returned.
bytes_view get_linearized_view(fragmented_temporary_buffer::view ftb) & {
if (ftb.current_fragment().size() == ftb.size_bytes()) {
return {ftb.current_fragment().data(), ftb.size_bytes()};
}
const auto out = get_temporary_buffer(ftb.size_bytes()).data();
auto dst = out;
for (bytes_view fragment : fragmented_temporary_buffer::view(ftb)) {
dst = std::copy(fragment.begin(), fragment.end(), dst);
}
return {out, ftb.size_bytes()};
}
// Returns a linearized view onto the provided data.
// If the input view is already linearized, it is returned as-is.
// Otherwise the contents are copied to the reusable buffer
// and a view into the buffer is returned.
bytes_view get_linearized_view(bytes_ostream& bo) & {
if (bo.is_linearized()) {
return bo.view();
}
const auto out = get_temporary_buffer(bo.size()).data();
auto dst = out;
for (bytes_view fragment : bo) {
dst = std::copy(fragment.begin(), fragment.end(), dst);
}
return {out, bo.size()};
}
// Provides a contiguous buffer of size `maximum_length` to `fn`.
// `fn` writes to the buffer and returns the number of bytes written.
// A fragmented buffer containing the data written by `fn` is returned.
//
// If the data fits into a single fragment, `fn` is ran directly on
// the only fragment of a newly allocated fragmented_buffer.
// Otherwise the contiguous buffer for `fn` is allocated from the reusable
// buffer, and its contents are copied to a new fragmented buffer after `fn`
// returns.
// This way a copy is avoided in the small case.
template<typename Function>
requires requires(Function fn, bytes_mutable_view view) {
{ fn(view) } -> std::convertible_to<size_t>;
}
bytes_ostream make_buffer(size_t maximum_length, Function&& fn) {
requires std::is_invocable_r_v<size_t, Function, bytes_mutable_view>
bytes_ostream make_bytes_ostream(size_t maximum_length, Function&& fn) & {
bytes_ostream output;
bytes_mutable_view view = [&] {
if (maximum_length && maximum_length <= bytes_ostream::max_chunk_size()) {
auto ptr = output.write_place_holder(maximum_length);
return bytes_mutable_view(ptr, maximum_length);
}
return reserve(maximum_length);
}();
size_t actual_length = fn(view);
if (output.empty()) {
output.write(bytes_view(view.data(), actual_length));
} else {
if (maximum_length && maximum_length <= bytes_ostream::max_chunk_size()) {
auto ptr = output.write_place_holder(maximum_length);
size_t actual_length = fn(bytes_mutable_view(ptr, maximum_length));
output.remove_suffix(output.size() - actual_length);
} else {
auto view = get_temporary_buffer(maximum_length);
size_t actual_length = fn(view);
output.write(bytes_view(view.data(), actual_length));
}
return output;
}
// Provides a contiguous buffer of size `maximum_length` to `fn`.
// `fn` writes to the buffer and returns the number of bytes written.
// A fragmented buffer containing the data written by `fn` is returned.
//
// If the data fits into a single fragment, `fn` is ran directly on
// the only fragment of a newly allocated fragmented_buffer.
// Otherwise the contiguous buffer for `fn` is allocated from the reusable
// buffer, and its contents are copied to a new fragmented buffer after `fn`
// returns.
// This way a copy is avoided in the small case.
template<typename Function>
requires requires(Function fn, bytes_mutable_view view) {
{ fn(view) } -> std::same_as<size_t>;
}
fragmented_temporary_buffer make_fragmented_temporary_buffer(size_t maximum_length, size_t maximum_fragment_size, Function&& fn) {
std::vector<temporary_buffer<char>> fragments;
bytes_mutable_view view = [&] {
if (maximum_length <= maximum_fragment_size) {
fragments.emplace_back(maximum_length);
return bytes_mutable_view(reinterpret_cast<bytes::pointer>(fragments.back().get_write()), maximum_length);
}
return reserve(maximum_length);
}();
size_t actual_length = fn(view);
if (fragments.empty()) {
auto left = actual_length;
auto src = reinterpret_cast<const bytes::value_type*>(_buffer.get());
while (left) {
auto this_length = std::min(left, maximum_fragment_size);
fragments.emplace_back(reinterpret_cast<const char*>(src), this_length);
src += this_length;
left -= this_length;
}
requires std::is_invocable_r_v<size_t, Function, bytes_mutable_view>
fragmented_temporary_buffer make_fragmented_temporary_buffer(size_t maximum_length, Function&& fn) & {
if (maximum_length <= fragmented_temporary_buffer::default_fragment_size) {
seastar::temporary_buffer<char> buf(maximum_length);
auto view = bytes_mutable_view(reinterpret_cast<bytes::value_type*>(buf.get_write()), buf.size());
size_t actual_length = fn(view);
buf.trim(actual_length);
std::vector<seastar::temporary_buffer<char>> chunks;
chunks.push_back(std::move(buf));
return fragmented_temporary_buffer(std::move(chunks), actual_length);
} else {
fragments.back().trim(actual_length);
auto view = get_temporary_buffer(maximum_length);
size_t actual_length = fn(view);
return fragmented_temporary_buffer(reinterpret_cast<const char*>(view.data()), actual_length);
}
return fragmented_temporary_buffer(std::move(fragments), actual_length);
}
/// Releases all allocated memory.
void clear() noexcept {
_buffer.reset();
_size = 0;
}
};
}
/* Sometimes (e.g. for compression), we need a big contiguous buffer
* of a given size.
*
* Big buffers are a problem for the allocator, so if such a buffer
* is needed regularly, we want to reuse it. In the optimum, we
* only want to use one buffer per concurrent task (if all uses
* of the buffer are synchronous, one buffer per shard is enough),
* and only resize the buffer when it's too small for the current
* request.
*
* At the same time we might not want to keep the buffer around forever.
* A pathological request might cause the buffer to grow to some
* unreasonable size. After the source of the pathology (e.g. bad compression
* chunk size) is fixed, we want the buffer to free the capacity that will
* most likely not be used again.
*
* This class provides a buffer which balances both tasks by periodically
* shrinking its size to the largest recently seen size.
* The buffer always has a power-of-2 size, to minimize the number of
* allocations for small size changes.
*
* Even though the views returned by the buffer's methods have exactly the same
* requested size, the entire underlying capacity (of size size()) is safe to use.
* The returned views are always a prefix of the underlying capacity.
*
* Under a stable workload, the buffer's size will be stable.
* If a pathological request comes, the buffer will grow to accommodate it.
* 1-2 periods after the pathology ceases, the buffer will shrink back
* to its steady-state size.
*
* Be very careful with this buffer.
* Its content might disappear during a preemption point. This can be
* checked by comparing reallocs() before and after the preemption point.
* The content might be also easily invalidated by an unrelated future
* sharing the same buffer, if you are not careful.
*
* To make this buffer slightly harder to misuse, it's only accessible
* by reusable_buffer_guard.
* There can only exist one guard at a time.
* The guard can be used to obtain a view at most once.
*/
template <typename Clock>
class reusable_buffer : public reusable_buffer_impl {
public:
using period_type = typename Clock::duration;
private:
seastar::timer<Clock> _decay_timer;
period_type _decay_period;
void decay() & {
assert(_refcount == 0);
if (_high_watermark <= _buf_size / 16) {
// We shrink when the size falls at least by four power-of-2
// notches, instead of just one notch. This adds hysteresis:
// it prevents small oscillations from inducing shrink/grow cycles.
// With the factor of 16 above, only outliers at least 8x bigger than
// the otherwise stable size might cause reallocations.
//
// In other words, we "define" a pathological request as one that is
// at least 8x bigger than the stable size, where the stable size
// is the max of all sizes seen within a decay period.
resize(_high_watermark);
}
_high_watermark = 0;
}
public:
reusable_buffer(period_type period)
: _decay_period(period)
{
_decay_timer.set_callback([this] {decay();});
_decay_timer.arm_periodic(_decay_period);
}
};
/* Exists only to assert that there exists at most one reference to the
* reusable_buffer, to hopefully make it less of a footgun.
*
* The reference/use counts exist only for assert purposes.
* They don't influence the program otherwise.
*
* Never keep the guard across preemption points.
*
* Don't let views obtained through the guard outlive the guard,
* that defeats its purpose and is begging for trouble.
*
* The guard only accesses _refcount and its "public" methods.
* It doesn't mess with its internals.
*/
class reusable_buffer_guard {
private:
reusable_buffer_impl& _buf;
bool used = false;
private:
void mark_used() {
assert(!used);
used = true;
}
public:
reusable_buffer_guard(const reusable_buffer_guard&) = delete;
reusable_buffer_guard& operator=(const reusable_buffer_guard&) = delete;
reusable_buffer_guard(reusable_buffer_impl& _buf)
: _buf(_buf)
{
assert(_buf._refcount == 0);
_buf._refcount += 1;
}
~reusable_buffer_guard() {
_buf._refcount -= 1;
}
// The result mustn't outlive `this`.
// No method of `this` may be called again.
bytes_mutable_view get_temporary_buffer(size_t size) & {
mark_used();
return _buf.get_temporary_buffer(size);
}
// The result mustn't outlive `this`.
// No method of `this` may be called again.
bytes_view get_linearized_view(fragmented_temporary_buffer::view ftb) & {
mark_used();
return _buf.get_linearized_view(std::move(ftb));
}
// The result mustn't outlive `this`.
// No method of `this` may be called again.
bytes_view get_linearized_view(bytes_ostream& bo) & {
mark_used();
return _buf.get_linearized_view(bo);
}
// The result mustn't outlive `this`.
// No method of `this` may be called again.
template<typename Function>
requires std::is_invocable_r_v<size_t, Function, bytes_mutable_view>
bytes_ostream make_bytes_ostream(size_t maximum_length, Function&& fn) & {
mark_used();
return _buf.make_bytes_ostream(maximum_length, std::forward<Function>(fn));
}
// The result mustn't outlive `this`.
// No method of `this` may be called again.
template<typename Function>
requires std::is_invocable_r_v<size_t, Function, bytes_mutable_view>
fragmented_temporary_buffer make_fragmented_temporary_buffer(size_t maximum_length, Function&& fn) & {
mark_used();
return _buf.make_fragmented_temporary_buffer(maximum_length, std::forward<Function>(fn));
}
};
} // namespace utils