zstd: share buffers between compressor instances

The zstd implementation of `compressor` has a separate decompression and
compression context per instance. This is unreasonably wasteful. One
decompression buffer and one compression buffer *per shard* is enough.

The waste is significant. There might exist thousands of SSTable readers, each
containing its own instance of `compressor` with several hundred KiB worth of
unneeded buffers. This adds up to gigabytes of wasted memory and gigapascals
of allocator pressure.

This patch modifies the implementation of zstd_processor so that all its
instances on the shard share their contexts.

Fixes #11733
This commit is contained in:
Michał Chojnowski
2023-03-24 22:52:11 +01:00
parent bf26a8c467
commit 16dd93cb7e

78
zstd.cc
View File

@@ -15,22 +15,62 @@
#include "compress.hh"
#include "utils/class_registrator.hh"
#include "utils/reusable_buffer.hh"
#include <concepts>
static const sstring COMPRESSION_LEVEL = "compression_level";
static const sstring COMPRESSOR_NAME = compressor::namespace_prefix + "ZstdCompressor";
static const size_t DCTX_SIZE = ZSTD_estimateDCtxSize();
class zstd_processor : public compressor {
int _compression_level = 3;
size_t _cctx_size;
// Manages memory for the compression context.
std::unique_ptr<char[], free_deleter> _cctx_raw;
// Compression context. Observer of _cctx_raw.
ZSTD_CCtx* _cctx;
static auto with_dctx(std::invocable<ZSTD_DCtx*> auto f) {
// The decompression context has a fixed size of ~128 KiB,
// so we don't bother ever resizing it the way we do with
// the compression context.
static thread_local std::unique_ptr<char[]> buf = std::invoke([&] {
auto ptr = std::unique_ptr<char[]>(new char[DCTX_SIZE]);
auto dctx = ZSTD_initStaticDCtx(ptr.get(), DCTX_SIZE);
if (!dctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD decompression context");
}
return ptr;
});
return f(reinterpret_cast<ZSTD_DCtx*>(buf.get()));
}
static auto with_cctx(size_t cctx_size, std::invocable<ZSTD_CCtx*> auto f) {
// See the comments to reusable_buffer for a rationale of using it for compression.
static thread_local utils::reusable_buffer<lowres_clock> buf(std::chrono::seconds(600));
static thread_local size_t last_seen_reallocs = buf.reallocs();
auto guard = utils::reusable_buffer_guard(buf);
// Note that the compression context isn't initialized with a particular
// compression config, but only with a particular size. As long as
// it is big enough, we can reuse a context initialized by an
// unrelated instance of zstd_processor without reinitializing it.
//
// If the existing context isn't big enough, the reusable buffer will
// be resized by the next line, and the following `if` will notice that
// and reinitialize the context.
auto view = guard.get_temporary_buffer(cctx_size);
if (last_seen_reallocs != buf.reallocs()) {
// Either the buffer just grew because we requested a buffer bigger
// than its last capacity, or it was shrunk some time ago by a timer.
// Either way, the resize destroyed the contents of the buffer and
// we have to initialize the context anew.
auto cctx = ZSTD_initStaticCCtx(view.data(), buf.size());
if (!cctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD compression context");
}
last_seen_reallocs = buf.reallocs();
}
return f(reinterpret_cast<ZSTD_CCtx*>(view.data()));
}
// Manages memory for the decompression context.
std::unique_ptr<char[], free_deleter> _dctx_raw;
// Decompression context. Observer of _dctx_raw.
ZSTD_DCtx* _dctx;
public:
zstd_processor(const opt_getter&);
@@ -74,24 +114,14 @@ zstd_processor::zstd_processor(const opt_getter& opts)
// We assume that the uncompressed input length is always <= chunk_len.
auto cparams = ZSTD_getCParams(_compression_level, chunk_len, 0);
auto cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams);
// According to the ZSTD documentation, pointer to the context buffer must be 8-bytes aligned.
_cctx_raw = allocate_aligned_buffer<char>(cctx_size, 8);
_cctx = ZSTD_initStaticCCtx(_cctx_raw.get(), cctx_size);
if (!_cctx) {
throw std::runtime_error("Unable to initialize ZSTD compression context");
}
_cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams);
auto dctx_size = ZSTD_estimateDCtxSize();
_dctx_raw = allocate_aligned_buffer<char>(dctx_size, 8);
_dctx = ZSTD_initStaticDCtx(_dctx_raw.get(), dctx_size);
if (!_cctx) {
throw std::runtime_error("Unable to initialize ZSTD decompression context");
}
}
size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
auto ret = ZSTD_decompressDCtx(_dctx, output, output_len, input, input_len);
auto ret = with_dctx([&] (ZSTD_DCtx* dctx) {
return ZSTD_decompressDCtx(dctx, output, output_len, input, input_len);
});
if (ZSTD_isError(ret)) {
throw std::runtime_error( format("ZSTD decompression failure: {}", ZSTD_getErrorName(ret)));
}
@@ -100,7 +130,9 @@ size_t zstd_processor::uncompress(const char* input, size_t input_len, char* out
size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
auto ret = ZSTD_compressCCtx(_cctx, output, output_len, input, input_len, _compression_level);
auto ret = with_cctx(_cctx_size, [&] (ZSTD_CCtx* cctx) {
return ZSTD_compressCCtx(cctx, output, output_len, input, input_len, _compression_level);
});
if (ZSTD_isError(ret)) {
throw std::runtime_error( format("ZSTD compression failure: {}", ZSTD_getErrorName(ret)));
}