d6e6ec96c4
IOCompletionPorts are the modern way to handle asynchronous IO without affected the system too much. Synchronization, work allocation and spreading, etc is all handled by the OS for us, which reduces the work we have to do in order to be NUMA aware. While this is far from perfect, it should perform better than a naive threaded approach. ToDo: - Add documentation generation - Add Github Actions integration - Write tests for everything. - Update 'benchmark' sample to work again. - Figure out a useful way to deal with connect/disconnect/error events. - Figure out the broken pipe error, caused by an additional connected event where none should have been.
354 lines
10 KiB
C++
354 lines
10 KiB
C++
/*
|
|
Low Latency IPC Library for high-speed traffic
|
|
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "windows-socket.hpp"
|
|
|
|
void datapath::windows::socket::read(::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data)
|
|
{
|
|
auto res = queue_read(callback, callback_data);
|
|
switch (res) {
|
|
case error::Success:
|
|
return;
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
throw std::runtime_error("Socket closed");
|
|
}
|
|
default:
|
|
throw std::runtime_error("TODO");
|
|
}
|
|
}
|
|
|
|
void datapath::windows::socket::write(const ::datapath::io_data_t& data, ::datapath::io_callback_t callback,
|
|
::datapath::io_callback_data_t callback_data)
|
|
{
|
|
auto res = queue_write(data.data(), data.size(), callback, callback_data);
|
|
switch (res) {
|
|
case error::Success:
|
|
return;
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
throw std::runtime_error("Socket closed");
|
|
}
|
|
default:
|
|
throw std::runtime_error("TODO");
|
|
}
|
|
}
|
|
|
|
void datapath::windows::socket::write(const std::uint8_t* data, const size_t data_length,
|
|
::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data)
|
|
{
|
|
auto res = queue_write(data, data_length, callback, callback_data);
|
|
switch (res) {
|
|
case error::Success:
|
|
return;
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
throw std::runtime_error("Socket closed");
|
|
}
|
|
default:
|
|
throw std::runtime_error("TODO");
|
|
}
|
|
}
|
|
|
|
::datapath::error datapath::windows::socket::queue_read(::datapath::io_callback_t cb,
|
|
::datapath::io_callback_data_t cbd)
|
|
{
|
|
// Early-Exit if the socket is already closed.
|
|
if (!is_open())
|
|
return ::datapath::error::NotSupported;
|
|
|
|
// Prevent multiple thread from modifying the same objects.
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
|
|
// Queue the read operation.
|
|
_read_queue.push(read_data_t{cb, cbd});
|
|
|
|
// If this is the only operation, instantly perform it.
|
|
if (_read_queue.size() == 1) {
|
|
return perform_read();
|
|
} else {
|
|
return ::datapath::error::Success;
|
|
}
|
|
}
|
|
|
|
::datapath::error datapath::windows::socket::queue_write(const std::uint8_t* data, std::size_t length,
|
|
::datapath::io_callback_t cb,
|
|
::datapath::io_callback_data_t cbd)
|
|
{
|
|
// Early-Exit if the socket is already closed.
|
|
if (!is_open())
|
|
return ::datapath::error::NotSupported;
|
|
|
|
// Build actual packet.
|
|
io_data_t packet;
|
|
packet.resize(length + sizeof(packet_size_t));
|
|
memcpy(packet.data() + sizeof(packet_size_t), data, length);
|
|
*reinterpret_cast<packet_size_t*>(packet.data()) = length;
|
|
|
|
// Prevent multiple thread from modifying the same objects.
|
|
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
|
|
|
|
// Queue the read request.
|
|
_write_queue.push(std::move(write_data_t{std::move(packet), cb, cbd}));
|
|
|
|
// If this is the only operation, instantly perform it.
|
|
if (_write_queue.size() == 1) {
|
|
return perform_write();
|
|
} else {
|
|
return ::datapath::error::Success;
|
|
}
|
|
}
|
|
|
|
::datapath::error datapath::windows::socket::perform_read()
|
|
{
|
|
DWORD bytes;
|
|
|
|
// Prevent multiple thread from modifying the same objects.
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
|
|
// Early-exit if there is nothing to do.
|
|
if (_read_queue.size() == 0)
|
|
return ::datapath::error::Failure;
|
|
|
|
// Resize buffer, then issue read request.
|
|
_read_buffer.resize(sizeof(uint32_t));
|
|
_ov_read.reset();
|
|
_ov_read.set_callback(std::bind(&::datapath::windows::socket::on_read_header, this, std::placeholders::_1,
|
|
std::placeholders::_2, std::placeholders::_3));
|
|
|
|
// Issue read request (return value can be ignored, it will always be false).
|
|
SetLastError(ERROR_SUCCESS);
|
|
DWORD res =
|
|
ReadFile(_ov_read.get_handle(), _read_buffer.data(), _read_buffer.size(), &bytes, _ov_read.get_overlapped());
|
|
|
|
// Report result to caller.
|
|
DWORD ec = GetLastError();
|
|
switch (ec) {
|
|
case ERROR_SUCCESS:
|
|
case ERROR_IO_PENDING:
|
|
return ::datapath::error::Success;
|
|
case ERROR_PIPE_NOT_CONNECTED:
|
|
case ERROR_BROKEN_PIPE:
|
|
return ::datapath::error::SocketClosed;
|
|
default:
|
|
return ::datapath::error::Failure;
|
|
}
|
|
}
|
|
|
|
::datapath::error datapath::windows::socket::perform_read_packet(packet_size_t size)
|
|
{
|
|
DWORD bytes;
|
|
|
|
// Prevent multiple thread from modifying the same objects.
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
|
|
// Early-exit if there is nothing to do.
|
|
if (_read_queue.size() == 0)
|
|
return ::datapath::error::Failure;
|
|
|
|
// Resize Buffer, then issue the 2nd read request.
|
|
_read_buffer.resize(size);
|
|
_ov_read.reset();
|
|
_ov_read.set_callback(std::bind(&::datapath::windows::socket::on_read, this, std::placeholders::_1,
|
|
std::placeholders::_2, std::placeholders::_3));
|
|
|
|
// Issue read request (return value can be ignored, it will always be false).
|
|
SetLastError(ERROR_SUCCESS);
|
|
DWORD res =
|
|
ReadFile(_ov_read.get_handle(), _read_buffer.data(), _read_buffer.size(), &bytes, _ov_read.get_overlapped());
|
|
|
|
// Report result to caller.
|
|
DWORD ec = GetLastError();
|
|
switch (ec) {
|
|
case ERROR_SUCCESS:
|
|
case ERROR_IO_PENDING:
|
|
return ::datapath::error::Success;
|
|
case ERROR_PIPE_NOT_CONNECTED:
|
|
case ERROR_BROKEN_PIPE:
|
|
return ::datapath::error::SocketClosed;
|
|
default:
|
|
return ::datapath::error::Failure;
|
|
}
|
|
}
|
|
|
|
::datapath::error datapath::windows::socket::perform_write()
|
|
{
|
|
DWORD bytes;
|
|
|
|
// Prevent multiple thread from modifying the same objects.
|
|
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
|
|
|
|
// Early-exit if there is nothing to do.
|
|
if (_write_queue.size() == 0)
|
|
return ::datapath::error::Failure;
|
|
|
|
// Lock the queue, and grab the front element.
|
|
auto& front = _write_queue.front();
|
|
auto& data = std::get<io_data_t>(front);
|
|
|
|
// Reset the overlapped object.
|
|
_ov_write.reset();
|
|
|
|
// Issue write request (return value can be ignored, always false).
|
|
SetLastError(ERROR_SUCCESS);
|
|
DWORD res = WriteFile(_ov_write.get_handle(), data.data(), data.size(), &bytes, _ov_write.get_overlapped());
|
|
|
|
// Report result to caller.
|
|
DWORD ec = GetLastError();
|
|
switch (ec) {
|
|
case ERROR_SUCCESS:
|
|
case ERROR_IO_PENDING:
|
|
return ::datapath::error::Success;
|
|
case ERROR_PIPE_NOT_CONNECTED:
|
|
case ERROR_BROKEN_PIPE:
|
|
return ::datapath::error::SocketClosed;
|
|
default:
|
|
return ::datapath::error::Failure;
|
|
}
|
|
}
|
|
|
|
void datapath::windows::socket::on_read_header(::datapath::windows::overlapped& ov, std::size_t bytes_read, void* ptr)
|
|
{
|
|
io_data_t data;
|
|
|
|
// Sanity Check: Did we actually read the entire header?
|
|
if (bytes_read != sizeof(packet_size_t)) {
|
|
// Assume remote is a bad actor, or system is in a bad state.
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
|
|
auto el = std::move(_read_queue.front());
|
|
if (el.first) {
|
|
el.first(shared_from_this(), ::datapath::error::BadHeader, data, el.second);
|
|
}
|
|
_read_queue.pop();
|
|
}
|
|
close();
|
|
return;
|
|
}
|
|
|
|
// Read the given size.
|
|
packet_size_t size = *reinterpret_cast<packet_size_t*>(_read_buffer.data());
|
|
|
|
// Sanity Check: Is the packet bigger than we allow it to be?
|
|
if (size > ::datapath::maximum_packet_size) {
|
|
// Soft-fail, remote may be testing our capabilities, is outdated, or may be newer than us.
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
|
|
auto el = std::move(_read_queue.front());
|
|
if (el.first) {
|
|
el.first(shared_from_this(), ::datapath::error::BadSize, data, el.second);
|
|
}
|
|
_read_queue.pop();
|
|
|
|
switch (perform_read()) {
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
return;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
}
|
|
|
|
switch (perform_read_packet(size)) {
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
return;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
}
|
|
|
|
void datapath::windows::socket::on_read(::datapath::windows::overlapped& ov, std::size_t size, void* ptr)
|
|
{ // Read completed, hopefully.
|
|
::datapath::error status = ov.status();
|
|
read_data_t el;
|
|
io_data_t data{_read_buffer.data(), _read_buffer.data() + _read_buffer.size()};
|
|
|
|
// Lock the queue from outside modifications.
|
|
{
|
|
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
|
|
el = std::move(_read_queue.front());
|
|
_read_queue.pop();
|
|
|
|
switch (perform_read()) {
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
return;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Call the callback (if there is one).
|
|
if (el.first) {
|
|
el.first(shared_from_this(), ::datapath::error::Success, data, el.second);
|
|
}
|
|
}
|
|
|
|
void datapath::windows::socket::on_write(::datapath::windows::overlapped& ov, std::size_t size, void* ptr)
|
|
{
|
|
// Write completed. Hopefully.
|
|
|
|
::datapath::error status = ov.status();
|
|
write_data_t el;
|
|
|
|
{ // Remove from queue, and spawn new write request if something is still queued.
|
|
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
|
|
el = std::move(_write_queue.front());
|
|
_write_queue.pop();
|
|
|
|
switch (perform_write()) {
|
|
case error::SocketClosed: {
|
|
auto status = ::datapath::error::SocketClosed;
|
|
auto self = shared_from_this();
|
|
events.closed(status, self);
|
|
internal_events.closed(status, self);
|
|
return;
|
|
}
|
|
default:
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Call callback.
|
|
std::get<::datapath::io_callback_t>(el)(shared_from_this(), status, std::get<::datapath::io_data_t>(el),
|
|
std::get<::datapath::io_callback_data_t>(el));
|
|
}
|