diff --git a/.gitmodules b/.gitmodules
index 26ed387eda..1f3494c617 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -12,3 +12,6 @@
[submodule "libdeflate"]
path = libdeflate
url = ../libdeflate
+[submodule "zstd"]
+ path = zstd
+ url = ../zstd
diff --git a/configure.py b/configure.py
index f99bc2030a..771ddaf492 100755
--- a/configure.py
+++ b/configure.py
@@ -495,6 +495,7 @@ scylla_core = (['database.cc',
'keys.cc',
'counters.cc',
'compress.cc',
+ 'zstd.cc',
'sstables/mp_row_consumer.cc',
'sstables/sstables.cc',
'sstables/sstables_manager.cc',
@@ -1088,6 +1089,7 @@ seastar_flags += ['--compiler', args.cxx, '--c-compiler', args.cc, '--cflags=%s'
'--c++-dialect=gnu++17', '--use-std-optional-variant-stringview=1', '--optflags=%s' % (modes['release']['cxx_ld_flags']), ]
libdeflate_cflags = seastar_cflags
+zstd_cflags = seastar_cflags + ' -Wno-implicit-fallthrough'
status = subprocess.call([args.python, './configure.py'] + seastar_flags, cwd='seastar')
@@ -1117,6 +1119,27 @@ for mode in build_modes:
modes[mode]['seastar_cflags'] = seastar_cflags
modes[mode]['seastar_libs'] = seastar_libs
+# Maps each build mode name to the CMAKE_BUILD_TYPE value handed to the zstd CMake project.
+# NOTE(review): 'Dev' and 'Sanitize' are not among CMake's predefined build types —
+# confirm the zstd CMake project behaves as intended when given them.
+MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' }
+
+# We need to use experimental features of the zstd library (to use our own allocators for the (de)compression context),
+# which are available only when the library is linked statically.
+def configure_zstd(build_dir, mode):
+ zstd_build_dir = os.path.join(build_dir, mode, 'zstd')
+
+ zstd_cmake_args = [
+ '-DCMAKE_BUILD_TYPE={}'.format(MODE_TO_CMAKE_BUILD_TYPE[mode]),
+ '-DCMAKE_C_COMPILER={}'.format(args.cc),
+ '-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
+ '-DCMAKE_C_FLAGS={}'.format(zstd_cflags),
+ '-DZSTD_BUILD_PROGRAMS=OFF'
+ ]
+
+ zstd_cmd = ['cmake', '-G', 'Ninja', os.path.relpath('zstd/build/cmake', zstd_build_dir)] + zstd_cmake_args
+
+ print(zstd_cmd)
+ os.makedirs(zstd_build_dir, exist_ok=True)
+ subprocess.check_call(zstd_cmd, shell=False, cwd=zstd_build_dir)
+
args.user_cflags += " " + pkg_config('jsoncpp', '--cflags')
args.user_cflags += ' -march=' + args.target
libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-llz4', '-lz', '-lsnappy', pkg_config('jsoncpp', '--libs'),
@@ -1167,6 +1190,9 @@ if args.antlr3_exec:
else:
antlr3_exec = "antlr3"
+# Generate the zstd CMake/Ninja build tree once for every selected build mode.
+for mode in build_modes:
+    configure_zstd(outdir, mode)
+
# configure.py may run automatically from an already-existing build.ninja.
# If the user interrupts configure.py in the middle, we need build.ninja
# to remain in a valid state. So we write our output to a temporary
@@ -1283,7 +1309,8 @@ with open(buildfile_tmp, 'w') as f:
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
else:
objs.extend(['$builddir/' + mode + '/' + artifact for artifact in [
- 'libdeflate/libdeflate.a'
+ 'libdeflate/libdeflate.a',
+ 'zstd/lib/libzstd.a',
]])
objs.append('$builddir/' + mode + '/gen/utils/gz/crc_combine_table.o')
if binary.startswith('tests/'):
@@ -1397,6 +1424,9 @@ with open(buildfile_tmp, 'w') as f:
f.write('rule libdeflate.{mode}\n'.format(**locals()))
f.write(' command = make -C libdeflate BUILD_DIR=../build/{mode}/libdeflate/ CFLAGS="{libdeflate_cflags}" CC={args.cc} ../build/{mode}/libdeflate//libdeflate.a\n'.format(**locals()))
f.write('build build/{mode}/libdeflate/libdeflate.a: libdeflate.{mode}\n'.format(**locals()))
+ f.write('build build/{mode}/zstd/lib/libzstd.a: ninja\n'.format(**locals()))
+ f.write(' subdir = build/{mode}/zstd\n'.format(**locals()))
+ f.write(' target = libzstd.a\n'.format(**locals()))
mode = 'dev' if 'dev' in modes else modes[0]
f.write('build checkheaders: phony || {}\n'.format(' '.join(['$builddir/{}/{}.o'.format(mode, hh) for hh in headers])))
diff --git a/licenses/zstd-license.txt b/licenses/zstd-license.txt
new file mode 100644
index 0000000000..a793a80289
--- /dev/null
+++ b/licenses/zstd-license.txt
@@ -0,0 +1,30 @@
+BSD License
+
+For Zstandard software
+
+Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name Facebook nor the names of its contributors may be used to
+ endorse or promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/sstables/compress.hh b/sstables/compress.hh
index 739ba8a367..226dda8b41 100644
--- a/sstables/compress.hh
+++ b/sstables/compress.hh
@@ -33,8 +33,8 @@
// a "compression_metadata" object, which also contains additional information
// needed from decompression - such as the chunk size and compressor type.
//
-// Cassandra supports three different compression algorithms for the chunks,
-// LZ4, Snappy, and Deflate - the default (and therefore most important) is
+// Cassandra supports four different compression algorithms for the chunks,
+// LZ4, Snappy, Deflate, and Zstd - the default (and therefore most important) is
// LZ4. Each compressor is an implementation of the "compressor" class.
//
// Each compressed chunk is followed by a 4-byte checksum of the compressed
diff --git a/zstd b/zstd
new file mode 160000
index 0000000000..ff304e9e65
--- /dev/null
+++ b/zstd
@@ -0,0 +1 @@
+Subproject commit ff304e9e65e7cde17a637eea190a874c26c48634
diff --git a/zstd.cc b/zstd.cc
new file mode 100644
index 0000000000..38db40db62
--- /dev/null
+++ b/zstd.cc
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2019 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include
+
+// We need to use experimental features of the zstd library (to allocate compression/decompression context),
+// which are available only when the library is linked statically.
+#define ZSTD_STATIC_LINKING_ONLY
+#include "zstd/lib/zstd.h"
+
+#include "compress.hh"
+#include "utils/class_registrator.hh"
+
+// Option key looked up through the opt_getter to obtain the zstd compression level.
+static const sstring COMPRESSION_LEVEL = "compression_level";
+// Name passed to the compressor base class and used when registering this implementation.
+// NOTE(review): built from compressor::namespace_prefix during static initialization;
+// presumably that constant is initialized before this translation unit's statics — confirm.
+static const sstring COMPRESSOR_NAME = compressor::namespace_prefix + "ZstdCompressor";
+
+// Compressor implementation backed by the zstd library. Every instance owns one
+// compression context and one decompression context, both set up by the constructor.
+// NOTE(review): the const compress()/uncompress() methods operate through the
+// _cctx/_dctx pointers held by the instance, so presumably a single instance must
+// not be used concurrently — confirm against callers.
+class zstd_processor : public compressor {
+    // Level handed to zstd when compressing; stays at 3 unless the
+    // "compression_level" option supplies another value.
+    int _compression_level = 3;
+
+    // Manages memory for the compression context.
+    std::unique_ptr _cctx_raw;
+    // Compression context. Observer of _cctx_raw.
+    ZSTD_CCtx* _cctx;
+
+    // Manages memory for the decompression context.
+    std::unique_ptr _dctx_raw;
+    // Decompression context. Observer of _dctx_raw.
+    ZSTD_DCtx* _dctx;
+public:
+    // Reads the options via the given getter and initializes both contexts.
+    zstd_processor(const opt_getter&);
+
+    // Decompresses `input_len` bytes from `input` into `output` (capacity `output_len`).
+    // Returns the value reported by zstd; throws std::runtime_error on a zstd error.
+    size_t uncompress(const char* input, size_t input_len, char* output,
+                      size_t output_len) const override;
+    // Compresses `input_len` bytes from `input` into `output` (capacity `output_len`).
+    // Returns the value reported by zstd; throws std::runtime_error on a zstd error.
+    size_t compress(const char* input, size_t input_len, char* output,
+                    size_t output_len) const override;
+    // Returns ZSTD_compressBound(input_len).
+    size_t compress_max_size(size_t input_len) const override;
+
+    // Names of the options specific to this compressor (the compression level key).
+    std::set option_names() const override;
+    // Current option values: the compression level key mapped to the level as text.
+    std::map options() const override;
+};
+
+// Builds a zstd compressor from the options returned by `opts`.
+//
+// Reads COMPRESSION_LEVEL (the member's default is kept when the option is absent)
+// and the chunk length, then initializes static compression and decompression
+// contexts inside 8-byte aligned buffers owned by this object.
+//
+// Throws:
+//   exceptions::syntax_exception        - the level option is not a valid integer.
+//   exceptions::configuration_exception - the level lies outside
+//                                         [ZSTD_minCLevel(), ZSTD_maxCLevel()].
+//   std::runtime_error                  - either zstd context could not be initialized.
+zstd_processor::zstd_processor(const opt_getter& opts)
+    : compressor(COMPRESSOR_NAME) {
+    auto level = opts(COMPRESSION_LEVEL);
+    if (level) {
+        try {
+            _compression_level = std::stoi(*level);
+        } catch (const std::exception&) {
+            // Covers both non-numeric text and values that do not fit in an int.
+            throw exceptions::syntax_exception(
+                format("Invalid integer value {} for {}", *level, COMPRESSION_LEVEL));
+        }
+
+        auto min_level = ZSTD_minCLevel();
+        auto max_level = ZSTD_maxCLevel();
+        if (min_level > _compression_level || _compression_level > max_level) {
+            throw exceptions::configuration_exception(
+                format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, min_level, max_level, _compression_level));
+        }
+    }
+
+    // Fall back to the alternative chunk-length key when the primary one is not set.
+    auto chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB);
+    if (!chunk_len_kb) {
+        chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR);
+    }
+    auto chunk_len = chunk_len_kb
+        // This parameter has already been validated.
+        ? std::stoi(*chunk_len_kb) * 1024
+        : compression_parameters::DEFAULT_CHUNK_LENGTH;
+
+    // We assume that the uncompressed input length is always <= chunk_len,
+    // so the compression context is sized for inputs of at most that length.
+    auto cparams = ZSTD_getCParams(_compression_level, chunk_len, 0);
+    auto cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams);
+    // According to the ZSTD documentation, pointer to the context buffer must be 8-bytes aligned.
+    _cctx_raw = allocate_aligned_buffer(cctx_size, 8);
+    _cctx = ZSTD_initStaticCCtx(_cctx_raw.get(), cctx_size);
+    if (!_cctx) {
+        throw std::runtime_error("Unable to initialize ZSTD compression context");
+    }
+
+    auto dctx_size = ZSTD_estimateDCtxSize();
+    _dctx_raw = allocate_aligned_buffer(dctx_size, 8);
+    _dctx = ZSTD_initStaticDCtx(_dctx_raw.get(), dctx_size);
+    // Check the decompression context itself; testing _cctx here would let a
+    // failed initialization slip through and leave _dctx null for uncompress().
+    if (!_dctx) {
+        throw std::runtime_error("Unable to initialize ZSTD decompression context");
+    }
+}
+
+// Decompresses `input_len` bytes at `input` into `output` (capacity `output_len`)
+// with the instance's decompression context and returns the value reported by
+// ZSTD_decompressDCtx. Throws std::runtime_error when zstd flags that value as an error.
+size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
+    const size_t produced = ZSTD_decompressDCtx(_dctx, output, output_len, input, input_len);
+    if (!ZSTD_isError(produced)) {
+        return produced;
+    }
+    throw std::runtime_error(format("ZSTD decompression failure: {}", ZSTD_getErrorName(produced)));
+}
+
+
+// Compresses `input_len` bytes at `input` into `output` (capacity `output_len`)
+// at the configured compression level, using the instance's compression context.
+// Returns the value reported by ZSTD_compressCCtx and throws std::runtime_error
+// when zstd flags that value as an error.
+size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
+    const size_t produced = ZSTD_compressCCtx(_cctx, output, output_len, input, input_len, _compression_level);
+    if (!ZSTD_isError(produced)) {
+        return produced;
+    }
+    throw std::runtime_error(format("ZSTD compression failure: {}", ZSTD_getErrorName(produced)));
+}
+
+// Returns the bound ZSTD_compressBound computes for an input of `input_len` bytes.
+size_t zstd_processor::compress_max_size(size_t input_len) const {
+    const size_t worst_case = ZSTD_compressBound(input_len);
+    return worst_case;
+}
+
+// Returns the option names specific to this compressor: only COMPRESSION_LEVEL.
+std::set zstd_processor::option_names() const {
+    return {COMPRESSION_LEVEL};
+}
+
+// Returns the current option values: COMPRESSION_LEVEL mapped to the configured
+// level rendered with std::to_string.
+std::map zstd_processor::options() const {
+    return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}};
+}
+
+// File-local registrator object constructed with COMPRESSOR_NAME.
+// NOTE(review): presumably this makes zstd_processor creatable by that name through
+// the class_registrator mechanism — confirm against utils/class_registrator.hh.
+static const class_registrator
+    registrator(COMPRESSOR_NAME);