From f14e6e73bb52183b5f4a657925c413cac7735273 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 22 Jul 2019 15:00:43 +0200 Subject: [PATCH] Add ZStandard compression This adds the option to compress sstables using the Zstandard algorithm (https://facebook.github.io/zstd/). To use, pass 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor' to the 'compression' argument when creating a table. You can also specify a 'compression_level'. See Zstd documentation for the available compression levels. Resolves #2613. Signed-off-by: Kamil Braun --- .gitmodules | 3 + configure.py | 32 ++++++++- licenses/zstd-license.txt | 30 +++++++++ sstables/compress.hh | 4 +- zstd | 1 + zstd.cc | 136 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 licenses/zstd-license.txt create mode 160000 zstd create mode 100644 zstd.cc diff --git a/.gitmodules b/.gitmodules index 26ed387eda..1f3494c617 100644 --- a/.gitmodules +++ b/.gitmodules @@ -12,3 +12,6 @@ [submodule "libdeflate"] path = libdeflate url = ../libdeflate +[submodule "zstd"] + path = zstd + url = ../zstd diff --git a/configure.py b/configure.py index f99bc2030a..771ddaf492 100755 --- a/configure.py +++ b/configure.py @@ -495,6 +495,7 @@ scylla_core = (['database.cc', 'keys.cc', 'counters.cc', 'compress.cc', + 'zstd.cc', 'sstables/mp_row_consumer.cc', 'sstables/sstables.cc', 'sstables/sstables_manager.cc', @@ -1088,6 +1089,7 @@ seastar_flags += ['--compiler', args.cxx, '--c-compiler', args.cc, '--cflags=%s' '--c++-dialect=gnu++17', '--use-std-optional-variant-stringview=1', '--optflags=%s' % (modes['release']['cxx_ld_flags']), ] libdeflate_cflags = seastar_cflags +zstd_cflags = seastar_cflags + ' -Wno-implicit-fallthrough' status = subprocess.call([args.python, './configure.py'] + seastar_flags, cwd='seastar') @@ -1117,6 +1119,27 @@ for mode in build_modes: modes[mode]['seastar_cflags'] = seastar_cflags modes[mode]['seastar_libs'] = seastar_libs +MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' } + +# We need to use experimental features of the zstd library (to use our own allocators for the (de)compression context), +# which are available only when the library is linked statically. +def configure_zstd(build_dir, mode): + zstd_build_dir = os.path.join(build_dir, mode, 'zstd') + + zstd_cmake_args = [ + '-DCMAKE_BUILD_TYPE={}'.format(MODE_TO_CMAKE_BUILD_TYPE[mode]), + '-DCMAKE_C_COMPILER={}'.format(args.cc), + '-DCMAKE_CXX_COMPILER={}'.format(args.cxx), + '-DCMAKE_C_FLAGS={}'.format(zstd_cflags), + '-DZSTD_BUILD_PROGRAMS=OFF' + ] + + zstd_cmd = ['cmake', '-G', 'Ninja', os.path.relpath('zstd/build/cmake', zstd_build_dir)] + zstd_cmake_args + + print(zstd_cmd) + os.makedirs(zstd_build_dir, exist_ok=True) + subprocess.check_call(zstd_cmd, shell=False, cwd=zstd_build_dir) + args.user_cflags += " " + pkg_config('jsoncpp', '--cflags') args.user_cflags += ' -march=' + args.target libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-llz4', '-lz', '-lsnappy', pkg_config('jsoncpp', '--libs'), @@ -1167,6 +1190,9 @@ if args.antlr3_exec: else: antlr3_exec = "antlr3" +for mode in build_modes: + configure_zstd(outdir, mode) + # configure.py may run automatically from an already-existing build.ninja. # If the user interrupts configure.py in the middle, we need build.ninja # to remain in a valid state. So we write our output to a temporary @@ -1283,7 +1309,8 @@ with open(buildfile_tmp, 'w') as f: f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs))) else: objs.extend(['$builddir/' + mode + '/' + artifact for artifact in [ - 'libdeflate/libdeflate.a' + 'libdeflate/libdeflate.a', + 'zstd/lib/libzstd.a', ]]) objs.append('$builddir/' + mode + '/gen/utils/gz/crc_combine_table.o') if binary.startswith('tests/'): @@ -1397,6 +1424,9 @@ with open(buildfile_tmp, 'w') as f: f.write('rule libdeflate.{mode}\n'.format(**locals())) f.write(' command = make -C libdeflate BUILD_DIR=../build/{mode}/libdeflate/ CFLAGS="{libdeflate_cflags}" CC={args.cc} ../build/{mode}/libdeflate//libdeflate.a\n'.format(**locals())) f.write('build build/{mode}/libdeflate/libdeflate.a: libdeflate.{mode}\n'.format(**locals())) + f.write('build build/{mode}/zstd/lib/libzstd.a: ninja\n'.format(**locals())) + f.write(' subdir = build/{mode}/zstd\n'.format(**locals())) + f.write(' target = libzstd.a\n'.format(**locals())) mode = 'dev' if 'dev' in modes else modes[0] f.write('build checkheaders: phony || {}\n'.format(' '.join(['$builddir/{}/{}.o'.format(mode, hh) for hh in headers]))) diff --git a/licenses/zstd-license.txt b/licenses/zstd-license.txt new file mode 100644 index 0000000000..a793a80289 --- /dev/null +++ b/licenses/zstd-license.txt @@ -0,0 +1,30 @@ +BSD License + +For Zstandard software + +Copyright (c) 2016-present, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/sstables/compress.hh b/sstables/compress.hh index 739ba8a367..226dda8b41 100644 --- a/sstables/compress.hh +++ b/sstables/compress.hh @@ -33,8 +33,8 @@ // a "compression_metadata" object, which also contains additional information // needed from decompression - such as the chunk size and compressor type. // -// Cassandra supports three different compression algorithms for the chunks, -// LZ4, Snappy, and Deflate - the default (and therefore most important) is +// Cassandra supports four different compression algorithms for the chunks, +// LZ4, Snappy, Deflate, and Zstd - the default (and therefore most important) is // LZ4. Each compressor is an implementation of the "compressor" class. // // Each compressed chunk is followed by a 4-byte checksum of the compressed diff --git a/zstd b/zstd new file mode 160000 index 0000000000..ff304e9e65 --- /dev/null +++ b/zstd @@ -0,0 +1 @@ +Subproject commit ff304e9e65e7cde17a637eea190a874c26c48634 diff --git a/zstd.cc b/zstd.cc new file mode 100644 index 0000000000..38db40db62 --- /dev/null +++ b/zstd.cc @@ -0,0 +1,136 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include + +// We need to use experimental features of the zstd library (to allocate compression/decompression context), +// which are available only when the library is linked statically. +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd/lib/zstd.h" + +#include "compress.hh" +#include "utils/class_registrator.hh" + +static const sstring COMPRESSION_LEVEL = "compression_level"; +static const sstring COMPRESSOR_NAME = compressor::namespace_prefix + "ZstdCompressor"; + +class zstd_processor : public compressor { + int _compression_level = 3; + + // Manages memory for the compression context. + std::unique_ptr _cctx_raw; + // Compression context. Observer of _cctx_raw. + ZSTD_CCtx* _cctx; + + // Manages memory for the decompression context. + std::unique_ptr _dctx_raw; + // Decompression context. Observer of _dctx_raw. + ZSTD_DCtx* _dctx; +public: + zstd_processor(const opt_getter&); + + size_t uncompress(const char* input, size_t input_len, char* output, + size_t output_len) const override; + size_t compress(const char* input, size_t input_len, char* output, + size_t output_len) const override; + size_t compress_max_size(size_t input_len) const override; + + std::set option_names() const override; + std::map options() const override; +}; + +zstd_processor::zstd_processor(const opt_getter& opts) + : compressor(COMPRESSOR_NAME) { + auto level = opts(COMPRESSION_LEVEL); + if (level) { + try { + _compression_level = std::stoi(*level); + } catch (const std::exception& e) { + throw exceptions::syntax_exception( + format("Invalid integer value {} for {}", *level, COMPRESSION_LEVEL)); + } + + auto min_level = ZSTD_minCLevel(); + auto max_level = ZSTD_maxCLevel(); + if (min_level > _compression_level || _compression_level > max_level) { + throw exceptions::configuration_exception( + format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, min_level, max_level, _compression_level)); + } + } + + auto chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB); + if (!chunk_len_kb) { + chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR); + } + auto chunk_len = chunk_len_kb + // This parameter has already been validated. + ? std::stoi(*chunk_len_kb) * 1024 + : compression_parameters::DEFAULT_CHUNK_LENGTH; + + // We assume that the uncompressed input length is always <= chunk_len. + auto cparams = ZSTD_getCParams(_compression_level, chunk_len, 0); + auto cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams); + // According to the ZSTD documentation, pointer to the context buffer must be 8-bytes aligned. + _cctx_raw = allocate_aligned_buffer(cctx_size, 8); + _cctx = ZSTD_initStaticCCtx(_cctx_raw.get(), cctx_size); + if (!_cctx) { + throw std::runtime_error("Unable to initialize ZSTD compression context"); + } + + auto dctx_size = ZSTD_estimateDCtxSize(); + _dctx_raw = allocate_aligned_buffer(dctx_size, 8); + _dctx = ZSTD_initStaticDCtx(_dctx_raw.get(), dctx_size); + if (!_cctx) { + throw std::runtime_error("Unable to initialize ZSTD decompression context"); + } +} + +size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const { + auto ret = ZSTD_decompressDCtx(_dctx, output, output_len, input, input_len); + if (ZSTD_isError(ret)) { + throw std::runtime_error( format("ZSTD decompression failure: {}", ZSTD_getErrorName(ret))); + } + return ret; +} + + +size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const { + auto ret = ZSTD_compressCCtx(_cctx, output, output_len, input, input_len, _compression_level); + if (ZSTD_isError(ret)) { + throw std::runtime_error( format("ZSTD compression failure: {}", ZSTD_getErrorName(ret))); + } + return ret; +} + +size_t zstd_processor::compress_max_size(size_t input_len) const { + return ZSTD_compressBound(input_len); +} + +std::set zstd_processor::option_names() const { + return {COMPRESSION_LEVEL}; +} + +std::map zstd_processor::options() const { + return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}}; +} + +static const class_registrator + registrator(COMPRESSOR_NAME);