d6e6ec96c4
IOCompletionPorts are the modern way to handle asynchronous IO without affected the system too much. Synchronization, work allocation and spreading, etc is all handled by the OS for us, which reduces the work we have to do in order to be NUMA aware. While this is far from perfect, it should perform better than a naive threaded approach. ToDo: - Add documentation generation - Add Github Actions integration - Write tests for everything. - Update 'benchmark' sample to work again. - Figure out a useful way to deal with connect/disconnect/error events. - Figure out the broken pipe error, caused by an additional connected event where none should have been.
173 lines
5.3 KiB
C++
173 lines
5.3 KiB
C++
/*
|
|
Low Latency IPC Library for high-speed traffic
|
|
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "windows-client-socket.hpp"
|
|
#include "windows-utility.hpp"
|
|
|
|
::std::shared_ptr<::datapath::socket> datapath::socket::create()
|
|
{
|
|
return std::make_shared<::datapath::windows::client_socket>();
|
|
}
|
|
|
|
datapath::windows::client_socket::client_socket()
|
|
: socket(), _lock(), _opened(false), _path(), _worker_count(0), _iocp(), _handle()
|
|
{
|
|
// Create IOCP
|
|
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, reinterpret_cast<ULONG_PTR>(this), 0);
|
|
if (iocp == INVALID_HANDLE_VALUE)
|
|
throw std::runtime_error("Failed to create IOCompletionPort.");
|
|
_iocp = std::shared_ptr<void>(iocp, ::datapath::windows::utility::shared_ptr_handle_deleter);
|
|
}
|
|
|
|
datapath::windows::client_socket::~client_socket()
|
|
{
|
|
close();
|
|
}
|
|
|
|
void datapath::windows::client_socket::set_path(std::string path)
|
|
{
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
|
|
if (_opened.load())
|
|
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
|
|
|
|
// Create a proper path for Windows.
|
|
if (!::datapath::windows::utility::make_pipe_path(path))
|
|
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
|
|
|
|
// Store the new path.
|
|
_path = utility::make_wide_string(path);
|
|
}
|
|
|
|
void datapath::windows::client_socket::open()
|
|
{
|
|
try {
|
|
// Close the server just to be safe.
|
|
close();
|
|
|
|
// Guard against multiple invocations.
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
|
|
// Create handle
|
|
SetLastError(ERROR_SUCCESS);
|
|
HANDLE handle = CreateFileW(_path.c_str(), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
|
|
FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH, NULL);
|
|
if (handle == INVALID_HANDLE_VALUE) {
|
|
throw std::runtime_error("TODO");
|
|
}
|
|
_handle = std::shared_ptr<void>(handle, ::datapath::windows::utility::shared_ptr_handle_deleter);
|
|
|
|
// Create IOCP
|
|
HANDLE iocp = CreateIoCompletionPort(handle, _iocp.get(), reinterpret_cast<ULONG_PTR>(this), 0);
|
|
if (iocp == INVALID_HANDLE_VALUE)
|
|
throw std::runtime_error("Failed to create IOCompletionPort.");
|
|
if (iocp != _iocp.get()) {
|
|
CloseHandle(iocp);
|
|
throw std::runtime_error("Failed to link IOCompletionPort.");
|
|
}
|
|
|
|
// Update the pipe state, just to be safe.
|
|
DWORD mode = PIPE_READMODE_BYTE;
|
|
SetNamedPipeHandleState(_handle.get(), &mode, NULL, NULL);
|
|
|
|
// Update the overlapped objects to the new handle.
|
|
_ov_read.set_handle(_handle.get());
|
|
_ov_write.set_handle(_handle.get());
|
|
|
|
// Update state.
|
|
_opened.store(true);
|
|
} catch (std::exception const& ex) {
|
|
// If there was any problem during all of this, close the server,
|
|
// and throw the exception to the parent caller.
|
|
close();
|
|
throw ex;
|
|
}
|
|
}
|
|
|
|
void datapath::windows::client_socket::close()
|
|
{
|
|
// Guard against multiple invocations.
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
|
|
// Update state.
|
|
_opened.store(false);
|
|
|
|
// Kill Workers
|
|
for (std::size_t idx = 0, edx = _worker_count.load(); idx < edx; idx++) {
|
|
PostQueuedCompletionStatus(_iocp.get(), NULL, NULL, NULL);
|
|
}
|
|
|
|
// Close handles.
|
|
_iocp.reset();
|
|
_handle.reset();
|
|
}
|
|
|
|
bool datapath::windows::client_socket::is_open()
|
|
{
|
|
return _opened.load();
|
|
}
|
|
|
|
void datapath::windows::client_socket::work(std::chrono::milliseconds time_limit)
|
|
{
|
|
// Thread local state.
|
|
std::shared_ptr<void> iocp;
|
|
auto limit = time_limit.count();
|
|
DWORD w_time_limit = (limit > 2147483648) ? INFINITE : static_cast<DWORD>(limit);
|
|
DWORD num_bytes_transferred = 0;
|
|
ULONG_PTR ptr = 0;
|
|
LPOVERLAPPED overlapped = 0;
|
|
DWORD error_code = ERROR_SUCCESS;
|
|
|
|
{ // Copy necessary state.
|
|
std::lock_guard<std::mutex> lg(_lock);
|
|
if (!_opened.load()) {
|
|
return;
|
|
}
|
|
|
|
iocp = _iocp;
|
|
}
|
|
|
|
// Try and get IOCP status.
|
|
_worker_count++;
|
|
DWORD status = GetQueuedCompletionStatus(iocp.get(), &num_bytes_transferred, &ptr, &overlapped, w_time_limit);
|
|
_worker_count--;
|
|
|
|
// Check if we suceeded.
|
|
if (status == TRUE) {
|
|
// Stop request or invalid status.
|
|
if (!overlapped) {
|
|
return;
|
|
}
|
|
|
|
// The OVERLAPPED object we will get here is actually always an instance
|
|
// of datapath::windows::overlapped. This allows us to go from it back
|
|
// to a proper object, and call the necessary callback, easily allowing
|
|
// new functionality to be added later.
|
|
//
|
|
// Note that this is technically very unsafe, however there is no other
|
|
// way to actually do this without writing a new Operating System.
|
|
windows::overlapped* ovp = windows::overlapped::from_overlapped(overlapped);
|
|
ovp->callback(static_cast<size_t>(num_bytes_transferred), reinterpret_cast<void*>(ptr));
|
|
}
|
|
|
|
// Request failed, check error code.
|
|
// ToDo: Deal with other error codes?
|
|
error_code = GetLastError();
|
|
return;
|
|
}
|