checkpoint

This commit is contained in:
Avi Kivity
2014-08-12 23:52:23 +03:00
parent c3292aeb63
commit 919e9c4214
3 changed files with 132 additions and 12 deletions

View File

@@ -17,7 +17,7 @@ static std::string tchar = "[-!#$%&'\\*\\+.^_`|~0-9A-Za-z]";
static std::string token = tchar + "+";
static constexpr auto re_opt = std::regex::ECMAScript | std::regex::optimize;
static std::regex start_line_re { "([A-Z]+) (\\S+) HTTP/([0-9]\\.[0-9])\\r\\n", re_opt };
static std::regex header_re { "(" + token + ")\\s*:\\s*([.*\\S])\\s*\\r\\n", re_opt };
static std::regex header_re { "(" + token + ")\\s*:\\s*(.*\\S)\\s*\\r\\n", re_opt };
static std::regex header_cont_re { "\\s+(.*\\S)\\s*\\r\\n", re_opt };
class http_server {
@@ -48,12 +48,16 @@ public:
sstring _method;
sstring _url;
sstring _version;
sstring _response;
sstring _response_line;
sstring _last_header_name;
sstring _response_body;
std::unordered_map<sstring, sstring> _headers;
std::unordered_map<sstring, sstring> _response_headers;
output_stream_buffer<char> _write_buf;
public:
connection(http_server& server, std::unique_ptr<pollable_fd>&& fd, socket_address addr)
: _server(server), _fd(std::move(fd)), _addr(addr), _read_buf(*_fd, 8192) {}
: _server(server), _fd(std::move(fd)), _addr(addr), _read_buf(*_fd, 8192)
, _write_buf(*_fd, 8192) {}
void read() {
_read_buf.read_until(limit, '\n').then([this] (future<tmp_buf> fut_start_line) {
auto start_line = fut_start_line.get();
@@ -77,7 +81,7 @@ public:
void parse_header(future<tmp_buf> f_header) {
auto header = f_header.get();
if (header.size() == 2 && header[0] == '\r' && header[1] == '\n') {
return;
return generate_response();
}
std::cmatch match;
if (std::regex_match(header.begin(), header.end(), match, header_re)) {
@@ -89,6 +93,7 @@ public:
} else if (std::regex_match(header.begin(), header.end(), match, header_cont_re)) {
_headers[_last_header_name] += " ";
_headers[_last_header_name] += to_sstring(match[1]);
std::cout << "found header: " << _last_header_name << "=" << _headers[_last_header_name] << ".\n";
} else {
return bad();
}
@@ -97,11 +102,57 @@ public:
});
}
void bad() {
_response = "400 BAD REQUEST\r\n\r\n";
_fd->write_all(_response.begin(), _response.size()).then([this] (future<size_t> n) mutable {
delete this;
_response_line = "HTTP/1.1 400 BAD REQUEST\r\n\r\n";
respond();
}
void respond() {
_write_buf.write(_response_line.begin(), _response_line.size()).then(
[this] (future<size_t> n) mutable {
write_response_headers(_response_headers.begin()).then(
[this] (future<size_t> done) {
write_body().then(
[this] (future<size_t> done) {
_write_buf.flush().then(
[this] (future<bool> done) {
delete this;
});
});
});
});
}
future<size_t> write_response_headers(std::unordered_map<sstring, sstring>::iterator hi) {
if (hi == _response_headers.end()) {
return _write_buf.write("\r\n", 2);
}
promise<size_t> pr;
auto fut = pr.get_future();
_write_buf.write(hi->first.begin(), hi->first.size()).then(
[hi, this, pr = std::move(pr)] (future<size_t> done) mutable {
_write_buf.write(": ", 2).then(
[hi, this, pr = std::move(pr)] (future<size_t> done) mutable {
_write_buf.write(hi->second.begin(), hi->second.size()).then(
[hi, this, pr = std::move(pr)] (future<size_t> done) mutable {
_write_buf.write("\r\n", 2).then(
[hi, this, pr = std::move(pr)] (future<size_t> done) mutable {
write_response_headers(++hi).then(
[this, pr = std::move(pr)] (future<size_t> done) mutable {
pr.set_value(done.get());
});
});
});
});
});
return fut;
}
void generate_response() {
_response_line = "HTTP/1.1 200 OK\r\n\r\n";
_response_headers["Content-Type"] = "text/html";
_response_body = "<html><head><title>this is the future</title></head><body><p>Future!!</p></body></html>";
respond();
}
future<size_t> write_body() {
return _write_buf.write(_response_body.begin(), _response_body.size());
}
};
};

View File

@@ -199,10 +199,10 @@ public:
}
template <typename Func, typename Enable>
void then(Func, Enable) &&;
void then(Func, Enable);
template <typename Func>
void then(Func&& func, std::enable_if_t<std::is_same<std::result_of_t<Func(future&&)>, void>::value, void*> = nullptr) && {
void then(Func&& func, std::enable_if_t<std::is_same<std::result_of_t<Func(future&&)>, void>::value, void*> = nullptr) {
auto state = _state;
state->schedule([fut = std::move(*this), func = std::forward<Func>(func)] () mutable {
func(std::move(fut));
@@ -375,6 +375,25 @@ private:
void read_until_part(size_t n, CharType eol, promise<tmp_buf> pr, tmp_buf buf, size_t completed);
};
template <typename CharType>
class output_stream_buffer {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
pollable_fd& _fd;
std::unique_ptr<CharType[]> _buf;
size_t _size;
size_t _begin = 0;
size_t _end = 0;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
public:
using char_type = CharType;
output_stream_buffer(pollable_fd& fd, size_t size) : _fd(fd), _buf(new char_type[size]), _size(size) {}
future<size_t> write(const char_type* buf, size_t n);
future<bool> flush();
private:
};
inline
future<accept_result>
@@ -546,6 +565,53 @@ input_stream_buffer<CharType>::read_until(size_t limit, CharType eol) {
return fut;
}
template <typename CharType>
future<size_t>
output_stream_buffer<CharType>::write(const char_type* buf, size_t n) {
promise<size_t> pr;
auto fut = pr.get_future();
if (n >= _size) {
flush().then([pr = std::move(pr), this, buf, n] (future<bool> done) mutable {
_fd.write_all(buf, n).then([pr = std::move(pr)] (future<size_t> done) mutable {
pr.set_value(done.get());
});
});
return fut;
}
auto now = std::min(n, _size - _end);
std::copy(buf, buf + now, _buf.get() + _end);
_end += now;
if (now == n) {
pr.set_value(n);
} else {
_fd.write_all(_buf.get(), _end).then(
[pr = std::move(pr), this, now, n, buf] (future<size_t> w) mutable {
std::copy(buf + now, buf + n, _buf.get());
_end = n - now;
pr.set_value(n);
});
}
return fut;
}
template <typename CharType>
future<bool>
output_stream_buffer<CharType>::flush() {
promise<bool> pr;
auto fut = pr.get_future();
if (!_end) {
pr.set_value(true);
} else {
_fd.write_all(_buf.get(), _end).then(
[this, pr = std::move(pr)] (future<size_t> done) mutable {
_end = 0;
pr.set_value(true);
});
}
return fut;
}
template <typename T>
void future_state<T>::make_ready() {
if (_task) {
@@ -553,6 +619,8 @@ void future_state<T>::make_ready() {
}
}
#if 0
future<temporary_buffer<CharType>> read_until(size_t limit, const CharType* eol, size_t eol_len);
#endif

View File

@@ -129,12 +129,13 @@ public:
delete u.external.str;
}
u.internal.size = 0;
u.internal.str[0] = '\0';
}
void swap(basic_sstring& x) noexcept {
basic_sstring tmp;
tmp.u = x.u;
contents tmp;
tmp = x.u;
x.u = u;
u = tmp.u;
u = tmp;
}
const char* c_str() const {
return str();