utils: Add "io-wrappers", useful IO helper types
Mainly adds a reasonably functional file_impl that wraps
a data_sink. It provides a rudimentary, write-only file
on top of any output sink.
For testing, and because they fit there, place memory
sink and source types there as well.
(cherry picked from commit 98a6d0f79c)
This commit is contained in:
@@ -813,6 +813,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/rjson.cc',
|
||||
'utils/human_readable.cc',
|
||||
'utils/histogram_metrics_helper.cc',
|
||||
'utils/io-wrappers.cc',
|
||||
'utils/on_internal_error.cc',
|
||||
'utils/pretty_printers.cc',
|
||||
'utils/stream_compressor.cc',
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include <cstdio>
|
||||
#include <sstream>
|
||||
#include <cryptopp/sha.h>
|
||||
#include "utils/io-wrappers.hh"
|
||||
|
||||
future<sstring> generate_file_hash(sstring filename) {
|
||||
auto f = co_await seastar::open_file_dma(filename, seastar::open_flags::ro);
|
||||
@@ -258,3 +259,115 @@ SEASTAR_THREAD_TEST_CASE(test_file_stream_inject_error) {
|
||||
// Thread-context entry point: the actual checks live in
// do_test_unsupported_file_ops().
SEASTAR_THREAD_TEST_CASE(test_unsupported_file_ops) {
    do_test_unsupported_file_ops();
}
|
||||
|
||||
// Writes a series of alignment-sized random blocks through a file that wraps
// an in-memory sink, truncates the file to a non-aligned length short of what
// was written, and verifies the sink received exactly the truncated prefix.
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper) {
    // Everything the wrapped sink receives is appended here.
    std::vector<temporary_buffer<char>> bufs;

    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    constexpr auto n_bufs = 11;

    // Source data: n_bufs blocks of random bytes, one write alignment each.
    std::vector<temporary_buffer<char>> src;
    src.reserve(n_bufs);
    for (int i = 0; i < n_bufs; ++i) {
        temporary_buffer<char> block(wa);
        char* out = block.get_write();
        for (char* const last = out + wa; out != last; ++out) {
            *out = static_cast<char>(dist(gen));
        }
        src.emplace_back(std::move(block));
    }

    // Sequentially write every block at the running file offset.
    uint64_t pos = 0;
    for (auto& block : src) {
        pos += file.dma_write(pos, block.get(), block.size()).get();
    }

    // Cut off a third of a block from the end, then finish the file.
    auto final_len = n_bufs * wa - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    // The sink must have received exactly final_len bytes in total.
    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }

    BOOST_REQUIRE_EQUAL(final_len, res);

    // Linearize both the source and the received data for comparison.
    const auto flatten = [](std::vector<temporary_buffer<char>>& parts, char* dst) {
        for (auto& part : parts) {
            dst = std::copy(part.begin(), part.end(), dst);
        }
    };

    temporary_buffer<char> lin1(wa*n_bufs);
    temporary_buffer<char> lin2(wa*n_bufs);
    flatten(src, lin1.get_write());
    flatten(bufs, lin2.get_write());

    BOOST_REQUIRE(std::equal(lin1.begin(), lin1.begin() + final_len, lin2.begin()));
}
|
||||
|
||||
// Same idea as the plain-buffer sink wrapper test, but the data is written
// through the iovec overload of dma_write using batches of up to five
// randomly sized fragments.
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper_iovec) {
    // Everything the wrapped sink receives is appended here.
    std::vector<temporary_buffer<char>> bufs;

    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    constexpr auto n_bufs = 11;

    // One contiguous source buffer filled with random bytes.
    temporary_buffer<char> src(wa * n_bufs);
    {
        char* out = src.get_write();
        for (char* const last = out + src.size(); out != last; ++out) {
            *out = static_cast<char>(dist(gen));
        }
    }

    // Fragment sizes are drawn from [1, wa].
    std::uniform_int_distribution<size_t> sdist(1u, wa);

    auto* p = src.get_write();
    uint64_t pos = 0, end = src.size();

    while (pos < end) {
        // Build one batch of at most five fragments starting at pos.
        std::vector<iovec> iovs;
        auto* cursor = p + pos;
        for (auto rem = size_t(end - pos); iovs.size() < 5 && rem != 0; ) {
            auto size = std::min(sdist(gen), rem);
            iovs.emplace_back(iovec{cursor, size});
            cursor += size;
            rem -= size;
        }

        // Advance only by what the file reports as written; the remainder is
        // covered by the next batch.
        pos += file.dma_write(pos, std::move(iovs)).get();
    }

    // Cut off a third of a block from the end, then finish the file.
    auto final_len = src.size() - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    // The sink must have received exactly final_len bytes in total.
    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }

    BOOST_REQUIRE_EQUAL(final_len, res);

    // Linearize the received data and compare against the source prefix.
    temporary_buffer<char> lin(wa*n_bufs);
    char* dst = lin.get_write();
    for (auto& received : bufs) {
        dst = std::copy(received.begin(), received.end(), dst);
    }

    BOOST_REQUIRE(std::equal(lin.begin(), lin.begin() + final_len, src.begin()));
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ target_sources(utils
|
||||
histogram_metrics_helper.cc
|
||||
human_readable.cc
|
||||
i_filter.cc
|
||||
io-wrappers.cc
|
||||
large_bitset.cc
|
||||
like_matcher.cc
|
||||
limiting_data_source.cc
|
||||
|
||||
245
utils/io-wrappers.cc
Normal file
245
utils/io-wrappers.cc
Normal file
@@ -0,0 +1,245 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "io-wrappers.hh"
|
||||
#include <seastar/util/internal/iovec_utils.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
class noop_file_impl : public file_impl {
|
||||
public:
|
||||
[[noreturn]] void not_implemented() const {
|
||||
throw std::logic_error("unsupported operation");
|
||||
}
|
||||
future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent*) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<> flush(void) override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<struct stat> stat(void) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<> truncate(uint64_t length) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<> discard(uint64_t offset, uint64_t) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<> allocate(uint64_t position, uint64_t) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<uint64_t> size(void) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<> close() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
std::unique_ptr<seastar::file_handle_impl> dup() override {
|
||||
not_implemented();
|
||||
}
|
||||
subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override {
|
||||
not_implemented();
|
||||
}
|
||||
future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override {
|
||||
not_implemented();
|
||||
}
|
||||
};
|
||||
|
||||
// Creates a write-only, strictly sequential file on top of `sink`.
//
// The returned file keeps back at most one block (4096 bytes) of the most
// recently written data in an internal buffer. This allows:
//  - truncating downwards by up to the amount currently buffered, and
//  - accepting iovec fragments that are not block sized.
// Truncating upwards pads the output with zeroes. close() pushes out any
// buffered data, then flushes and closes the sink.
//
// Throws std::logic_error on a non-sequential write, and
// std::invalid_argument on a too-short plain write or on a truncation
// below the start of the buffered data.
file create_file_for_sink(data_sink sink) {
    class data_sink_file_impl : public noop_file_impl {
        data_sink _sink;
        // keep a buffer to allow truncation and also
        // partially non-block sized writes (iovec)
        char _buffer[4096];
        // Logical file position: total number of bytes accepted so far,
        // including those still held in _buffer.
        uint64_t _pos = 0;
        // Number of valid bytes at the start of _buffer that have not yet
        // been handed to _sink.
        uint64_t _bufpos = 0;
    public:
        data_sink_file_impl(data_sink sink)
            : _sink(std::move(sink))
        {
            // The hold-back buffer is exactly one write block.
            assert(this->_disk_write_dma_alignment == sizeof(_buffer));
        }

        // Copies [data, data + len) into a fresh buffer and hands it to the sink.
        future<> put(const char* data, size_t len) {
            temporary_buffer<char> buf(data, len);
            return _sink.put(std::move(buf));
        }

        // Plain-buffer write. Rejects writes shorter than one write block.
        future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent*) override {
            if (len < this->_disk_write_dma_alignment) {
                throw std::invalid_argument("Invalid buffer length: " + std::to_string(len));
            }
            return do_write_dma(pos, buffer, len);
        }

        // Appends `len` bytes at `pos`, which must equal the current file
        // position. All but the trailing (at most one) block is forwarded to
        // the sink; the tail stays in _buffer. Returns `len`.
        future<size_t> do_write_dma(uint64_t pos, const void* buffer, size_t len) {
            if (pos != _pos) {
                throw std::logic_error("Non-sequential write to sink file");
            }

            auto res = len;
            const char* buf = reinterpret_cast<const char*>(buffer);

            // if we've added any data to buffer last put, we need to
            // fill as much as we can and send the active block
            if (_bufpos != 0) {
                auto add = std::min(len, sizeof(_buffer) - _bufpos);
                std::copy(buf, buf + add, _buffer + _bufpos);
                buf += add;
                len -= add;
                _bufpos += add;
                _pos += add;
                // Only push the block out if more data follows; otherwise it
                // is kept so that it can still be truncated.
                if (_bufpos == sizeof(_buffer) && len > 0) {
                    co_await put(_buffer, _bufpos);
                    _bufpos = 0;
                }
            }

            // more than one block left? send it along.
            if (len > this->_disk_write_dma_alignment) {
                auto size = len - this->_disk_write_dma_alignment;
                co_await put(buf, size);

                buf += size;
                // Account for the directly forwarded bytes in the file
                // position. (Previously only the local `pos` parameter was
                // advanced, so the next sequential write was rejected.)
                _pos += size;
                len -= size;
            }

            assert(len <= sizeof(_buffer));
            assert(_bufpos == 0 || len == 0);
            std::copy(buf, buf+len, _buffer);
            _pos += len;
            _bufpos += len;

            co_return res;
        }

        // Scatter write: trims the iovec list to a whole number of write
        // blocks, then appends each fragment in order. Returns the number of
        // bytes actually consumed, which may be less than the input total.
        future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override {
            internal::sanitize_iovecs(iov, _disk_write_dma_alignment);
            size_t res = 0;
            for (auto& iv : iov) {
                res += co_await do_write_dma(pos + res, iv.iov_base, iv.iov_len);
            }
            co_return res;
        }

        // Sets the file length to `length` and pushes buffered data to the sink.
        //  - Downwards: only possible within the currently buffered data.
        //  - Upwards: the gap is filled with zeroes.
        future<> truncate(uint64_t length) override {
            if (length < _pos) {
                // bufpos should be a full buffer here.
                if (length < (_pos - _bufpos)) {
                    throw std::invalid_argument("Cannot truncate below one disk page of current position");
                }
                _bufpos -= (_pos - length);
                // The file now ends at `length`; keep the position in sync so
                // later writes/truncates operate on the real end of file.
                _pos = length;
            }
            if (_bufpos != 0) {
                co_await put(_buffer, _bufpos);
                _bufpos = 0;
            }
            // if we truncate upwards, just write zeroes.
            // `bp` tracks how much of _buffer is already zeroed.
            uint64_t bp = 0;
            while (length > _pos) {
                auto rem = std::min(length - _pos, sizeof(_buffer));
                if (rem > bp) {
                    std::fill(_buffer, _buffer + rem , 0);
                    bp = rem;
                }
                co_await put(_buffer, rem);
                _pos += rem;
            }
        }

        // Pushes out whatever is still buffered, then flushes and closes the sink.
        future<> close() override {
            co_await truncate(_pos);
            co_await _sink.flush();
            co_await _sink.close();
        }
    };
    return file{make_shared<data_sink_file_impl>(std::move(sink))};
}
|
||||
|
||||
// Returns a file backed by noop_file_impl: flush() and close() succeed,
// every other operation throws.
file create_noop_file() {
    auto impl = make_shared<noop_file_impl>();
    return file{std::move(impl)};
}
|
||||
|
||||
// Creates a data sink that appends every buffer it receives to `bufs`.
// The sink only stores a reference, so `bufs` must outlive it.
data_sink create_memory_sink(std::vector<seastar::temporary_buffer<char>>& bufs) {
    // TODO: move to seastar. Based on memory_data_sink, but allowing us
    // to actually move away the buffers later. I don't want to modify
    // util classes in an enterprise patch.
    class buffer_data_sink_impl : public data_sink_impl {
        // Destination vector, owned by the caller.
        std::vector<temporary_buffer<char>>& _bufs;
    public:
        buffer_data_sink_impl(std::vector<temporary_buffer<char>>& bufs)
            : _bufs(bufs)
        {}

        // Single buffer: moved to the end of the destination.
        future<> put(temporary_buffer<char> buf) override {
            _bufs.emplace_back(std::move(buf));
            return make_ready_future<>();
        }
        // Several buffers: each one is moved over, in order.
        future<> put(std::vector<temporary_buffer<char>> bufs) override {
            for (auto it = bufs.begin(); it != bufs.end(); ++it) {
                _bufs.emplace_back(std::move(*it));
            }
            return make_ready_future<>();
        }
        // Packet: forwarded to the vector overload via its released fragments.
        future<> put(net::packet p) override {
            return put(p.release());
        }

        // Nothing is held back, so flush and close have no work to do.
        future<> flush() override {
            return make_ready_future<>();
        }
        future<> close() override {
            return make_ready_future<>();
        }

        size_t buffer_size() const noexcept override {
            return 128*1024;
        }
    };

    auto impl = std::make_unique<buffer_data_sink_impl>(bufs);
    return data_sink(std::move(impl));
}
|
||||
|
||||
// Creates a data source that hands out the given buffers one at a time,
// in order, and an empty buffer once they are exhausted.
data_source create_memory_source(std::vector<seastar::temporary_buffer<char>> bufs) {
    // TODO: move to seastar. Based on buffer_input... in utils, but
    // handles potential 1+ buffers
    class buffer_data_source_impl : public data_source_impl {
    private:
        std::vector<temporary_buffer<char>> _bufs;
        // Index of the next buffer to hand out.
        size_t _index = 0;
    public:
        buffer_data_source_impl(std::vector<temporary_buffer<char>>&& bufs)
            : _bufs(std::move(bufs))
        {}
        buffer_data_source_impl(buffer_data_source_impl&&) noexcept = default;
        buffer_data_source_impl& operator=(buffer_data_source_impl&&) noexcept = default;

        // Returns the next buffer, or an empty one when none are left.
        future<temporary_buffer<char>> get() override {
            if (_index >= _bufs.size()) {
                return make_ready_future<temporary_buffer<char>>();
            }
            auto& next = _bufs.at(_index);
            ++_index;
            return make_ready_future<temporary_buffer<char>>(std::move(next));
        }

        // Drops up to `n` bytes from the front of the remaining data, then
        // returns whatever get() yields next.
        future<temporary_buffer<char>> skip(uint64_t n) override {
            for (; n > 0 && _index < _bufs.size(); ) {
                auto& current = _bufs.at(_index);
                const auto dropped = std::min(n, current.size());
                current.trim_front(dropped);
                n -= dropped;
                // A fully consumed buffer is stepped over.
                if (current.empty()) {
                    ++_index;
                }
            }
            return get();
        }
    };

    auto impl = std::make_unique<buffer_data_source_impl>(std::move(bufs));
    return data_source(std::move(impl));
}
|
||||
|
||||
47
utils/io-wrappers.hh
Normal file
47
utils/io-wrappers.hh
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
#include "seastarx.hh"
|
||||
|
||||
/**
 * Creates a write-only file wrapping a data_sink.
 *
 * The resulting file object supports sequential
 * writes only. It implements truncate, but the
 * truncation position cannot be lower than
 * <current write pos> - <write block size>.
 *
 * Essentially, it is only really fit for use
 * wrapped in a file_data_sink_impl under an
 * output_stream.
 */
|
||||
seastar::file create_file_for_sink(seastar::data_sink);
|
||||
|
||||
/**
|
||||
* Creates a file that can only do flush() and close().
|
||||
* Do not write to it.
|
||||
*/
|
||||
seastar::file create_noop_file();
|
||||
|
||||
/**
|
||||
* Creates a data sink which will forward all data
|
||||
* sent to it into the destination vector.
|
||||
*/
|
||||
seastar::data_sink create_memory_sink(std::vector<seastar::temporary_buffer<char>>&);
|
||||
|
||||
/**
|
||||
* Creates a data source that will read data sequentially
|
||||
* from the source vector buffers.
|
||||
*/
|
||||
seastar::data_source create_memory_source(std::vector<seastar::temporary_buffer<char>>);
|
||||
Reference in New Issue
Block a user