windows: Rewrite onto IOCompletionPorts

IOCompletionPorts are the modern way to handle asynchronous IO without affected the system too much. Synchronization, work allocation and spreading, etc is all handled by the OS for us, which reduces the work we have to do in order to be NUMA aware. While this is far from perfect, it should perform better than a naive threaded approach.

ToDo:
- Add documentation generation
- Add Github Actions integration
- Write tests for everything.
- Update 'benchmark' sample to work again.
- Figure out a useful way to deal with connect/disconnect/error events.
- Figure out the broken pipe error, caused by an additional connected event where none should have been.
This commit is contained in:
Michael Fabian 'Xaymar' Dirks
2020-01-23 01:51:15 +01:00
parent f9acd6984a
commit d6e6ec96c4
45 changed files with 4659 additions and 1945 deletions
+101 -97
View File
@@ -24,7 +24,8 @@ set(VERSION_MAJOR 0)
set(VERSION_MINOR 1)
set(VERSION_PATCH 0)
set(VERSION_TWEAK 0)
set(PROJECT_COMMIT "N/A")
set(VERSION_COMMIT "")
set(VERSION_SUFFIX "")
if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/.git")
set(GIT_RESULT "")
set(GIT_OUTPUT "")
@@ -46,13 +47,14 @@ if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/.git")
OUTPUT_STRIP_TRAILING_WHITESPACE ERROR_STRIP_TRAILING_WHITESPACE ERROR_QUIET
)
if(GIT_RESULT EQUAL 0)
set(PROJECT_COMMIT ${GIT_OUTPUT})
set(VERSION_COMMIT ${GIT_OUTPUT})
endif()
endif()
# Define Project
project(
datapath
DataPath
LANGUAGES CXX
VERSION ${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}.${VERSION_TWEAK}
)
set(PROJECT_FULL_NAME "DataPath IPC Library")
@@ -91,59 +93,67 @@ endif()
# Options
#================================================================================#
# Static or Dynamic?
option(${PropertyPrefix}MAKE_DYNAMIC "Create dynamically linked library instead of static library." OFF)
option(${PropertyPrefix}MAKE_MODULE "Create dynamically linked module instead of dynamically linked library." OFF)
option(${PropertyPrefix}BUILD_SAMPLES "Build Samples" ON)
option(${PropertyPrefix}BUILD_SHARED "Create dynamically linked library instead of statically linked library." ON)
option(${PropertyPrefix}BUILD_SAMPLES "Build Samples" OFF)
#================================================================================#
# Sources
#================================================================================#
# Configure Version Header
configure_file(
"${PROJECT_SOURCE_DIR}/cmake/version.hpp.in"
"${PROJECT_BINARY_DIR}/generated/version.hpp"
)
# Source Files
set(PROJECT_PUBLIC "")
list(APPEND PROJECT_PUBLIC
"include/datapath.hpp"
"include/error.hpp"
"include/bitmask.hpp"
"include/event.hpp"
"include/isocket.hpp"
"include/iserver.hpp"
"include/itask.hpp"
"include/waitable.hpp"
"include/permissions.hpp"
"include/threadpool.hpp"
)
set(PROJECT_PUBLIC_INCLUDES "")
set(PROJECT_PRIVATE "")
list(APPEND PROJECT_PRIVATE
"source/threadpool.cpp"
)
set(PROJECT_PRIVATE_INCLUDES "")
set(PROJECT_TEMPLATES "")
list(APPEND PROJECT_TEMPLATES
"${PROJECT_SOURCE_DIR}/cmake/version.hpp.in"
)
set(PROJECT_GENERATED "")
list(APPEND PROJECT_GENERATED
"${PROJECT_BINARY_DIR}/generated/version.hpp"
)
set(PROJECT_DATA "")
set(PROJECT_LIBRARIES "")
set(PROJECT_DEFINES "")
# Generic Source files
list(APPEND PROJECT_PUBLIC
"include/datapath/datapath.hpp"
"include/datapath/error.hpp"
"include/datapath/event.hpp"
"include/datapath/server.hpp"
"include/datapath/socket.hpp"
)
list(APPEND PROJECT_PUBLIC_INCLUDES
"${PROJECT_SOURCE_DIR}/include"
"${PROJECT_BINARY_DIR}/generated"
)
list(APPEND PROJECT_PRIVATE_INCLUDES
"${PROJECT_SOURCE_DIR}/include/datapath"
"${PROJECT_BINARY_DIR}/generated/datapath"
"${PROJECT_SOURCE_DIR}/source"
)
list(APPEND PROJECT_TEMPLATES
"cmake/config.hpp.in"
)
list(APPEND PROJECT_GENERATED
"${PROJECT_BINARY_DIR}/generated/datapath/config.hpp"
)
list(APPEND PROJECT_DATA
"README.md"
"LICENSE"
)
set(PROJECT_LIBRARIES "")
set(PROJECT_DEFINES "")
# Create config file.
set(GEN_VERSION_MAJOR ${VERSION_MAJOR})
set(GEN_VERSION_MINOR ${VERSION_MINOR})
set(GEN_VERSION_PATCH ${VERSION_PATCH})
set(GEN_VERSION_TWEAK ${VERSION_TWEAK})
set(GEN_VERSION_SUFFIX ${VERSION_SUFFIX})
set(GEN_VERSION_COMMIT ${VERSION_COMMIT})
if(BUILD_SHARED)
set(DATAPATH_SHARED_LIBRARY TRUE)
else()
set(DATAPATH_SHARED_LIBRARY FALSE)
endif()
configure_file(
"${PROJECT_SOURCE_DIR}/cmake/config.hpp.in"
"${PROJECT_BINARY_DIR}/generated/datapath/config.hpp"
@ONLY
)
# Platforms
if(WIN32)
@@ -199,17 +209,17 @@ if(WIN32)
)
list(APPEND PROJECT_PRIVATE
"source/windows/datapath.cpp"
"source/windows/overlapped.hpp"
"source/windows/overlapped.cpp"
"source/windows/socket.hpp"
"source/windows/socket.cpp"
"source/windows/server.hpp"
"source/windows/server.cpp"
"source/windows/task.hpp"
"source/windows/task.cpp"
"source/windows/utility.hpp"
"source/windows/waitable.cpp"
"source/windows/windows-overlapped.hpp"
"source/windows/windows-overlapped.cpp"
"source/windows/windows-socket.hpp"
"source/windows/windows-socket.cpp"
"source/windows/windows-client-socket.hpp"
"source/windows/windows-client-socket.cpp"
"source/windows/windows-server.hpp"
"source/windows/windows-server.cpp"
"source/windows/windows-server-socket.hpp"
"source/windows/windows-server-socket.cpp"
"source/windows/windows-utility.hpp"
)
elseif(APPLE)
# MacOSX
@@ -232,32 +242,28 @@ endif()
source_group(TREE "${PROJECT_SOURCE_DIR}" PREFIX "Data Files" FILES ${PROJECT_DATA})
source_group(TREE "${PROJECT_SOURCE_DIR}/cmake" PREFIX "Template Files" FILES ${PROJECT_TEMPLATES})
source_group(TREE "${PROJECT_BINARY_DIR}/generated" PREFIX "Generated Files" FILES ${PROJECT_GENERATED})
source_group(TREE "${PROJECT_SOURCE_DIR}/include" PREFIX "Exported Files" FILES ${PROJECT_PUBLIC})
# Filter Sources
set(_TMP_SOURCE ${PROJECT_PRIVATE})
list(FILTER _TMP_SOURCE INCLUDE REGEX "\.(c|cpp)$")
source_group(TREE "${PROJECT_SOURCE_DIR}/source" PREFIX "Source Files" FILES ${_TMP_SOURCE})
# Filter Headers
set(_TMP_HEADER ${PROJECT_PRIVATE})
list(FILTER _TMP_HEADER INCLUDE REGEX "\.(h|hpp)$")
source_group(TREE "${PROJECT_SOURCE_DIR}/source" PREFIX "Header Files" FILES ${_TMP_HEADER})
source_group(TREE "${PROJECT_SOURCE_DIR}/include" PREFIX "Public Files" FILES ${PROJECT_PUBLIC})
source_group(TREE "${PROJECT_SOURCE_DIR}/source" PREFIX "Private Files" FILES ${PROJECT_PRIVATE})
#================================================================================#
# Building
#================================================================================#
# Library definition
set(_BUILD_TYPE)
if(${PropertyPrefix}MAKE_DYNAMIC)
if(${PropertyPrefix}MAKE_MODULE)
set(_BUILD_TYPE MODULE)
# Define the target
set(_BUILD_TYPE "STATIC")
if(${PropertyPrefix}BUILD_SHARED)
set(_BUILD_TYPE "SHARED")
if(MSVC)
list(APPEND PROJECT_DEFINES
"DATAPATH_INTERFACE=__declspec(dllexport)"
)
else()
set(_BUILD_TYPE SHARED)
list(APPEND PROJECT_DEFINES
"DATAPATH_INTERFACE=__attribute__((dllexport))"
)
endif()
else()
set(_BUILD_TYPE STATIC)
endif()
add_library(${PROJECT_NAME} ${_BUILD_TYPE}
@@ -267,39 +273,24 @@ add_library(${PROJECT_NAME} ${_BUILD_TYPE}
${PROJECT_TEMPLATES}
${PROJECT_DATA}
)
# Clang
if("${PropertyPrefix}" STREQUAL "")
clang_format(
TARGETS ${PROJECT_NAME}
DEPENDENCY
VERSION 9.0.0
)
endif()
# Includes
target_include_directories(${PROJECT_NAME}
INTERFACE
"${PROJECT_SOURCE_DIR}/include"
PUBLIC
"${PROJECT_SOURCE_DIR}/include"
${PROJECT_PUBLIC_INCLUDES}
PRIVATE
"${PROJECT_SOURCE_DIR}/source"
${PROJECT_PRIVATE_INCLUDES}
)
# Defines
target_compile_definitions(${PROJECT_NAME}
PRIVATE ${PROJECT_DEFINES}
)
# Linking Directories
link_directories(
)
# Linking
target_link_libraries(${PROJECT_NAME}
${PROJECT_LIBRARIES}
)
set_target_properties(
${PROJECT_NAME}
PROPERTIES
CXX_STANDARD 17
CXX_EXTENSIONS OFF
)
# File Version
if(WIN32)
@@ -309,7 +300,7 @@ if(WIN32)
VERSION ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}.${PROJECT_VERSION_TWEAK}
SOVERSION ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}.${PROJECT_VERSION_TWEAK}
)
else()
elseif(UNIX AND NOT APPLE)
set_target_properties(
${PROJECT_NAME}
PROPERTIES
@@ -318,9 +309,22 @@ else()
)
endif()
# Clang
if("${PropertyPrefix}" STREQUAL "")
clang_format(
TARGETS ${PROJECT_NAME}
DEPENDENCY
VERSION 9.0.0
)
endif()
#================================================================================#
# Samples
#================================================================================#
if(${PropertyPrefix}BUILD_SAMPLES)
add_subdirectory(${PROJECT_SOURCE_DIR}/samples)
if(${PropertyPrefix}BUILD_TESTS)
add_subdirectory(tests)
endif()
if(${PropertyPrefix}BUILD_SAMPLES)
add_subdirectory(samples)
endif()
+2582
View File
File diff suppressed because it is too large Load Diff
+54
View File
@@ -0,0 +1,54 @@
/*
* Ultra fast IPC library written in C++
* Copyright (C) 2017 Michael Fabian Dirks
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#pragma once
#include <cstdint>
// Flags
#cmakedefine DATAPATH_SHARED_LIBRARY
// Interface
#if !defined(DATAPATH_INTERFACE) && defined(DATAPATH_SHARED_LIBRARY)
#if defined(_MSC_VER)
#define DATAPATH_INTERFACE __declspec(dllimport)
#else
#define DATAPATH_INTERFACE __attribute__((dllimport))
#endif
#endif
#if !defined(DATAPATH_INTERFACE)
#define DATAPATH_INTERFACE
#endif
// Version
#define DATAPATH_VERSION_MAJOR std::uint8_least8_t(@GEN_VERSION_MAJOR@)
#define DATAPATH_VERSION_MINOR std::uint8_least8_t(@GEN_VERSION_MINOR@)
#define DATAPATH_VERSION_PATCH std::uint8_least8_t(@GEN_VERSION_PATCH@)
#define DATAPATH_VERSION_TWEAK std::uint8_least8_t(@GEN_VERSION_TWEAK@)
#define DATAPATH_VERSION_SUFFIX "@GEN_VERSION_SUFFIX@"
#define DATAPATH_VERSION_COMMIT "@GEN_VERSION_COMMIT@"
#define DATAPATH_VERSION_STRING "@GEN_VERSION_MAJOR@.@GEN_VERSION_MINOR@.@GEN_VERSION_PATCH@.@GEN_VERSION_TWEAK@@GEN_VERSION_SUFFIX@@GEN_VERSION_COMMIT@"
#define DATAPATH_VERSION (std::uint8_least32_t(DATAPATH_VERSION_MAJOR) << 24) | (std::uint8_least32_t(DATAPATH_VERSION_MINOR) << 16) | (std::uint8_least32_t(DATAPATH_VERSION_PATCH) << 8) | (std::uint8_least32_t(DATAPATH_VERSION_TWEAK))
// Version Masks
// Detect compatibility changes (1.0 -> 2.0)
#define DATAPATH_VERSION_MASK_REALEASE 0xFF000000
// Detect release changes that may have added features (1.0 -> 1.1)
#define DATAPATH_VERSION_MASK_FEATURES 0xFFFF0000
// Detect release changes that may have fixed features (1.0.0 -> 1.0.1)
#define DATAPATH_VERSION_MASK_FIXES 0xFFFFFF00
-35
View File
@@ -1,35 +0,0 @@
/*
* Ultra fast IPC library written in C++
* Copyright (C) 2017 Michael Fabian Dirks
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#pragma once
#include <inttypes.h>
#define MAKE_VERSION(major,minor,patch,tweak) ((uint64_t(major) & 0xFFFFull) << 48ull) | ((uint64_t(minor) & 0xFFFFull) << 32ull) | ((uint64_t(patch) & 0xFFFFull) << 16ull) | ((uint64_t(patch) & 0xFFFFull))
const uint16_t PROJECT_VERSION_MAJOR = @PROJECT_VERSION_MAJOR@;
const uint16_t PROJECT_VERSION_MINOR = @PROJECT_VERSION_MINOR@;
const uint16_t PROJECT_VERSION_PATCH = @PROJECT_VERSION_PATCH@;
const uint16_t PROJECT_VERSION_TWEAK = @PROJECT_VERSION_TWEAK@;
const uint64_t PROJECT_VERSION = MAKE_VERSION(PROJECT_VERSION_MAJOR, PROJECT_VERSION_MINOR, PROJECT_VERSION_PATCH, PROJECT_VERSION_TWEAK);
const uint16_t PLUGIN_VERSION_MAJOR = @PROJECT_VERSION_MAJOR@;
const uint16_t PLUGIN_VERSION_MINOR = @PROJECT_VERSION_MINOR@;
const uint16_t PLUGIN_VERSION_PATCH = @PROJECT_VERSION_PATCH@;
const uint16_t PLUGIN_VERSION_TWEAK = @PROJECT_VERSION_TWEAK@;
const uint64_t PLUGIN_VERSION_FULL = (((uint64_t)(PLUGIN_VERSION_MAJOR & 0xFFFF) << 48ull) | ((uint64_t)(PLUGIN_VERSION_MINOR & 0xFFFF) << 32ull) | ((uint64_t)(PLUGIN_VERSION_PATCH) & 0xFFFFFFFF));
-45
View File
@@ -1,45 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <type_traits>
template<typename Enum>
struct enable_bitmask_operators {
static const bool enable = false;
};
template<typename Enum>
typename std::enable_if<enable_bitmask_operators<Enum>::enable, Enum>::type operator|(Enum lhs, Enum rhs)
{
using underlying = typename std::underlying_type<Enum>::type;
return static_cast<Enum>(static_cast<underlying>(lhs) | static_cast<underlying>(rhs));
}
template<typename Enum>
typename std::enable_if<enable_bitmask_operators<Enum>::enable, Enum>::type operator&(Enum lhs, Enum rhs)
{
using underlying = typename std::underlying_type<Enum>::type;
return static_cast<Enum>(static_cast<underlying>(lhs) & static_cast<underlying>(rhs));
}
#define ENABLE_BITMASK_OPERATORS(x) \
template<> \
struct enable_bitmask_operators<x> { \
static const bool enable = true; \
};
-31
View File
@@ -1,31 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
#include "error.hpp"
#include "iserver.hpp"
#include "isocket.hpp"
#include "permissions.hpp"
namespace datapath {
datapath::error connect(std::shared_ptr<datapath::isocket>& socket, std::string path);
datapath::error host(std::shared_ptr<datapath::iserver>& server, std::string path,
datapath::permissions permissions, size_t max_clients = 0);
} // namespace datapath
@@ -17,26 +17,24 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
#include "error.hpp"
#include "event.hpp"
#include "waitable.hpp"
#include "datapath/config.hpp"
#include "datapath/error.hpp"
namespace datapath {
class itask : public waitable {
public /*event*/:
datapath::event<datapath::error> _on_failure;
constexpr std::size_t maximum_packet_size = 1048576;
datapath::event<datapath::error, const std::vector<char>&> _on_success;
class server;
class socket;
public:
virtual datapath::error cancel() = 0;
typedef std::vector<std::uint8_t> io_data_t;
typedef std::shared_ptr<void> io_callback_data_t;
virtual bool is_completed() = 0;
virtual size_t length() = 0;
virtual const std::vector<char>& data() = 0;
};
typedef std::function<void(std::shared_ptr<::datapath::socket>, ::datapath::error, const ::datapath::io_data_t&,
::datapath::io_callback_data_t)>
io_callback_t;
} // namespace datapath
@@ -21,28 +21,34 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
namespace datapath {
enum class error : int32_t {
// Unknown
// Unknown error.
Unknown = -1,
// Success
// Operation was successful.
Success,
// Failure (Generic)
// Operation failed with one or more recoverable errors.
Failure,
// Failure (Critical Generic)
// Operation failed with one or more unrecoverable errors. The object is now in an undetermined state.
CriticalFailure,
// Socket Closed
Closed,
// Timed Out
// Operation timed out.
TimedOut,
// Invalid Path
// Operation is not supported.
NotSupported,
// Socket Closed
SocketClosed,
// The given path is invalid.
InvalidPath,
// Operation Not Supported
NotSupported,
// The header sent by the remote was malformed or corrupted.
BadHeader,
// The size included in the header is too big or invalid.
BadSize,
};
}
@@ -21,6 +21,8 @@
#include <functional>
#include <list>
#include "datapath.hpp"
namespace datapath {
template<typename... _args>
class event {
@@ -53,6 +55,7 @@ namespace datapath {
event<_args...>& operator=(event<_args...>&& rhs)
{
std::swap(_listeners, rhs._listeners);
return *this;
};
public /* Status */:
+98
View File
@@ -0,0 +1,98 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include "datapath.hpp"
#include "event.hpp"
#include "socket.hpp"
namespace datapath {
class DATAPATH_INTERFACE server {
public:
/*!
* Create a new server for inter-process communications.
*
* @return A new server object ready to be used.
* @throws TODO
*/
static std::shared_ptr<::datapath::server> create();
public /* Configuration */:
/*!
* Changes the path for the server to listen on. The path must be unique and not in use by another process or
* server. This path is what sockets have to connect to.
*
* @param path Unique path for the server to be listening on.
* @throws TODO
*/
virtual void set_path(std::string path) = 0;
public /* State Change */:
/*!
* Start listening for connections from clients on the set path, closing the server if it was open at the time
* of calling this.
*
* @throws TODO
*/
virtual void open() = 0;
/*!
* Stop listening for connections from clients. Does nothing if already closed.
*
* @throws TODO
*/
virtual void close() = 0;
public /* State Checking */:
/*!
* Check if the server is currently open or not.
*
* @return `true` if the server is open, otherwise returns `false`.
*/
virtual bool is_open() = 0;
public /* Concurrency */:
/*!
* Perform any work that is pending.
*
* By default the server does not create any threads to do work for it, in order to maintain compatibility with
* all platforms. So you have to call this from your own code in order to make anything actually happen.
*
* @param time_limit Optional time limit for the work (only if platform supports it).
* @throws TODO
*/
virtual void work(std::chrono::milliseconds time_limit = std::chrono::milliseconds(0xFFFFFFFFFF)) = 0;
public:
struct {
/*!
* Event for new client connections, called whenever a client is connecting.
*
* @param bool& Set to 'true' to allow the connection, or 'false' to reject.
* @param std::shared_ptr<datapath::socket> The actual socket, must be managed.
* @return void
*/
::datapath::event<bool&, std::shared_ptr<::datapath::socket>> connected;
} events;
};
} // namespace datapath
+123
View File
@@ -0,0 +1,123 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <string>
#include "datapath.hpp"
#include "event.hpp"
namespace datapath {
class DATAPATH_INTERFACE socket {
public:
/*!
* Create a new socket for inter-process communications.
*
* @return A new socket object ready to be used.
*/
static std::shared_ptr<::datapath::socket> create();
public /* Configuration */:
/*!
* Changes the path for the socket to connect to. This path must have a server listening on it.
*
* @param path Unique path for the socket to connect to.
* @throws TODO
*/
virtual void set_path(std::string path) = 0;
public /* State Change */:
/*!
* Try to connect to the given path, which must have a server listening on it. If the socket was already
* connected, it will first disconnect and then connect again.
*
* @throws TODO
*/
virtual void open() = 0;
/*!
* Disconnect from the currently connected server, if there are any.
*
* @throws TODO
*/
virtual void close() = 0;
public /* State Checking */:
/*!
* Check if the socket is currently open or not.
*
* @return `true` if the scoket is open, otherwise returns `false`.
*/
virtual bool is_open() = 0;
public /* Concurrency */:
/*!
* Perform any work that is pending.
*
* By default the socket does not create any threads to do work for it, in order to maintain compatibility with
* all platforms. So you have to call this from your own code in order to make anything actually happen.
*
* Note: This has no effect when called on a socket created from a server.
*
* @param time_limit Optional time limit for the work (only if platform supports it).
* @throws TODO
*/
virtual void work(std::chrono::milliseconds time_limit = std::chrono::milliseconds(0xFFFFFFFFFF)) = 0;
public /* Input/Output */:
/*!
* Enqueue an asynchronous read operation from this socket.
*
* @param callback The callback function to call when this read operation is executed.
* @param callback_data Data to pass to the callback.
* @throws TODO
**/
virtual void read(::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data) = 0;
/*!
* Enqueue an asynchronous write operation to this socket.
*
* @param data Data to write to the socket.
* @param callback The callback function to call when this write operation is executed.
* @param callback_data Data to pass to the callback.
* @throws TODO
**/
virtual void write(const io_data_t& data, ::datapath::io_callback_t callback,
::datapath::io_callback_data_t callback_data) = 0;
/*!
* Enqueue an asynchronous write operation to this socket.
*
* @param data Data to write to the socket.
* @param data_length Length of the data to write to the socket.
* @param callback The callback function to call when this write operation is executed.
* @param callback_data Data to pass to the callback.
* @throws TODO
**/
virtual void write(const std::uint8_t* data, const size_t data_length, ::datapath::io_callback_t callback,
::datapath::io_callback_data_t callback_data) = 0;
public:
struct {
::datapath::event<::datapath::error, std::shared_ptr<::datapath::socket>> opened;
::datapath::event<::datapath::error, std::shared_ptr<::datapath::socket>> closed;
} events;
};
} // namespace datapath
-41
View File
@@ -1,41 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include "error.hpp"
#include "event.hpp"
#include "isocket.hpp"
namespace datapath {
class iserver {
public /*event*/:
/** Accepted Connection Event
* This event is called if a new connection is pending evaluation.
*
* @param bool& Set to true to accept, false to decline.
* @param std::shared_ptr<datapath::isocket> Socket.
* @return void
*/
datapath::event<bool&, std::shared_ptr<datapath::isocket>> on_accept;
public:
virtual datapath::error close() = 0;
};
} // namespace datapath
-38
View File
@@ -1,38 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include "error.hpp"
#include "event.hpp"
#include "itask.hpp"
namespace datapath {
class isocket {
public /*events*/:
datapath::event<const std::vector<char>&> on_message;
datapath::event<> on_close;
public:
virtual bool good() = 0;
virtual datapath::error close() = 0;
virtual datapath::error write(std::shared_ptr<datapath::itask>& task, const std::vector<char>& data) = 0;
};
} // namespace datapath
-26
View File
@@ -1,26 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cinttypes>
#include "bitmask.hpp"
namespace datapath {
enum class permissions : int8_t { None, User, Group, World, Reserved };
ENABLE_BITMASK_OPERATORS(datapath::permissions);
} // namespace datapath
-73
View File
@@ -1,73 +0,0 @@
/*
* Low Latency IPC Library for high-speed traffic
* Copyright (C) 2017-2019 Michael Fabian Dirks <info@xaymar.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#pragma once
#include <condition_variable>
#include <functional>
#include <list>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include "error.hpp"
namespace datapath {
namespace threadpool {
typedef uint64_t affinity_t;
constexpr affinity_t default_mask = std::numeric_limits<affinity_t>::max();
struct task {
std::function<void()> function;
affinity_t mask = default_mask;
};
class pool {
struct worker {
affinity_t affinity;
bool should_stop = false;
std::thread thread;
std::mutex mutex;
std::queue<std::shared_ptr<task>> queue;
std::condition_variable signal;
worker(affinity_t affinity);
~worker();
void runner();
void clear();
void push(std::shared_ptr<task> task);
};
std::map<affinity_t, std::shared_ptr<worker>> _workers;
public:
pool();
~pool();
bool push(std::shared_ptr<task> task);
void clear(affinity_t mask = default_mask);
};
} // namespace threadpool
} // namespace datapath
-63
View File
@@ -1,63 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <chrono>
#include <vector>
#include "error.hpp"
#include "event.hpp"
namespace datapath {
class waitable {
public /*events*/:
datapath::event<datapath::error> _on_wait_error;
datapath::event<datapath::error> _on_wait_success;
public:
virtual void* get_waitable() = 0;
inline datapath::error wait(std::chrono::nanoseconds duration = std::chrono::nanoseconds(0))
{
return datapath::waitable::wait(this, duration);
}
public /*static*/:
static datapath::error wait(datapath::waitable* obj,
std::chrono::nanoseconds duration = std::chrono::nanoseconds(0));
static datapath::error wait(datapath::waitable** objs, size_t count,
std::chrono::nanoseconds duration = std::chrono::nanoseconds(0));
static inline datapath::error wait(std::vector<datapath::waitable*> objs,
std::chrono::nanoseconds duration = std::chrono::nanoseconds(0))
{
return datapath::waitable::wait(objs.data(), objs.size(), duration);
}
static datapath::error wait_any(datapath::waitable** objs, size_t count, size_t& index,
std::chrono::nanoseconds duration = std::chrono::nanoseconds(0));
static inline datapath::error wait_any(std::vector<datapath::waitable*> objs, size_t& index,
std::chrono::nanoseconds duration = std::chrono::nanoseconds(0))
{
return datapath::waitable::wait_any(objs.data(), objs.size(), index, duration);
}
};
} // namespace datapath
+102
View File
@@ -0,0 +1,102 @@
# Implementations
## Windows
Client/Server Socket:
- Constant reads on a thread?
- Solves the needs to queue up reads.
- Adds slight overhead and complexity.
- Doesn't work when there is no callback for reading.
- Not really the most optimal solution
- Can writes be happening in parallel?
- In that case clients would benefit from IOCP with no limit.
- Otherwise go with a queue approach (further reduces delay)
- Allow queueing reads? (can't have more than one read at at time)
- Allow multiple parallel writes?
- Read & Write async?
Client Socket:
- IOCP with <Hardware Concurrency> Threads, or limit to 2?
- Hardware Concurrency is only useful when more than one write is happening (but that may not work anyway, needs testing
- Async connect or sync connect?
- Former is modern and expected.
Server & Socket:
- IOCP with <Hardware Concurrency> Threads, shared with Sockets
- Connect on Server
- Without callback, defaults to instantly disconnect clients.
- Callback overrides that behavior.
- Should socket status be in Socket or Server? Or both?
- If in server or both, some sort of callback to the server has to be done.
- If only in socket, server still needs to track the backlog of sockets.
- Socket calls back into server on disconnect to update status?
# Structures
```
Task {
Events {
completed(bytes, void* data)
}
static Task* create();
}
```
Basics:
- Server-Client-Model, One to Many connection (one Server may have any number of Clients)
- Everything is asynchronous, nothing should be synchronous to reduce CPU locks.
- Multiplatform (Linux, Mac, Windows, Android?, iOS?)
- Only handles data, there is no code for RPC.
Reading and Writing:
- These will return an object that is an asynchronous request, which may complete immediately or in the near future.
- Main code will only keep a soft reference, and the user will be holding the hard reference.
- Losing the hard reference means ignoring this request and simply using the next one, if there is one. If not, then different behavior is invoked depending on the task:
- If a read succeeded but the hard reference to the actual task was lost, store it temporarily for the next read call.
- If a write was being calculated, but hard reference is lost, we abort right there and continue with the next one.
- Windows: Writing to the socket does not require any synchronization, reading however does due to the varying packet sizes and the need for a header.
Server:
- Can accept new connections (callback with bool return value?)
- Can not connect by itself, can only accept connections.
- Can't read or write data without a connection.
- Doesn't care who or what is connected.
Client/Handler:
- Can disconnect.
- Can read and write data.
- Can't reconnect.
Client:
- Can connect.
Task Model:
- read, write and accept all return "tasks".
- Tasks can be cancelled, waited on and the result can be used later.
- Read, Write use IOTasks.
- Accept uses AcceptTask.
- Due to the asynchronous model, no data is kept around on the socket or server if possible. All associated data with the task is lost on dereferencing it.
- THIS WONT WORK if the task is killed while the OS is using it - it will cause segfaults. Need another solution!
IOTask (Windows)
- OVERLAPPED
- std::vector<char> data (buffer containing read/write data)
- completed() returns status (Success, Unknown or Failure).
- data() returns the data std::vector.
- wait() for waiting endlessly.
- wait_for() for waiting for a specific amount of time.
- wait_until() for waiting until a specific moment in time.
+1 -1
View File
@@ -1,2 +1,2 @@
add_subdirectory(benchmark)
#add_subdirectory(benchmark)
add_subdirectory(single-process-ipc)
+1 -1
View File
@@ -6,7 +6,7 @@ SET(PROJECT_SOURCES
)
SET(PROJECT_LIBRARIES
datapath
DataPath
)
# Includes
+143 -248
View File
@@ -16,293 +16,188 @@ You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <cinttypes>
#include <datapath/datapath.hpp>
#include <datapath/server.hpp>
#include <datapath/socket.hpp>
#include <atomic>
#include <cstdarg>
#include <functional>
#include <cstdio>
#include <iostream>
#include <map>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "datapath.hpp"
std::vector<std::string> messages = {
"This is an introduction to the basics of IPC. It will simply send strings to the other side, with the other side "
"verifying the string and then sending that string back unmodified.",
"As this is just a sample, there is no need for this to be as accurate as possible."};
constexpr const char* socket_path = "sample-simple-process-ipc";
static auto log_time = std::chrono::high_resolution_clock::now();
void log(std::string format, ...)
void do_log(const char* format, ...)
{
// Time
uint32_t hour, minute, second;
uint64_t nanosecond;
static std::mutex lock;
static std::vector<char> buffer;
std::lock_guard<std::mutex> lg(lock);
{
auto log_now = std::chrono::high_resolution_clock::now();
va_list args;
va_start(args, format);
va_list args2;
va_copy(args2, args);
nanosecond = (log_now - log_time).count();
second = nanosecond / 1000000000;
nanosecond %= 1000000000;
minute = second / 60;
second %= 60;
hour = minute / 60;
minute %= 60;
}
buffer.resize(std::vsnprintf(nullptr, 0, format, args));
va_end(args);
std::vsnprintf(buffer.data(), buffer.size(), format, args2);
va_end(args2);
// Message
std::vector<char> msg;
{
va_list args;
va_start(args, format);
msg.resize(vsnprintf(nullptr, 0, format.c_str(), args));
vsnprintf(msg.data(), msg.size(), format.c_str(), args);
va_end(args);
}
printf("[%02" PRIu32 ":%02" PRIu32 ":%02" PRIu32 ".%09" PRIu64 "] %.*s\n", hour, minute, second, nanosecond,
(int)msg.size(), msg.data());
std::cout.write(buffer.data(), buffer.size());
}
// For simplicity, client and server will be isolated.
class server {
std::shared_ptr<datapath::iserver> dp;
std::shared_ptr<::datapath::server> _server;
std::shared_ptr<::datapath::socket> _conn;
struct client {
std::shared_ptr<datapath::isocket> dp;
server* parent;
std::mutex task_lock;
std::list<std::shared_ptr<datapath::itask>> tasks;
struct {
std::thread task;
bool shutdown;
} watcher;
void _watcher()
{
while (!this->watcher.shutdown) {
size_t wait_cnt = 0;
{
std::unique_lock<std::mutex> ul(this->task_lock);
wait_cnt = this->tasks.size();
}
if (wait_cnt == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} else {
std::vector<datapath::waitable*> waits;
{
std::unique_lock<std::mutex> ul(this->task_lock);
waits.reserve(this->tasks.size());
for (auto task : this->tasks) {
waits.push_back(&(*task));
}
}
size_t index = 0;
datapath::error ec =
datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(0));
if (ec != datapath::error::Success) {
ec = datapath::waitable::wait_any(waits, index,
std::chrono::milliseconds(1));
}
if (ec == datapath::error::Success) {
std::unique_lock<std::mutex> ul(this->task_lock);
std::list<std::shared_ptr<datapath::itask>>::iterator task;
for (auto itr = this->tasks.begin(); itr != this->tasks.end(); itr++) {
if (&*(*itr) == waits[index]) {
task = itr;
break;
}
}
this->tasks.erase(task);
}
}
}
}
void handle_close()
{
std::unique_lock<std::mutex> ul(this->parent->socket_lock);
this->parent->sockets.erase(dp);
}
void handle_message(const std::vector<char>& data)
{
std::shared_ptr<datapath::itask> task;
datapath::error ec = dp->write(task, data);
if (ec == datapath::error::Success) {
std::unique_lock<std::mutex> ul(this->task_lock);
tasks.push_back(task);
}
}
public:
client(server* parent, std::shared_ptr<datapath::isocket> socket)
{
this->parent = parent;
dp = socket;
socket->on_close.add(std::bind(&server::client::handle_close, this));
socket->on_message.add(std::bind(&server::client::handle_message, this, std::placeholders::_1));
this->watcher.shutdown = false;
this->watcher.task = std::thread(std::bind(&server::client::_watcher, this));
}
~client()
{
this->watcher.shutdown = true;
if (this->watcher.task.joinable()) {
this->watcher.task.join();
}
dp->close();
};
};
std::mutex socket_lock;
std::map<std::shared_ptr<datapath::isocket>, std::shared_ptr<server::client>> sockets;
private:
void handle_accept(bool& should_accept, std::shared_ptr<datapath::isocket> socket)
{
if (!socket->good()) {
should_accept = false;
return;
}
{
std::unique_lock<std::mutex> ul(this->socket_lock);
this->sockets.insert({socket, std::make_shared<server::client>(this, socket)});
}
}
std::list<std::thread> _threads;
public:
server(std::string path)
server()
{
datapath::error ec = datapath::host(dp, path,
datapath::permissions::User | datapath::permissions::Group
| datapath::permissions::World);
if (ec != datapath::error::Success) {
throw std::exception();
_server = ::datapath::server::create();
_server->set_path(socket_path);
_server->events.connected +=
std::bind(&::server::on_connected, this, std::placeholders::_1, std::placeholders::_2);
_server->open();
for (size_t idx = 0, edx = 4; idx < edx; idx++) {
_threads.push_back(std::move(std::thread(std::bind(&::server::work, this))));
}
dp->on_accept.add(
std::bind(&server::handle_accept, this, std::placeholders::_1, std::placeholders::_2));
do_log("[SERVER] Listening on '%s'...\n", socket_path);
}
~server()
{
if (dp) {
dp->close();
do_log("[SERVER] Stopping...\n");
_server->close();
for (auto& thread : _threads) {
thread.join();
}
do_log("[SERVER] Stopped.\n");
}
void work()
{
do_log("[SERVER/THREAD] Working...\n");
while (_server->is_open()) {
_server->work();
}
do_log("[SERVER/THREAD] Work done.\n");
}
void on_connected(bool& allow, std::shared_ptr<::datapath::socket> socket)
{
allow = true;
_conn = socket;
_conn->events.closed +=
std::bind(&::server::on_disconnected, this, std::placeholders::_1, std::placeholders::_2);
_conn->read(std::bind(&::server::on_read_completed, this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
nullptr);
do_log("[SERVER] New client connected!\n");
}
void on_disconnected(::datapath::error, std::shared_ptr<::datapath::socket>)
{
do_log("[SERVER] Client left us.\n");
}
void on_read_completed(std::shared_ptr<::datapath::socket>, ::datapath::error, const ::datapath::io_data_t& data,
::datapath::io_callback_data_t)
{
do_log("[SERVER] Client sent %llu bytes, with content: %.*s\n", data.size(), data.size(), data.data());
_conn->read(std::bind(&::server::on_read_completed, this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
nullptr);
}
};
class client {
std::shared_ptr<datapath::isocket> dp;
struct {
std::thread task;
bool shutdown;
} watcher;
private:
void _watcher()
{
std::shared_ptr<datapath::itask> task;
while (!this->watcher.shutdown) {
auto start = std::chrono::high_resolution_clock::now();
if (!task) {
std::vector<char> msg;
msg.resize(sizeof(uint64_t));
reinterpret_cast<uint64_t&>(msg[0]) =
std::chrono::high_resolution_clock::now().time_since_epoch().count();
datapath::error ec = dp->write(task, msg);
}
datapath::error ec = task->wait(std::chrono::milliseconds(0));
if (ec != datapath::error::Success) {
ec = task->wait(std::chrono::milliseconds(100));
}
if (ec == datapath::error::Success) {
log("Sent Message, error code %lu...", uint32_t(ec));
task.reset();
} else if (ec != datapath::error::TimedOut) {
log("Failed, error code %lu...", uint32_t(ec));
task.reset();
}
auto end = std::chrono::high_resolution_clock::now();
auto dur = end - start;
if (!task) {
if (dur < std::chrono::milliseconds(250)) {
std::this_thread::sleep_for(std::chrono::milliseconds(250) - dur);
}
}
}
}
void handle_message(const std::vector<char>& data)
{
// Message is a timestamp of when it was sent.
uint64_t cur = std::chrono::high_resolution_clock::now().time_since_epoch().count();
uint64_t msg = reinterpret_cast<const uint64_t&>(data[0]);
uint64_t dlt = cur - msg;
log("Message RTT of %llu ns.", dlt);
}
void handle_close()
{
this->watcher.shutdown = true;
}
void close()
{
this->watcher.shutdown = true;
if (this->watcher.task.joinable()) {
this->watcher.task.join();
}
}
std::shared_ptr<::datapath::socket> _client;
std::list<std::thread> _threads;
std::vector<char> _data;
std::atomic_bool _stop = false;
public:
client(std::string path)
client()
{
datapath::error ec = datapath::connect(dp, path);
if (ec != datapath::error::Success) {
throw std::exception();
const char* str = "Hello World";
_data.resize(strlen(str));
memcpy(_data.data(), str, _data.size());
do_log("[CLIENT] Connecting to '%s'...\n", socket_path);
_client = ::datapath::socket::create();
_client->set_path(socket_path);
_client->events.opened +=
std::bind(&::client::on_connected, this, std::placeholders::_1, std::placeholders::_2);
for (size_t idx = 0, edx = 4; idx < edx; idx++) {
_threads.push_back(std::move(std::thread(std::bind(&::client::work, this))));
}
this->watcher.shutdown = false;
this->watcher.task = std::thread(std::bind(&client::_watcher, this));
dp->on_close.add(std::bind(&client::handle_close, this));
dp->on_message.add(std::bind(&client::handle_message, this, std::placeholders::_1));
_client->open();
_client->write(reinterpret_cast<uint8_t*>(_data.data()), _data.size(),
std::bind(&::client::on_write_completed, this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
nullptr);
}
~client()
{
close();
do_log("[CLIENT] Stopping...\n");
_client->close();
_stop = true;
for (auto& thread : _threads) {
thread.join();
}
do_log("[CLIENT] Stopped.\n");
}
void work()
{
do_log("[CLIENT/THREAD] Working...\n");
while (!_stop) {
_client->work();
}
do_log("[CLIENT/THREAD] Work done.\n");
}
void on_connected(::datapath::error, std::shared_ptr<::datapath::socket>)
{
do_log("[CLIENT] We are in!\n");
for (size_t idx = 0; idx < 100; idx++) {
_client->write(reinterpret_cast<uint8_t*>(_data.data()), _data.size(),
std::bind(&::client::on_write_completed, this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4),
nullptr);
}
}
void on_write_completed(std::shared_ptr<::datapath::socket>, ::datapath::error, const ::datapath::io_data_t& data,
::datapath::io_callback_data_t)
{
do_log("[CLIENT] Sent %llu bytes with content: %.*s\n", data.size(), data.size(), data.data());
}
};
int main(int argc, const char* argv[])
{
std::shared_ptr<server> myServer = std::make_shared<server>("single-process-ipc");
std::shared_ptr<client> myClient = std::make_shared<client>("single-process-ipc");
std::cin.get();
myClient.reset();
myServer.reset();
try {
auto my_server = std::make_shared<::server>();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
try {
auto my_client = std::make_shared<::client>();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
} catch (std::exception const& ex) {
std::cerr << ex.what() << std::endl;
return 1;
}
} catch (std::exception const& ex) {
std::cerr << ex.what() << std::endl;
return 1;
}
return 0;
}
-155
View File
@@ -1,155 +0,0 @@
/*
* Low Latency IPC Library for high-speed traffic
* Copyright (C) 2017-2019 Michael Fabian Dirks <info@xaymar.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include "threadpool.hpp"
#ifdef _WIN32
#include <windows.h>
#endif
datapath::threadpool::pool::worker::worker(affinity_t affinity) : affinity(affinity), should_stop(false)
{
std::unique_lock<std::mutex> qlock(this->mutex);
this->thread = std::thread(&datapath::threadpool::pool::worker::runner, this);
}
datapath::threadpool::pool::worker::~worker()
{
this->should_stop = true;
this->signal.notify_all();
if (this->thread.joinable()) {
this->thread.join();
}
}
void datapath::threadpool::pool::worker::runner()
{
std::shared_ptr<datapath::threadpool::task> my_task;
// Assign affinity
#ifdef _WIN32
SetThreadAffinityMask(reinterpret_cast<HANDLE>(this->thread.native_handle()), this->affinity);
#else
#endif
while (!this->should_stop) {
{ // Grab any available work.
std::unique_lock<std::mutex> lock(this->mutex);
if (this->queue.size() > 0)
my_task = this->queue.front();
}
if (my_task) // Execute work if we have any.
if (my_task->function)
my_task->function();
{ // Remove it from the queue and wait.
std::unique_lock<std::mutex> slock(this->mutex);
if (my_task) {
this->queue.pop();
my_task.reset();
}
if (this->queue.size() == 0) {
this->signal.wait(slock, [this]() { return (this->should_stop) || (this->queue.size() > 0); });
}
}
}
}
void datapath::threadpool::pool::worker::clear()
{
std::unique_lock<std::mutex> slock(this->mutex);
while (this->queue.size() > 0)
this->queue.pop();
}
void datapath::threadpool::pool::worker::push(std::shared_ptr<task> task)
{
{
std::unique_lock<std::mutex> slock(this->mutex);
this->queue.push(task);
}
this->signal.notify_all();
}
datapath::threadpool::pool::pool()
{
// Spawn x number of threads for working.
uint64_t num_hw_concurrency = std::thread::hardware_concurrency();
for (uint64_t idx = 0; idx < num_hw_concurrency; idx++) {
auto worker = std::make_shared<datapath::threadpool::pool::worker>(1 << idx);
this->_workers.insert({idx, worker});
}
}
datapath::threadpool::pool::~pool()
{
this->_workers.clear();
}
bool datapath::threadpool::pool::push(std::shared_ptr<task> task)
{
// Early-Exit tests.
/// Check for null or invalid tasks.
if (!task) {
throw std::invalid_argument("task must not be nullptr");
}
if (!task->function) {
throw std::invalid_argument("task->function must not be nullptr");
}
/// Check for invalid affinity masks.
if ((task->mask & (this->_workers.size() - 1)) == 0) {
throw std::invalid_argument("mask does not fit any thread");
}
affinity_t lowest_id;
size_t lowest_count = std::numeric_limits<size_t>::max();
for (auto kv : _workers) {
if ((kv.second->affinity & task->mask) == 0) {
continue;
}
std::unique_lock<std::mutex> lock(kv.second->mutex);
if (kv.second->queue.size() < lowest_count) {
lowest_id = kv.first;
lowest_count = kv.second->queue.size();
}
}
if (lowest_count == std::numeric_limits<size_t>::max()) {
return false;
}
this->_workers[lowest_id]->push(task);
return true;
}
void datapath::threadpool::pool::clear(affinity_t mask)
{
// Early-Exit tests.
if ((mask & (this->_workers.size() - 1)) == 0) {
throw std::invalid_argument("mask does not fit any thread");
}
for (auto kv : _workers) {
if ((kv.second->affinity & mask) == 0) {
continue;
}
kv.second->clear();
}
}
-32
View File
@@ -1,32 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "datapath.hpp"
#include "windows/server.hpp"
#include "windows/socket.hpp"
datapath::error datapath::connect(std::shared_ptr<datapath::isocket>& socket, std::string path)
{
return datapath::windows::socket::connect(socket, path);
}
datapath::error datapath::host(std::shared_ptr<datapath::iserver>& server, std::string path,
datapath::permissions permissions, size_t max_clients)
{
return datapath::windows::server::host(server, path, permissions, max_clients);
}
-98
View File
@@ -1,98 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "overlapped.hpp"
datapath::windows::overlapped::overlapped() : overlapped_ptr(nullptr), data(nullptr)
{
size_t memory_size = sizeof(OVERLAPPED) + sizeof(void*);
buffer.resize(memory_size);
reinterpret_cast<void*&>(buffer[sizeof(OVERLAPPED)]) = this;
handle = INVALID_HANDLE_VALUE;
// Initialize OVERLAPPED
overlapped_ptr = &reinterpret_cast<OVERLAPPED&>(buffer[0]);
memset(overlapped_ptr, 0, sizeof(OVERLAPPED));
overlapped_ptr->hEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
}
datapath::windows::overlapped::~overlapped()
{
cancel();
buffer.clear();
}
OVERLAPPED* datapath::windows::overlapped::get_overlapped()
{
return this->overlapped_ptr;
}
HANDLE datapath::windows::overlapped::get_handle()
{
return this->handle;
}
void datapath::windows::overlapped::set_handle(HANDLE handle)
{
this->handle = handle;
}
void* datapath::windows::overlapped::get_data()
{
return this->data;
}
void datapath::windows::overlapped::set_data(void* data)
{
this->data = data;
}
void datapath::windows::overlapped::cancel()
{
if (overlapped_ptr) {
CancelIoEx(handle, overlapped_ptr);
ResetEvent(overlapped_ptr->hEvent);
CloseHandle(overlapped_ptr->hEvent);
}
}
bool datapath::windows::overlapped::is_completed()
{
if (overlapped_ptr) {
return HasOverlappedIoCompleted(overlapped_ptr);
}
return false;
}
void datapath::windows::overlapped::reset()
{
cancel();
if (overlapped_ptr) {
overlapped_ptr->Internal = 0;
overlapped_ptr->InternalHigh = 0;
overlapped_ptr->Offset = 0;
overlapped_ptr->OffsetHigh = 0;
overlapped_ptr->Pointer = 0;
}
}
void* datapath::windows::overlapped::get_waitable()
{
return reinterpret_cast<void*>(overlapped_ptr->hEvent);
}
-309
View File
@@ -1,309 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "server.hpp"
#include "socket.hpp"
#include "utility.hpp"
// Buffer Size that Windows just ignores, for the most part.
#define WIN_BUFFER_SIZE 64 * 1024 * 1024
#define WIN_WAIT_TIME 100
#define WIN_BACKLOG_NUM 8
datapath::error datapath::windows::server::create(std::string path, datapath::permissions permissions,
size_t max_clients)
{
// If old sockets are available, close them.
this->close();
// Apply options
this->max_clients = max_clients;
this->path = path;
// Generate Security Attributes.
std::memset(&this->security_attributes, 0, sizeof(SECURITY_ATTRIBUTES));
this->security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES);
this->security_attributes.lpSecurityDescriptor = nullptr;
this->security_attributes.bInheritHandle = true;
// TODO: Respect permissions.
// Spawn x backlog connections
// TODO: Add parameter for this.
for (size_t n = 0; n < WIN_BACKLOG_NUM; n++) {
HANDLE handle = _create_socket(path, n == 0);
if (handle == INVALID_HANDLE_VALUE) {
// Clean up again.
this->close();
return datapath::error::CriticalFailure;
}
{
std::unique_lock<std::mutex> ul(this->lock);
sockets.push_back(handle);
waiting_sockets.push_back(handle);
}
}
// Watcher Thread
{
std::unique_lock<std::mutex> ul(this->watcher.lock);
this->watcher.shutdown = false;
this->watcher.task = std::thread(std::bind(&datapath::windows::server::_watcher, this));
}
is_created = true;
return datapath::error::Success;
}
HANDLE datapath::windows::server::_create_socket(std::string path, bool initial)
{
if (!datapath::windows::utility::make_pipe_path(path)) {
return INVALID_HANDLE_VALUE;
}
std::wstring wpath = datapath::windows::utility::make_wide_string(path);
DWORD file_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_WRITE_THROUGH | FILE_FLAG_OVERLAPPED;
if (initial) {
file_flags |= FILE_FLAG_FIRST_PIPE_INSTANCE;
}
DWORD pipe_flags = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
HANDLE handle = CreateNamedPipeW(wpath.c_str(), file_flags, pipe_flags, PIPE_UNLIMITED_INSTANCES, WIN_BUFFER_SIZE,
WIN_BUFFER_SIZE, WIN_WAIT_TIME, &this->security_attributes);
return handle;
}
void datapath::windows::server::_watcher()
{
std::map<HANDLE, std::shared_ptr<datapath::windows::overlapped>> ovmap;
while (!this->watcher.shutdown) {
// Wait for any overlapped objects.
if (ovmap.size() > 0) {
// No lock as we aren't touching any list or map yet.
std::vector<datapath::waitable*> waits;
waits.reserve(ovmap.size());
for (auto kv : ovmap) {
waits.push_back(&(*kv.second));
}
size_t index = 0;
datapath::error ec = datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(0));
if (ec != datapath::error::Success) {
datapath::waitable::wait_any(waits, index, std::chrono::milliseconds(1));
}
}
// Update list of overlappeds to track.
{
std::unique_lock<std::mutex> ul(this->lock);
std::list<HANDLE> to_kill;
for (auto itr = this->waiting_sockets.begin(); itr != this->waiting_sockets.end(); itr++) {
if (ovmap.count(*itr) == 0) {
HANDLE handle = *itr;
auto ov = std::make_shared<datapath::windows::overlapped>();
ov->set_handle(handle);
ov->set_data(this);
ov->_on_wait_success.add([this, &ovmap, &itr, &handle](datapath::error ec) {
std::unique_lock<std::mutex> ul(this->lock);
this->waiting_sockets.erase(itr);
this->pending_sockets.push_back(handle);
ovmap.erase(handle);
});
SetLastError(ERROR_SUCCESS);
BOOL suc = ConnectNamedPipe(handle, ov->get_overlapped());
if (suc) {
ovmap.insert({handle, ov});
} else {
if (GetLastError() == ERROR_PIPE_CONNECTED) {
to_kill.push_back(handle);
this->pending_sockets.push_back(handle);
} else {
continue;
}
}
}
}
for (auto hnd : to_kill) {
this->waiting_sockets.remove(hnd);
}
}
// Notify of pending sockets.
{
std::unique_lock<std::mutex> ul(this->lock);
std::list<HANDLE> to_kill;
if (this->on_accept) {
for (auto itr = this->pending_sockets.begin(); itr != this->pending_sockets.end(); itr++) {
HANDLE handle = *itr;
bool accept = true;
auto sock = std::make_shared<datapath::windows::socket>();
sock->_connect(handle);
auto isock = std::dynamic_pointer_cast<datapath::isocket>(sock);
this->on_accept(accept, isock);
if (accept) {
to_kill.push_back(handle);
this->active_sockets.insert({handle, sock});
if ((this->waiting_sockets.size() + this->pending_sockets.size()) < WIN_BACKLOG_NUM) {
if ((this->sockets.size() <= this->max_clients) && (this->max_clients > 0)) {
HANDLE handle = _create_socket(this->path, false);
if (handle != INVALID_HANDLE_VALUE) {
this->sockets.push_back(handle);
this->waiting_sockets.push_back(handle);
}
}
}
} else {
// Force close and return to waiting.
sock->close();
to_kill.push_back(handle);
this->waiting_sockets.push_back(handle);
}
}
}
for (auto hnd : to_kill) {
this->pending_sockets.remove(hnd);
}
}
// Verify existing connections.
{
std::list<HANDLE> to_kill;
std::unique_lock<std::mutex> ul(this->lock);
for (auto itr = this->active_sockets.begin(); itr != this->active_sockets.end(); itr++) {
if (itr->second.expired()) {
// Enforce backlog size
if ((this->waiting_sockets.size() + this->pending_sockets.size()) < WIN_BACKLOG_NUM) {
this->waiting_sockets.push_back(itr->first);
} else {
DisconnectNamedPipe(itr->first);
CloseHandle(itr->first);
to_kill.push_back(itr->first);
}
this->active_sockets.erase(itr);
continue;
}
auto obj = itr->second.lock();
if (!obj->good()) {
// Enforce backlog size
if ((this->waiting_sockets.size() + this->pending_sockets.size()) < WIN_BACKLOG_NUM) {
this->waiting_sockets.push_back(itr->first);
} else {
DisconnectNamedPipe(itr->first);
CloseHandle(itr->first);
to_kill.push_back(itr->first);
}
this->active_sockets.erase(itr);
continue;
}
}
for (auto hnd : to_kill) {
this->active_sockets.erase(hnd);
}
}
if (ovmap.size() == 0) {
// Just sleep 1ms to not use too much CPU.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
datapath::windows::server::server() {}
datapath::windows::server::~server() {}
/*datapath::error datapath::windows::server::accept(std::shared_ptr<datapath::isocket>& socket)
{
std::unique_lock<std::mutex> ul(this->lock);
if (this->pending_sockets.size() == 0) {
return datapath::error::Failure;
}
HANDLE handle = this->pending_sockets.front();
this->pending_sockets.pop_front();
if (!socket) {
socket = std::dynamic_pointer_cast<datapath::isocket>(std::make_shared<datapath::windows::socket>());
}
std::shared_ptr<datapath::windows::socket> obj = std::dynamic_pointer_cast<datapath::windows::socket>(socket);
obj->_connect(handle);
// Stock up on backlog and total sockets.
if ((this->waiting_sockets.size() + this->pending_sockets.size()) < WIN_BACKLOG_NUM) {
if ((this->sockets.size() <= this->max_clients) && (this->max_clients > 0)) {
HANDLE handle = _create_socket(this->path, false);
if (handle != INVALID_HANDLE_VALUE) {
this->sockets.push_back(handle);
this->waiting_sockets.push_back(handle);
}
}
}
this->active_sockets.insert({handle, socket});
return datapath::error::Success;
}*/
datapath::error datapath::windows::server::close()
{
// Watcher Thread
{
std::unique_lock<std::mutex> ul(this->watcher.lock);
this->watcher.shutdown = true;
if (this->watcher.task.joinable()) {
this->watcher.task.join();
}
}
// Kill all sockets.
std::unique_lock<std::mutex> ul(this->lock);
for (HANDLE socket : sockets) {
DisconnectNamedPipe(socket);
CloseHandle(socket);
}
// Notify Sockets of being dead.
// Clear all lists.
sockets.clear();
waiting_sockets.clear();
pending_sockets.clear();
active_sockets.clear();
return datapath::error::Success;
}
datapath::error datapath::windows::server::host(std::shared_ptr<datapath::iserver>& server, std::string path,
datapath::permissions permissions, size_t max_clients)
{
if (!server) {
server = std::dynamic_pointer_cast<datapath::iserver>(std::make_shared<datapath::windows::server>());
}
std::shared_ptr<datapath::windows::server> obj = std::dynamic_pointer_cast<datapath::windows::server>(server);
return obj->create(path, permissions, max_clients);
}
-78
View File
@@ -1,78 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include "iserver.hpp"
#include "permissions.hpp"
extern "C" {
#include <Windows.h>
}
namespace datapath {
namespace windows {
class server : public iserver, public std::enable_shared_from_this<datapath::windows::server> {
bool is_created = false;
size_t max_clients = -1;
std::string path;
private /*critical data*/:
// Lock for critical data.
std::mutex lock;
SECURITY_ATTRIBUTES security_attributes;
std::list<HANDLE> sockets;
std::list<HANDLE> waiting_sockets;
std::list<HANDLE> pending_sockets;
std::map<HANDLE, std::weak_ptr<datapath::isocket>> active_sockets;
struct {
std::thread task;
std::mutex lock;
bool shutdown = false;
} watcher;
protected:
datapath::error create(std::string path, datapath::permissions permissions, size_t max_clients);
HANDLE _create_socket(std::string path, bool initial = false);
void _watcher();
public:
server();
virtual ~server();
server(const server&) = delete;
server& operator=(const server&) = delete;
public /*virtual override*/:
virtual datapath::error close() override;
public:
static datapath::error host(std::shared_ptr<datapath::iserver>& server, std::string path,
datapath::permissions permissions, size_t max_clients);
};
} // namespace windows
} // namespace datapath
-233
View File
@@ -1,233 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "socket.hpp"
#include <cinttypes>
#include "task.hpp"
#include "utility.hpp"
#define SIZE_ELEMENT uint32_t
void datapath::windows::socket::_connect(HANDLE handle)
{
this->socket_handle = handle;
if (handle != INVALID_HANDLE_VALUE) {
this->is_connected = true;
{
std::unique_lock<std::mutex> ul(this->watcher.lock);
this->watcher.shutdown = false;
this->watcher.task = std::thread(std::bind(&datapath::windows::socket::_watcher, this));
}
}
}
void datapath::windows::socket::_disconnect()
{
if (this->on_close) {
this->on_close();
}
{
{
std::unique_lock<std::mutex> ul(this->watcher.lock);
this->watcher.shutdown = true;
}
if (this->watcher.task.joinable()) {
this->watcher.task.join();
}
}
this->is_connected = false;
}
void datapath::windows::socket::_watcher()
{
enum class readstate { Unknown, Header, Content } state = readstate::Unknown;
std::vector<char> read_buffer;
std::shared_ptr<datapath::windows::overlapped> read_header_ov = std::make_shared<datapath::windows::overlapped>();
std::shared_ptr<datapath::windows::overlapped> read_content_ov = std::make_shared<datapath::windows::overlapped>();
std::shared_ptr<datapath::windows::overlapped> waitable;
read_header_ov->_on_wait_error.add([&state, &waitable](datapath::error ec) {
// There was an error waiting on the header.
state = readstate::Unknown;
waitable.reset();
});
read_header_ov->_on_wait_success.add([this, &read_buffer, &read_content_ov, &state, &waitable](datapath::error ec) {
read_content_ov->set_handle(this->socket_handle);
read_content_ov->set_data(this);
// ToDo: Add optional message size limit, messages above this size kill the connection for attempting DoS.
size_t msg_size = reinterpret_cast<SIZE_ELEMENT&>(read_buffer[0]);
read_buffer.resize(msg_size);
// Read content.
if (ReadFileEx(this->socket_handle, read_buffer.data(), DWORD(read_buffer.size()),
read_content_ov->get_overlapped(), &datapath::windows::utility::def_io_completion_routine)) {
state = readstate::Content;
waitable = read_content_ov;
} else {
state = readstate::Unknown;
waitable.reset();
}
});
read_content_ov->_on_wait_error.add([&state, &waitable](datapath::error ec) {
// There was an error waiting on the content.
state = readstate::Unknown;
waitable.reset();
});
read_content_ov->_on_wait_success.add([this, &read_buffer, &state, &waitable](datapath::error ec) {
// We have content!
if (this->on_message) {
this->on_message(read_buffer);
state = readstate::Unknown;
} else {
// We're buffering the message in read_buffer until there is a hook to on_message.
}
waitable.reset();
});
while (!this->watcher.shutdown) {
if (this->socket_handle == INVALID_HANDLE_VALUE) {
break;
}
if (!this->is_connected) {
break;
}
if (state == readstate::Unknown) {
// Read the header of the next message.
// The header simply contains the length of the message.
// ToDo: Figure out if Message transfer/read mode and WaitCommEvent work together.
read_header_ov->set_handle(this->socket_handle);
read_header_ov->set_data(this);
read_buffer.resize(sizeof(SIZE_ELEMENT));
// Read content.
if (ReadFileEx(this->socket_handle, read_buffer.data(), DWORD(read_buffer.size()),
read_header_ov->get_overlapped(), &datapath::windows::utility::def_io_completion_routine)) {
state = readstate::Header;
waitable = read_header_ov;
} else {
state = readstate::Unknown;
waitable.reset();
}
} else if (state == readstate::Header) {
// This logic is in the on_wait_success handler.
} else if (state == readstate::Content) {
// This logic is in the on_wait_success handler, and continued here.
if (!waitable) {
// We currently have a message buffered, but there was no handler last time we checked.
if (this->on_message) {
this->on_message(read_buffer);
state = readstate::Unknown;
}
}
}
if (!waitable) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} else {
datapath::error err = waitable->wait(std::chrono::milliseconds(0));
if (err != datapath::error::Success) {
err = waitable->wait(std::chrono::milliseconds(1));
}
if (err == datapath::error::Closed) {
_disconnect();
continue;
}
}
}
}
datapath::windows::socket::socket() : is_connected(false), socket_handle(INVALID_HANDLE_VALUE) {}
datapath::windows::socket::~socket()
{
close();
}
bool datapath::windows::socket::good()
{
return this->is_connected;
}
datapath::error datapath::windows::socket::close()
{
if (this->is_connected) {
DisconnectNamedPipe(this->socket_handle);
_disconnect();
return datapath::error::Success;
}
return datapath::error::Closed;
}
datapath::error datapath::windows::socket::write(std::shared_ptr<datapath::itask>& task, const std::vector<char>& data)
{
if (!task) {
task = std::dynamic_pointer_cast<datapath::itask>(std::make_shared<datapath::windows::task>());
}
std::shared_ptr<datapath::windows::task> obj = std::dynamic_pointer_cast<datapath::windows::task>(task);
std::shared_ptr<datapath::windows::overlapped> ov = std::make_shared<datapath::windows::overlapped>();
obj->_assign(data, ov);
BOOL suc = WriteFileEx(socket_handle, obj->data().data(), DWORD(obj->data().size()), ov->get_overlapped(),
&datapath::windows::utility::def_io_completion_routine);
if (suc) {
return datapath::error::Success;
} else {
return datapath::error::Failure;
}
}
datapath::error datapath::windows::socket::connect(std::shared_ptr<datapath::isocket>& socket, std::string path)
{
if (!datapath::windows::utility::make_pipe_path(path)) {
return datapath::error::InvalidPath;
}
std::wstring wpath = datapath::windows::utility::make_wide_string(path);
SetLastError(ERROR_SUCCESS);
HANDLE handle = CreateFileW(wpath.c_str(), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH, NULL);
if ((handle == INVALID_HANDLE_VALUE) || (GetLastError() != ERROR_SUCCESS)) {
return datapath::error::Failure;
}
DWORD pipe_read_mode = PIPE_WAIT | PIPE_READMODE_BYTE;
SetLastError(ERROR_SUCCESS);
if (!SetNamedPipeHandleState(handle, &pipe_read_mode, NULL, NULL)) {
// ToDo. This doesn't actually affect us as the default mode is the one we're setting
}
if (!socket) {
socket = std::dynamic_pointer_cast<datapath::isocket>(std::make_shared<datapath::windows::socket>());
}
std::shared_ptr<datapath::windows::socket> obj = std::dynamic_pointer_cast<datapath::windows::socket>(socket);
obj->_connect(handle);
return datapath::error::Success;
}
-75
View File
@@ -1,75 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <thread>
#include <vector>
#include "event.hpp"
#include "isocket.hpp"
#include "overlapped-queue.hpp"
#include "overlapped.hpp"
#include "server.hpp"
extern "C" {
#include <Windows.h>
}
namespace datapath {
namespace windows {
class socket : public isocket, public std::enable_shared_from_this<datapath::windows::socket> {
bool is_connected;
HANDLE socket_handle;
struct {
std::thread task;
std::mutex lock;
bool shutdown = false;
} watcher;
protected:
void _connect(HANDLE handle);
void _disconnect();
void _watcher();
public:
socket();
virtual ~socket();
public:
socket(const socket&) = delete;
socket& operator=(const socket&) = delete;
public /*virtual override*/:
virtual bool good() override;
virtual datapath::error close() override;
virtual datapath::error write(std::shared_ptr<datapath::itask>& task,
const std::vector<char>& data) override;
public:
static datapath::error connect(std::shared_ptr<datapath::isocket>& socket, std::string path);
friend class datapath::windows::server;
};
} // namespace windows
} // namespace datapath
-178
View File
@@ -1,178 +0,0 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "waitable.hpp"
#include <assert.h>
extern "C" {
#include <Windows.h>
}
datapath::error datapath::waitable::wait(datapath::waitable* obj, std::chrono::nanoseconds duration)
{
assert(obj != nullptr);
HANDLE handle = (HANDLE)obj->get_waitable();
int64_t timeout = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout < 0) {
timeout = 0;
} else if (timeout > std::numeric_limits<int32_t>::max()) {
timeout = std::numeric_limits<int32_t>::max();
}
do {
auto start = std::chrono::high_resolution_clock::now();
DWORD result = WaitForSingleObjectEx(handle, DWORD(timeout), TRUE);
switch (result) {
case WAIT_OBJECT_0:
obj->_on_wait_success(datapath::error::Success);
return datapath::error::Success;
case WAIT_TIMEOUT:
return datapath::error::TimedOut;
case WAIT_ABANDONED:
obj->_on_wait_error(datapath::error::Closed);
return datapath::error::Closed;
case WAIT_IO_COMPLETION:
duration = (std::chrono::high_resolution_clock::now() - start);
timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout <= 0) {
timeout = 0;
}
continue;
default:
return datapath::error::Failure;
}
} while (timeout >= 0);
return datapath::error::TimedOut;
}
datapath::error datapath::waitable::wait(datapath::waitable** objs, size_t count, std::chrono::nanoseconds duration)
{
assert(objs != nullptr);
assert((count > 0) && (count <= MAXIMUM_WAIT_OBJECTS));
int64_t timeout = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout < 0) {
timeout = 0;
} else if (timeout > std::numeric_limits<int32_t>::max()) {
timeout = std::numeric_limits<int32_t>::max();
}
// Rebuild a valid obj+index translation list.
std::vector<HANDLE> handles(count);
std::vector<size_t> indexes(count);
size_t valid_handles = 0;
for (size_t idx = 0; idx < count; idx++) {
datapath::waitable* obj = objs[idx];
if (obj) {
handles[valid_handles] = reinterpret_cast<HANDLE>(obj->get_waitable());
indexes[valid_handles] = idx;
valid_handles++;
}
}
do {
auto start = std::chrono::high_resolution_clock::now();
DWORD result = WaitForMultipleObjectsEx(handles.size(), handles.data(), TRUE, DWORD(timeout), TRUE);
if ((result >= WAIT_OBJECT_0) && (result < (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS))) {
for (auto idx : indexes) {
objs[idx]->_on_wait_success(datapath::error::Success);
}
return datapath::error::Success;
} else if ((result >= WAIT_ABANDONED_0) && (result < (WAIT_ABANDONED_0 + MAXIMUM_WAIT_OBJECTS))) {
for (auto idx : indexes) {
objs[idx]->_on_wait_error(datapath::error::Closed);
}
return datapath::error::Closed;
} else if (result == WAIT_TIMEOUT) {
return datapath::error::TimedOut;
} else if (result == WAIT_IO_COMPLETION) {
duration = (std::chrono::high_resolution_clock::now() - start);
timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout <= 0) {
timeout = 0;
}
continue;
} else {
return datapath::error::Failure;
}
} while (timeout >= 0);
return datapath::error::TimedOut;
}
datapath::error datapath::waitable::wait_any(datapath::waitable** objs, size_t count, size_t& index,
std::chrono::nanoseconds duration)
{
assert(objs != nullptr);
assert((count > 0) && (count <= MAXIMUM_WAIT_OBJECTS));
int64_t timeout = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout < 0) {
timeout = 0;
} else if (timeout > std::numeric_limits<int32_t>::max()) {
timeout = std::numeric_limits<int32_t>::max();
}
// Rebuild a valid obj+index translation list.
std::vector<HANDLE> handles(count);
std::vector<size_t> indexes(count);
size_t valid_handles = 0;
for (size_t idx = 0; idx < count; idx++) {
datapath::waitable* obj = objs[idx];
if (obj) {
handles[valid_handles] = reinterpret_cast<HANDLE>(obj->get_waitable());
indexes[valid_handles] = idx;
valid_handles++;
}
}
do {
auto start = std::chrono::high_resolution_clock::now();
DWORD result = WaitForMultipleObjectsEx(handles.size(), handles.data(), FALSE, DWORD(timeout), TRUE);
if ((result >= WAIT_OBJECT_0) && (result < (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS))) {
index = indexes[result - WAIT_OBJECT_0];
objs[index]->_on_wait_success(datapath::error::Success);
return datapath::error::Success;
} else if ((result >= WAIT_ABANDONED_0) && (result < (WAIT_ABANDONED_0 + MAXIMUM_WAIT_OBJECTS))) {
index = indexes[result - WAIT_OBJECT_0];
objs[index]->_on_wait_error(datapath::error::Closed);
return datapath::error::Closed;
} else if (result == WAIT_TIMEOUT) {
return datapath::error::TimedOut;
} else if (result == WAIT_IO_COMPLETION) {
duration = (std::chrono::high_resolution_clock::now() - start);
timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (timeout <= 0) {
timeout = 0;
}
continue;
} else {
return datapath::error::Failure;
}
} while (timeout >= 0);
return datapath::error::TimedOut;
}
+172
View File
@@ -0,0 +1,172 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "windows-client-socket.hpp"
#include "windows-utility.hpp"
::std::shared_ptr<::datapath::socket> datapath::socket::create()
{
return std::make_shared<::datapath::windows::client_socket>();
}
datapath::windows::client_socket::client_socket()
: socket(), _lock(), _opened(false), _path(), _worker_count(0), _iocp(), _handle()
{
// Create IOCP
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, reinterpret_cast<ULONG_PTR>(this), 0);
if (iocp == INVALID_HANDLE_VALUE)
throw std::runtime_error("Failed to create IOCompletionPort.");
_iocp = std::shared_ptr<void>(iocp, ::datapath::windows::utility::shared_ptr_handle_deleter);
}
datapath::windows::client_socket::~client_socket()
{
close();
}
void datapath::windows::client_socket::set_path(std::string path)
{
std::lock_guard<std::mutex> lg(_lock);
if (_opened.load())
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
// Create a proper path for Windows.
if (!::datapath::windows::utility::make_pipe_path(path))
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
// Store the new path.
_path = utility::make_wide_string(path);
}
void datapath::windows::client_socket::open()
{
try {
// Close the server just to be safe.
close();
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// Create handle
SetLastError(ERROR_SUCCESS);
HANDLE handle = CreateFileW(_path.c_str(), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH, NULL);
if (handle == INVALID_HANDLE_VALUE) {
throw std::runtime_error("TODO");
}
_handle = std::shared_ptr<void>(handle, ::datapath::windows::utility::shared_ptr_handle_deleter);
// Create IOCP
HANDLE iocp = CreateIoCompletionPort(handle, _iocp.get(), reinterpret_cast<ULONG_PTR>(this), 0);
if (iocp == INVALID_HANDLE_VALUE)
throw std::runtime_error("Failed to create IOCompletionPort.");
if (iocp != _iocp.get()) {
CloseHandle(iocp);
throw std::runtime_error("Failed to link IOCompletionPort.");
}
// Update the pipe state, just to be safe.
DWORD mode = PIPE_READMODE_BYTE;
SetNamedPipeHandleState(_handle.get(), &mode, NULL, NULL);
// Update the overlapped objects to the new handle.
_ov_read.set_handle(_handle.get());
_ov_write.set_handle(_handle.get());
// Update state.
_opened.store(true);
} catch (std::exception const& ex) {
// If there was any problem during all of this, close the server,
// and throw the exception to the parent caller.
close();
throw ex;
}
}
void datapath::windows::client_socket::close()
{
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// Update state.
_opened.store(false);
// Kill Workers
for (std::size_t idx = 0, edx = _worker_count.load(); idx < edx; idx++) {
PostQueuedCompletionStatus(_iocp.get(), NULL, NULL, NULL);
}
// Close handles.
_iocp.reset();
_handle.reset();
}
bool datapath::windows::client_socket::is_open()
{
return _opened.load();
}
void datapath::windows::client_socket::work(std::chrono::milliseconds time_limit)
{
// Thread local state.
std::shared_ptr<void> iocp;
auto limit = time_limit.count();
DWORD w_time_limit = (limit > 2147483648) ? INFINITE : static_cast<DWORD>(limit);
DWORD num_bytes_transferred = 0;
ULONG_PTR ptr = 0;
LPOVERLAPPED overlapped = 0;
DWORD error_code = ERROR_SUCCESS;
{ // Copy necessary state.
std::lock_guard<std::mutex> lg(_lock);
if (!_opened.load()) {
return;
}
iocp = _iocp;
}
// Try and get IOCP status.
_worker_count++;
DWORD status = GetQueuedCompletionStatus(iocp.get(), &num_bytes_transferred, &ptr, &overlapped, w_time_limit);
_worker_count--;
// Check if we suceeded.
if (status == TRUE) {
// Stop request or invalid status.
if (!overlapped) {
return;
}
// The OVERLAPPED object we will get here is actually always an instance
// of datapath::windows::overlapped. This allows us to go from it back
// to a proper object, and call the necessary callback, easily allowing
// new functionality to be added later.
//
// Note that this is technically very unsafe, however there is no other
// way to actually do this without writing a new Operating System.
windows::overlapped* ovp = windows::overlapped::from_overlapped(overlapped);
ovp->callback(static_cast<size_t>(num_bytes_transferred), reinterpret_cast<void*>(ptr));
}
// Request failed, check error code.
// ToDo: Deal with other error codes?
error_code = GetLastError();
return;
}
+59
View File
@@ -0,0 +1,59 @@
/*
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <atomic>
#include <memory>
#include <string>
#include "error.hpp"
#include "windows-socket.hpp"
// Windows Client:
// - Also uses IOCP (best support, least issues unlike completion routines).
namespace datapath::windows {
class client_socket : public ::datapath::windows::socket,
public std::enable_shared_from_this<datapath::windows::client_socket> {
// Data
std::mutex _lock;
std::atomic_bool _opened;
std::wstring _path;
std::atomic_size_t _worker_count;
// Windows
std::shared_ptr<void> _handle;
std::shared_ptr<void> _iocp;
public:
client_socket();
virtual ~client_socket();
public /* Virtual Implementations */:
virtual void set_path(std::string path) override;
virtual void open() override;
virtual void close() override;
virtual bool is_open() override;
virtual void work(std::chrono::milliseconds time_limit) override;
};
} // namespace datapath::windows
+110
View File
@@ -0,0 +1,110 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "windows-overlapped.hpp"
datapath::windows::overlapped::overlapped() : _overlapped(nullptr), _data(nullptr)
{
size_t memory_size = sizeof(OVERLAPPED) + sizeof(void*);
_buffer.resize(memory_size);
reinterpret_cast<void*&>(_buffer[sizeof(OVERLAPPED)]) = this;
_handle = INVALID_HANDLE_VALUE;
// Initialize OVERLAPPED
_overlapped = &reinterpret_cast<OVERLAPPED&>(_buffer[0]);
memset(_overlapped, 0, sizeof(OVERLAPPED));
//overlapped_ptr->hEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
}
datapath::windows::overlapped::~overlapped()
{
cancel();
}
OVERLAPPED* datapath::windows::overlapped::get_overlapped()
{
return _overlapped;
}
HANDLE datapath::windows::overlapped::get_handle()
{
return _handle;
}
void datapath::windows::overlapped::set_handle(HANDLE handle)
{
_handle = handle;
}
std::shared_ptr<void> datapath::windows::overlapped::get_data()
{
return _data;
}
void datapath::windows::overlapped::set_data(std::shared_ptr<void> data)
{
_data = data;
}
void datapath::windows::overlapped::set_callback(std::function<void(overlapped&, size_t, void*)> cb)
{
_callback = cb;
}
void datapath::windows::overlapped::callback(size_t bytes, void* ptr)
{
if (_callback)
_callback(*this, bytes, ptr);
}
void datapath::windows::overlapped::cancel()
{
CancelIoEx(_handle, _overlapped);
}
bool datapath::windows::overlapped::is_completed()
{
return HasOverlappedIoCompleted(_overlapped);
}
void datapath::windows::overlapped::reset()
{
cancel();
_overlapped->Pointer = _overlapped->hEvent = nullptr;
_overlapped->Internal = _overlapped->InternalHigh = _overlapped->Offset = _overlapped->OffsetHigh = 0;
}
::datapath::error datapath::windows::overlapped::status()
{
DWORD transferredBytes;
DWORD res = GetOverlappedResult(_handle, _overlapped, &transferredBytes, 0);
if (res) {
return ::datapath::error::Success;
} else {
DWORD err = GetLastError();
switch (err) {
case ERROR_PIPE_NOT_CONNECTED:
return ::datapath::error::SocketClosed;
case ERROR_IO_PENDING:
return ::datapath::error::Success;
default:
return ::datapath::error::Failure;
}
}
}
@@ -17,9 +17,12 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cinttypes>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
#include "waitable.hpp"
#include "datapath.hpp"
extern "C" {
#include <windows.h>
@@ -27,11 +30,13 @@ extern "C" {
namespace datapath {
namespace windows {
class overlapped : public datapath::waitable {
std::vector<char> buffer;
OVERLAPPED* overlapped_ptr;
HANDLE handle;
void* data;
class overlapped {
std::vector<char> _buffer;
OVERLAPPED* _overlapped;
HANDLE _handle;
std::shared_ptr<void> _data;
std::function<void(overlapped&, size_t, void*)> _callback;
public:
overlapped();
@@ -40,10 +45,13 @@ namespace datapath {
OVERLAPPED* get_overlapped();
HANDLE get_handle();
void set_handle(HANDLE handle);
void set_handle(HANDLE _handle);
void* get_data();
void set_data(void* data);
std::shared_ptr<void> get_data();
void set_data(std::shared_ptr<void> _data);
void set_callback(std::function<void(overlapped&, size_t, void*)> cb);
void callback(size_t bytes, void* ptr);
void cancel();
@@ -51,8 +59,13 @@ namespace datapath {
void reset();
public /*virtual override*/:
virtual void* get_waitable() override;
::datapath::error status();
public:
static overlapped* from_overlapped(OVERLAPPED* ptr)
{
return *reinterpret_cast<overlapped**>(reinterpret_cast<int8_t*>(ptr) + sizeof(OVERLAPPED));
}
};
} // namespace windows
} // namespace datapath
+186
View File
@@ -0,0 +1,186 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "windows-server-socket.hpp"
#include "windows-utility.hpp"
// Server sockets behave differently from client sockets:
// 1. The Named Pipe is immediately created in the constructor.
// 2. The Named Pipe is destroyed only in the destructor.
// 3. It does not have its own threads to work with (shares threads with Server).
// 4. The opened callback is handled by the Server.
// 5. Open/Close are used to reset the socket.
//
// Identical Stuff
// - Read/Write must be queued.
// - Packets are
// 0..3 4-byte unsigned packet size
// 4... Packet Data
//
// Queue Behavior
// - All reads/writes are inserted to the queue.
// - Only the front of the queue can be worked on (for stability reasons).
//
#define DATAPATH_PIPE_FLAGS PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED
#define DATAPATH_PIPE_MODE PIPE_TYPE_BYTE | PIPE_READMODE_BYTE
#define BUFFER_SIZE 64 * 1024
#define WAIT_TIME_OUT 10
datapath::windows::server_socket::server_socket(std::shared_ptr<::datapath::windows::server> parent, bool initial)
: socket(), _lock(), _opened(false), _iocp(), _handle(), _ov_open(), _parent(parent)
{
// Create an asynchronous duplex byte name pipe.
DWORD flags = DATAPATH_PIPE_FLAGS;
if (initial) // If this is supposed to be the first instance, set the flag for it.
flags |= FILE_FLAG_FIRST_PIPE_INSTANCE;
_handle =
std::shared_ptr<void>(CreateNamedPipeW(parent->path().c_str(), flags, DATAPATH_PIPE_MODE,
PIPE_UNLIMITED_INSTANCES, BUFFER_SIZE, BUFFER_SIZE, WAIT_TIME_OUT, NULL),
utility::shared_ptr_handle_deleter);
if (_handle.get() == INVALID_HANDLE_VALUE) {
throw std::runtime_error("Failed to create socket.");
}
{ // Create IOCP.
_iocp = parent->iocp();
HANDLE handle = CreateIoCompletionPort(_handle.get(), _iocp.get(), reinterpret_cast<ULONG_PTR>(this), 0);
if ((handle != _iocp.get()) || (handle == INVALID_HANDLE_VALUE)) {
CloseHandle(handle);
throw std::runtime_error("Failed to IOCompletionPort.");
}
}
// Set up OVERLAPPED.
_ov_open.set_handle(_handle.get());
_ov_open.set_callback(std::bind(&::datapath::windows::server_socket::on_open, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
_ov_read.set_handle(_handle.get());
_ov_write.set_handle(_handle.get());
}
datapath::windows::server_socket::~server_socket()
{
close();
}
void datapath::windows::server_socket::set_path(std::string path)
{
throw std::runtime_error("Operation not supported.");
}
void datapath::windows::server_socket::open()
{
try {
// Close the server just to be safe.
close();
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// Enqueue an asynchronous attempt at connecting.
SetLastError(ERROR_SUCCESS);
ConnectNamedPipe(_handle.get(), _ov_open.get_overlapped());
switch (GetLastError()) {
case ERROR_SUCCESS: // Client was waiting for us.
case ERROR_PIPE_CONNECTED: // Client was waiting for us.
// Occasionally ConnectNamedPipe can instantly finish, for example if a client has connected in the last
// tick, but we haven't made a call to ConnectNamedPipe in that time.
//on_open(_ov_open, 0, nullptr);
return;
case ERROR_IO_PENDING: // Waiting for client to connect.
return;
case ERROR_PIPE_LISTENING: // Client is already connected.
return; // Technically not a valid return code.
default:
throw std::runtime_error("TODO");
}
} catch (std::exception const& ex) {
// If there was any problem during all of this, close the server,
// and throw the exception to the parent caller.
close();
throw ex;
}
}
void datapath::windows::server_socket::close()
{
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// If the socket is not yet opened, but has a pending connect operation, cancel it.
SetLastError(ERROR_SUCCESS);
if (!_opened) {
CancelIoEx(_handle.get(), _ov_open.get_overlapped());
DWORD res = DisconnectNamedPipe(_handle.get());
if (res == 0) {
DWORD ec = GetLastError();
}
} else {
// If it is open, disconnect the client.
DWORD res = DisconnectNamedPipe(_handle.get());
_opened.store(false);
if (res == 0) {
DWORD ec = GetLastError();
switch (ec) {
case ERROR_SUCCESS:
return;
case ERROR_INVALID_HANDLE: // Something messed with the handle, so we can't use it anymore.
throw std::runtime_error("Invalid error returned from system call.");
default:
throw std::runtime_error("Generic failure.");
}
}
/* TODO: Should manually calling close actually call callbacks? Doesn't really make any sense.
{
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
}
*/
}
}
bool datapath::windows::server_socket::is_open()
{
return _opened;
}
void datapath::windows::server_socket::work(std::chrono::milliseconds time_limit)
{
return;
}
void datapath::windows::server_socket::on_open(::datapath::windows::overlapped& ov, std::size_t size, void* ptr)
{
auto status = ov.status();
auto self = shared_from_this();
if (status == error::Success) {
_opened.store(true);
} else {
_opened.store(false);
}
internal_events.opened(status, self);
events.opened(status, self);
}
+67
View File
@@ -0,0 +1,67 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <atomic>
#include <queue>
#include <string>
#include <utility>
#include "datapath.hpp"
#include "windows-overlapped.hpp"
#include "windows-server.hpp"
#include "windows-socket.hpp"
namespace datapath::windows {
class server_socket : public ::datapath::windows::socket {
// Data
std::mutex _lock;
std::atomic_bool _opened;
std::weak_ptr<server> _parent;
// Windows
std::shared_ptr<void> _handle;
std::shared_ptr<void> _iocp;
// Asynchronous Operation
::datapath::windows::overlapped _ov_open;
// Parent/Child relationship
public:
server_socket(std::shared_ptr<::datapath::windows::server> parent, bool initial);
virtual ~server_socket();
public /* Virtual Implementations */:
virtual void set_path(std::string path) override;
virtual void open() override;
virtual void close() override;
virtual bool is_open() override;
virtual void work(std::chrono::milliseconds time_limit) override;
private:
void on_open(::datapath::windows::overlapped& ov, std::size_t size, void* ptr);
protected:
friend class ::datapath::windows::server;
};
} // namespace datapath::windows
+226
View File
@@ -0,0 +1,226 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "windows-server.hpp"
#include "socket.hpp"
#include "windows-server-socket.hpp"
#include "windows-utility.hpp"
#include <algorithm>
#include <thread>
const std::size_t BACKLOG = 8;
// Windows implementation of 'server.hpp:L32'.
std::shared_ptr<::datapath::server> datapath::server::create()
{
return std::make_shared<::datapath::windows::server>();
}
datapath::windows::server::server() : _opened(false), _path(), _lock(), _sockets(), _sockets_free(), _iocp() {}
datapath::windows::server::~server()
{
close();
}
void datapath::windows::server::set_path(std::string path)
{
std::lock_guard<std::mutex> lg(_lock);
if (_opened.load())
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
// Create a proper path for Windows.
if (!::datapath::windows::utility::make_pipe_path(path))
throw std::runtime_error("TODO"); // ToDo: Better exceptions.
// Store the new path.
_path = utility::make_wide_string(path);
}
void datapath::windows::server::open()
{
try {
// Close the server just to be safe.
close();
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// Create a core IO Completion Port.
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, reinterpret_cast<ULONG_PTR>(this), 0);
if (iocp == INVALID_HANDLE_VALUE)
throw std::runtime_error("Failed to create IOCompletionPort.");
_iocp = std::shared_ptr<void>(iocp, ::datapath::windows::utility::shared_ptr_handle_deleter);
// Spawn a backlog of sockets.
for (std::size_t idx = 0; idx < BACKLOG; idx++) {
auto socket = create_socket();
_sockets_free++;
_sockets.push_back(socket);
}
// Change state.
_opened.store(true);
} catch (std::exception const& ex) {
// If there was any problem during all of this, close the server,
// and throw the exception to the parent caller.
close();
throw ex;
}
}
void datapath::windows::server::close()
{
// Guard against multiple invocations.
std::lock_guard<std::mutex> lg(_lock);
// Update state.
_opened.store(false);
// Disconnect all clients
for (auto socket : _sockets) {
socket->close();
}
_sockets.clear();
// Kill Workers
for (std::size_t idx = 0, edx = _worker_count.load(); idx < edx; idx++) {
PostQueuedCompletionStatus(_iocp.get(), NULL, NULL, NULL);
}
// Close handles.
_iocp.reset();
}
bool datapath::windows::server::is_open()
{
return _opened.load();
}
void datapath::windows::server::work(std::chrono::milliseconds time_limit)
{
// Thread local state.
std::shared_ptr<void> iocp;
auto limit = time_limit.count();
DWORD w_time_limit = (limit > 2147483648) ? INFINITE : static_cast<DWORD>(limit);
DWORD num_bytes_transferred = 0;
ULONG_PTR ptr = 0;
LPOVERLAPPED overlapped = 0;
DWORD error_code = ERROR_SUCCESS;
windows::overlapped* ovp = nullptr;
{ // Copy necessary state.
std::lock_guard<std::mutex> lg(_lock);
if (!_opened.load()) {
return;
}
iocp = _iocp;
}
// Try and get IOCP status.
_worker_count++;
DWORD status = GetQueuedCompletionStatus(iocp.get(), &num_bytes_transferred, &ptr, &overlapped, w_time_limit);
_worker_count--;
// Check if we suceeded.
if (status == TRUE) {
// Stop request or invalid status.
if (!overlapped) {
return;
}
// The OVERLAPPED object we will get here is actually always an instance
// of datapath::windows::overlapped. This allows us to go from it back
// to a proper object, and call the necessary callback, easily allowing
// new functionality to be added later.
//
// Note that this is technically very unsafe, however there is no other
// way to actually do this without writing a new Operating System.
ovp = windows::overlapped::from_overlapped(overlapped);
ovp->callback(static_cast<size_t>(num_bytes_transferred), reinterpret_cast<void*>(ptr));
}
// Request failed, check error code.
// ToDo: Deal with other error codes?
error_code = GetLastError();
return;
}
std::shared_ptr<datapath::windows::server_socket> datapath::windows::server::create_socket()
{
auto socket = std::make_shared<::datapath::windows::server_socket>(shared_from_this(), _sockets.size() == 0);
socket->internal_events.opened +=
std::bind(&::datapath::windows::server::on_socket_opened, this, std::placeholders::_1, std::placeholders::_2);
socket->internal_events.closed +=
std::bind(&::datapath::windows::server::on_socket_closed, this, std::placeholders::_1, std::placeholders::_2);
socket->open();
return socket;
}
void datapath::windows::server::on_socket_opened(::datapath::error error, std::shared_ptr<::datapath::socket> socket)
{
// ToDo: Deal with other error codes.
if (error != ::datapath::error::Success)
return;
// Check if the connection should be allowed.
bool allow = false;
events.connected(allow, socket);
if (allow) {
// If the connection was allowed, reduce the free socket count.
_sockets_free--;
// We're on or over the socket limit, so don't bother filling up.
if (_sockets_free >= BACKLOG)
return;
{
std::lock_guard<std::mutex> lg(_lock);
_sockets.push_back(create_socket());
}
_sockets_free++;
} else {
// If the connection was not allowed, reset and wait for new connection.
socket->close();
}
}
void datapath::windows::server::on_socket_closed(::datapath::error error, std::shared_ptr<::datapath::socket> socket)
{
if (_sockets_free < BACKLOG) {
socket->open();
_sockets_free++;
} else {
std::lock_guard<std::mutex> lg(_lock);
_sockets.remove(std::dynamic_pointer_cast<::datapath::windows::server_socket>(socket));
}
}
std::wstring& datapath::windows::server::path()
{
return _path;
}
std::shared_ptr<void> datapath::windows::server::iocp()
{
return _iocp;
}
+92
View File
@@ -0,0 +1,92 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <atomic>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include "server.hpp"
#include "windows-overlapped.hpp"
extern "C" {
#include <Windows.h>
}
// Server is simple with IOCP:
// - Create 1 IOCP per Server.
// - Create a ThreadPool per IOCP.
// - Spawn x Tasks to fill the ThreadPool and wait on IOCP, handler simply calls callbacks in OVERLAPPED.
// - Create x Sockets (up to Hardware Concurrency) ahead of time.
// - Create x Clients ahead of time (equal to Sockets).
// - When a client connects, fill up the free sockets list with a new socket (up to Hardware Concurrency).
// - When a client disconnects, return the socket to the free state and reset the client.
namespace datapath::windows {
class server_socket;
class server : public ::datapath::server, public std::enable_shared_from_this<datapath::windows::server> {
// Data
std::mutex _lock;
std::atomic_bool _opened;
std::wstring _path;
std::atomic_size_t _worker_count;
// Sockets
std::list<std::shared_ptr<::datapath::windows::server_socket>> _sockets;
std::atomic_size_t _sockets_free;
// Windows
std::shared_ptr<void> _iocp;
public:
server();
virtual ~server();
// Move/Copy Operator
server(const server&) = delete;
server& operator=(const server&) = delete;
public /* Virtual Implementations */:
virtual void set_path(std::string path) override;
virtual void open() override;
virtual void close() override;
virtual bool is_open() override;
virtual void work(std::chrono::milliseconds time_limit) override;
private:
std::shared_ptr<::datapath::windows::server_socket> create_socket();
void on_socket_opened(::datapath::error error, std::shared_ptr<::datapath::socket> socket);
void on_socket_closed(::datapath::error error, std::shared_ptr<::datapath::socket> socket);
protected /* Socket <-> Server Interface */:
std::wstring& path();
std::shared_ptr<void> iocp();
friend class ::datapath::windows::server_socket;
};
} // namespace datapath::windows
+353
View File
@@ -0,0 +1,353 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "windows-socket.hpp"
void datapath::windows::socket::read(::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data)
{
auto res = queue_read(callback, callback_data);
switch (res) {
case error::Success:
return;
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
throw std::runtime_error("Socket closed");
}
default:
throw std::runtime_error("TODO");
}
}
void datapath::windows::socket::write(const ::datapath::io_data_t& data, ::datapath::io_callback_t callback,
::datapath::io_callback_data_t callback_data)
{
auto res = queue_write(data.data(), data.size(), callback, callback_data);
switch (res) {
case error::Success:
return;
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
throw std::runtime_error("Socket closed");
}
default:
throw std::runtime_error("TODO");
}
}
void datapath::windows::socket::write(const std::uint8_t* data, const size_t data_length,
::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data)
{
auto res = queue_write(data, data_length, callback, callback_data);
switch (res) {
case error::Success:
return;
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
throw std::runtime_error("Socket closed");
}
default:
throw std::runtime_error("TODO");
}
}
::datapath::error datapath::windows::socket::queue_read(::datapath::io_callback_t cb,
::datapath::io_callback_data_t cbd)
{
// Early-Exit if the socket is already closed.
if (!is_open())
return ::datapath::error::NotSupported;
// Prevent multiple thread from modifying the same objects.
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
// Queue the read operation.
_read_queue.push(read_data_t{cb, cbd});
// If this is the only operation, instantly perform it.
if (_read_queue.size() == 1) {
return perform_read();
} else {
return ::datapath::error::Success;
}
}
::datapath::error datapath::windows::socket::queue_write(const std::uint8_t* data, std::size_t length,
::datapath::io_callback_t cb,
::datapath::io_callback_data_t cbd)
{
// Early-Exit if the socket is already closed.
if (!is_open())
return ::datapath::error::NotSupported;
// Build actual packet.
io_data_t packet;
packet.resize(length + sizeof(packet_size_t));
memcpy(packet.data() + sizeof(packet_size_t), data, length);
*reinterpret_cast<packet_size_t*>(packet.data()) = length;
// Prevent multiple thread from modifying the same objects.
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
// Queue the read request.
_write_queue.push(std::move(write_data_t{std::move(packet), cb, cbd}));
// If this is the only operation, instantly perform it.
if (_write_queue.size() == 1) {
return perform_write();
} else {
return ::datapath::error::Success;
}
}
::datapath::error datapath::windows::socket::perform_read()
{
DWORD bytes;
// Prevent multiple thread from modifying the same objects.
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
// Early-exit if there is nothing to do.
if (_read_queue.size() == 0)
return ::datapath::error::Failure;
// Resize buffer, then issue read request.
_read_buffer.resize(sizeof(uint32_t));
_ov_read.reset();
_ov_read.set_callback(std::bind(&::datapath::windows::socket::on_read_header, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
// Issue read request (return value can be ignored, it will always be false).
SetLastError(ERROR_SUCCESS);
DWORD res =
ReadFile(_ov_read.get_handle(), _read_buffer.data(), _read_buffer.size(), &bytes, _ov_read.get_overlapped());
// Report result to caller.
DWORD ec = GetLastError();
switch (ec) {
case ERROR_SUCCESS:
case ERROR_IO_PENDING:
return ::datapath::error::Success;
case ERROR_PIPE_NOT_CONNECTED:
case ERROR_BROKEN_PIPE:
return ::datapath::error::SocketClosed;
default:
return ::datapath::error::Failure;
}
}
::datapath::error datapath::windows::socket::perform_read_packet(packet_size_t size)
{
DWORD bytes;
// Prevent multiple thread from modifying the same objects.
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
// Early-exit if there is nothing to do.
if (_read_queue.size() == 0)
return ::datapath::error::Failure;
// Resize Buffer, then issue the 2nd read request.
_read_buffer.resize(size);
_ov_read.reset();
_ov_read.set_callback(std::bind(&::datapath::windows::socket::on_read, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
// Issue read request (return value can be ignored, it will always be false).
SetLastError(ERROR_SUCCESS);
DWORD res =
ReadFile(_ov_read.get_handle(), _read_buffer.data(), _read_buffer.size(), &bytes, _ov_read.get_overlapped());
// Report result to caller.
DWORD ec = GetLastError();
switch (ec) {
case ERROR_SUCCESS:
case ERROR_IO_PENDING:
return ::datapath::error::Success;
case ERROR_PIPE_NOT_CONNECTED:
case ERROR_BROKEN_PIPE:
return ::datapath::error::SocketClosed;
default:
return ::datapath::error::Failure;
}
}
::datapath::error datapath::windows::socket::perform_write()
{
DWORD bytes;
// Prevent multiple thread from modifying the same objects.
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
// Early-exit if there is nothing to do.
if (_write_queue.size() == 0)
return ::datapath::error::Failure;
// Lock the queue, and grab the front element.
auto& front = _write_queue.front();
auto& data = std::get<io_data_t>(front);
// Reset the overlapped object.
_ov_write.reset();
// Issue write request (return value can be ignored, always false).
SetLastError(ERROR_SUCCESS);
DWORD res = WriteFile(_ov_write.get_handle(), data.data(), data.size(), &bytes, _ov_write.get_overlapped());
// Report result to caller.
DWORD ec = GetLastError();
switch (ec) {
case ERROR_SUCCESS:
case ERROR_IO_PENDING:
return ::datapath::error::Success;
case ERROR_PIPE_NOT_CONNECTED:
case ERROR_BROKEN_PIPE:
return ::datapath::error::SocketClosed;
default:
return ::datapath::error::Failure;
}
}
void datapath::windows::socket::on_read_header(::datapath::windows::overlapped& ov, std::size_t bytes_read, void* ptr)
{
io_data_t data;
// Sanity Check: Did we actually read the entire header?
if (bytes_read != sizeof(packet_size_t)) {
// Assume remote is a bad actor, or system is in a bad state.
{
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
auto el = std::move(_read_queue.front());
if (el.first) {
el.first(shared_from_this(), ::datapath::error::BadHeader, data, el.second);
}
_read_queue.pop();
}
close();
return;
}
// Read the given size.
packet_size_t size = *reinterpret_cast<packet_size_t*>(_read_buffer.data());
// Sanity Check: Is the packet bigger than we allow it to be?
if (size > ::datapath::maximum_packet_size) {
// Soft-fail, remote may be testing our capabilities, is outdated, or may be newer than us.
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
auto el = std::move(_read_queue.front());
if (el.first) {
el.first(shared_from_this(), ::datapath::error::BadSize, data, el.second);
}
_read_queue.pop();
switch (perform_read()) {
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
return;
}
default:
return;
}
}
switch (perform_read_packet(size)) {
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
return;
}
default:
return;
}
}
void datapath::windows::socket::on_read(::datapath::windows::overlapped& ov, std::size_t size, void* ptr)
{ // Read completed, hopefully.
::datapath::error status = ov.status();
read_data_t el;
io_data_t data{_read_buffer.data(), _read_buffer.data() + _read_buffer.size()};
// Lock the queue from outside modifications.
{
std::lock_guard<std::recursive_mutex> lg(_read_queue_lock);
el = std::move(_read_queue.front());
_read_queue.pop();
switch (perform_read()) {
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
return;
}
default:
return;
}
}
// Call the callback (if there is one).
if (el.first) {
el.first(shared_from_this(), ::datapath::error::Success, data, el.second);
}
}
void datapath::windows::socket::on_write(::datapath::windows::overlapped& ov, std::size_t size, void* ptr)
{
// Write completed. Hopefully.
::datapath::error status = ov.status();
write_data_t el;
{ // Remove from queue, and spawn new write request if something is still queued.
std::lock_guard<std::recursive_mutex> lg(_write_queue_lock);
el = std::move(_write_queue.front());
_write_queue.pop();
switch (perform_write()) {
case error::SocketClosed: {
auto status = ::datapath::error::SocketClosed;
auto self = shared_from_this();
events.closed(status, self);
internal_events.closed(status, self);
return;
}
default:
return;
}
}
// Call callback.
std::get<::datapath::io_callback_t>(el)(shared_from_this(), status, std::get<::datapath::io_data_t>(el),
std::get<::datapath::io_callback_data_t>(el));
}
+82
View File
@@ -0,0 +1,82 @@
/*
Low Latency IPC Library for high-speed traffic
Copyright (C) 2019 Michael Fabian Dirks <info@xaymar.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include "datapath.hpp"
#include "socket.hpp"
#include "windows-overlapped.hpp"
namespace datapath::windows {
typedef uint32_t packet_size_t;
class socket : public ::datapath::socket, public std::enable_shared_from_this<::datapath::windows::socket> {
typedef std::pair<::datapath::io_callback_t, ::datapath::io_callback_data_t> read_data_t;
typedef std::tuple<::datapath::io_data_t, ::datapath::io_callback_t, ::datapath::io_callback_data_t>
write_data_t;
// Read Data
std::recursive_mutex _read_queue_lock;
std::queue<read_data_t> _read_queue;
io_data_t _read_buffer;
// Write Data
std::recursive_mutex _write_queue_lock;
std::queue<write_data_t> _write_queue;
protected: // Asynchronous IO
::datapath::windows::overlapped _ov_read;
::datapath::windows::overlapped _ov_write;
public /* Input/Output */:
virtual void read(::datapath::io_callback_t callback, ::datapath::io_callback_data_t callback_data) override;
virtual void write(const io_data_t& data, ::datapath::io_callback_t callback,
::datapath::io_callback_data_t callback_data) override;
virtual void write(const std::uint8_t* data, const size_t data_length, ::datapath::io_callback_t callback,
::datapath::io_callback_data_t callback_data) override;
public /* Internal Events */:
struct {
::datapath::event<::datapath::error, std::shared_ptr<::datapath::socket>> opened;
::datapath::event<::datapath::error, std::shared_ptr<::datapath::socket>> closed;
} internal_events;
private:
::datapath::error queue_read(::datapath::io_callback_t cb, ::datapath::io_callback_data_t cbd);
::datapath::error queue_write(const std::uint8_t* data, std::size_t length, ::datapath::io_callback_t cb,
::datapath::io_callback_data_t cbd);
::datapath::error perform_read();
::datapath::error perform_read_packet(packet_size_t size);
::datapath::error perform_write();
void on_read_header(::datapath::windows::overlapped& ov, std::size_t size, void* ptr);
void on_read(::datapath::windows::overlapped& ov, std::size_t size, void* ptr);
void on_write(::datapath::windows::overlapped& ov, std::size_t size, void* ptr);
};
} // namespace datapath::windows
@@ -18,20 +18,10 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
#include "task.hpp"
#define SIZE_ELEMENT uint32_t
void datapath::windows::task::_assign(const std::vector<char>& data, std::shared_ptr<datapath::windows::overlapped> ov)
{
this->buffer.resize(data.size() + sizeof(SIZE_ELEMENT));
std::memcpy(buffer.data() + sizeof(SIZE_ELEMENT), data.data(), data.size());
reinterpret_cast<SIZE_ELEMENT&>(buffer[0]) = SIZE_ELEMENT(data.size());
this->overlapped = ov;
}
datapath::windows::task::task()
{
this->_on_wait_error.add([this](datapath::error ec) { this->_on_failure(ec); });
this->_on_wait_success.add([this](datapath::error ec) { this->_on_success(ec, this->data()); });
_overlapped.set_callback(std::bind(&datapath::windows::task::handle_overlapped, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
datapath::windows::task::~task()
@@ -39,40 +29,40 @@ datapath::windows::task::~task()
cancel();
}
::datapath::windows::overlapped& datapath::windows::task::overlapped()
{
return _overlapped;
}
std::vector<char>& datapath::windows::task::get_data()
{
return _data;
}
void datapath::windows::task::handle_overlapped(::datapath::windows::overlapped& ov, size_t bytes, void* ptr)
{
DWORD _bytes = 0;
if (GetOverlappedResult(_overlapped.get_handle(), _overlapped.get_overlapped(), &_bytes, false)) {
}
}
datapath::error datapath::windows::task::cancel()
{
if (!overlapped) {
return datapath::error::Unknown;
}
if (!overlapped->is_completed()) {
overlapped->cancel();
return datapath::error::Success;
}
return datapath::error::Failure;
_overlapped.cancel();
return datapath::error::Success;
}
bool datapath::windows::task::is_completed()
{
if (!overlapped) {
return false;
}
return overlapped->is_completed();
return _overlapped.is_completed();
}
size_t datapath::windows::task::length()
{
return buffer.size();
return _data.size();
}
const std::vector<char>& datapath::windows::task::data()
{
return buffer;
}
void* datapath::windows::task::get_waitable()
{
if (!overlapped) {
return nullptr;
}
return overlapped->get_waitable();
return _data;
}
@@ -20,8 +20,6 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
#include <memory>
#include "itask.hpp"
#include "overlapped.hpp"
#include "server.hpp"
#include "socket.hpp"
extern "C" {
#include <Windows.h>
@@ -30,16 +28,19 @@ extern "C" {
namespace datapath {
namespace windows {
class task : public itask {
std::shared_ptr<datapath::windows::overlapped> overlapped;
std::vector<char> buffer;
protected:
void _assign(const std::vector<char>& data, std::shared_ptr<datapath::windows::overlapped> ov);
::datapath::windows::overlapped _overlapped;
std::vector<char> _data;
public:
task();
~task();
::datapath::windows::overlapped& overlapped();
std::vector<char>& get_data();
void handle_overlapped(::datapath::windows::overlapped& ov, size_t bytes, void* ptr);
public /*virtual override*/ /*itask*/:
virtual datapath::error cancel() override;
@@ -48,12 +49,6 @@ namespace datapath {
virtual size_t length() override;
virtual const std::vector<char>& data() override;
public /*virtual override*/ /*waitable*/:
virtual void* get_waitable() override;
friend class datapath::windows::socket;
friend class datapath::windows::server;
};
} // namespace windows
} // namespace datapath
@@ -28,6 +28,8 @@ extern "C" {
namespace datapath {
namespace windows {
namespace utility {
typedef uint64_t packet_size_t;
static inline bool make_pipe_path(std::string& string)
{
// Convert path to WinAPI compatible string.
@@ -56,6 +58,18 @@ namespace datapath {
SetEvent(lpOverlapped->hEvent);
}
static void shared_ptr_handle_deleter(HANDLE v)
{
CloseHandle(v);
}
inline void build_packet(const std::vector<char>& source, std::vector<char>& target)
{
target.resize(sizeof(packet_size_t) + source.size());
memcpy(target.data() + sizeof(packet_size_t), source.data(), source.size());
reinterpret_cast<packet_size_t&>(*target.data()) = source.size();
}
} // namespace utility
} // namespace windows
} // namespace datapath
View File