commit dbfc015d7256505645ee1c9f9b42ccb7175897a8 Author: xbazzi Date: Sun Jul 27 21:47:49 2025 -0600 Initial commit :rocket: diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..1d953f4 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use nix diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..950f9c5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +build/ +.direnv/ +.vscode \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..e2df7ab --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,96 @@ +cmake_minimum_required(VERSION 3.25) +project(grpc_trading CXX) + +include(cmake/common.cmake) + +find_package(gRPC REQUIRED) +find_package(Protobuf REQUIRED) +find_package(nlohmann_json REQUIRED) + +include_directories(${Protobuf_INCLUDE_DIRS}) + +# Proto file +get_filename_component(trading_proto "proto/trading.proto" ABSOLUTE) +get_filename_component(trading_proto_path "${trading_proto}" PATH) + +# Generate sources +set(trading_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/trading.pb.cc") +set(trading_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/trading.pb.h") +set(trading_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/trading.grpc.pb.cc") +set(trading_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/trading.grpc.pb.h") + +add_custom_command( + OUTPUT "${trading_proto_srcs}" "${trading_proto_hdrs}" "${trading_grpc_srcs}" "${trading_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${trading_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${trading_proto}" + DEPENDS "${trading_proto}" +) + +# Include generated *.pb.h files +include_directories("${CMAKE_SOURCE_DIR}/include") + +# Glob all source files in src/ +file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/src/*.cc") + +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +add_executable(fastinahurry ${SRC_FILES} main.cc) +set_target_properties(fastinahurry PROPERTIES + CXX_STANDARD 23 + CXX_STANDARD_REQUIRED YES +) + +# add_executable(fastinahurry ${SRC_FILES} main.cc) +# add_executable(fastinahurry ${SRC_FILES} main.cc) + +target_link_libraries(fastinahurry PRIVATE + trading_grpc_proto + nlohmann_json::nlohmann_json + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF} +) + +target_include_directories(fastinahurry PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR} # For generated *.pb.h +) + +add_library(trading_grpc_proto + ${trading_grpc_srcs} + ${trading_grpc_hdrs} + ${trading_proto_srcs} + ${trading_proto_hdrs}) + +target_compile_features(fastinahurry PRIVATE cxx_std_23) + +# target_link_libraries(trading_grpc_proto +# # absl::check +# ${_REFLECTION} +# ${_GRPC_GRPCPP} +# ${_PROTOBUF_LIBPROTOBUF}) + +# add_executable(server src/grpc_server.cc) +# target_link_libraries(server trading_grpc_proto gRPC::grpc++) +# target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") + +# add_executable(client src/grpc_client.cc) +# target_link_libraries(client trading_grpc_proto gRPC::grpc++) +# target_include_directories(client PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") + +# foreach(_target +# greeter_client greeter_server +# greeter_callback_client greeter_callback_server +# greeter_async_client greeter_async_client2 greeter_async_server) +# add_executable(${_target} "{CMAKE_BUILD_DIR}${_target}.cc") +# target_link_libraries(${_target} +# trading_grpc_proto +# ${_REFLECTION} +# ${_GRPC_GRPCPP} +# ${_PROTOBUF_LIBPROTOBUF}) +# endforeach() \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6d7f5d1 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +all: + mkdir -p build + cd build && cmake .. && make -j$(nproc) \ No newline at end of file diff --git a/cmake/common.cmake b/cmake/common.cmake new file mode 100644 index 0000000..3451d7f --- /dev/null +++ b/cmake/common.cmake @@ -0,0 +1,125 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cmake build file for C++ route_guide example. +# Assumes protobuf and gRPC have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building route_guide. + +cmake_minimum_required(VERSION 3.16) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +if(MSVC) + add_definitions(-D_WIN32_WINNT=0x600) +endif() + +find_package(Threads REQUIRED) + +if(GRPC_AS_SUBMODULE) + # One way to build a projects that uses gRPC is to just include the + # entire gRPC project tree via "add_subdirectory". + # This approach is very simple to use, but the are some potential + # disadvantages: + # * it includes gRPC's CMakeLists.txt directly into your build script + # without and that can make gRPC's internal setting interfere with your + # own build. + # * depending on what's installed on your system, the contents of submodules + # in gRPC's third_party/* might need to be available (and there might be + # additional prerequisites required to build them). Consider using + # the gRPC_*_PROVIDER options to fine-tune the expected behavior. + # + # A more robust approach to add dependency on gRPC is using + # cmake's ExternalProject_Add (see cmake_externalproject/CMakeLists.txt). + + # Include the gRPC's cmake build (normally grpc source code would live + # in a git submodule called "third_party/grpc", but this example lives in + # the same repository as gRPC sources, so we just look a few directories up) + add_subdirectory(../../.. ${CMAKE_CURRENT_BINARY_DIR}/grpc EXCLUDE_FROM_ALL) + message(STATUS "Using gRPC via add_subdirectory.") + + # After using add_subdirectory, we can now use the grpc targets directly from + # this build. + set(_PROTOBUF_LIBPROTOBUF libprotobuf) + set(_REFLECTION grpc++_reflection) + set(_ORCA_SERVICE grpcpp_orca_service) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + set(_GRPC_GRPCPP grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +elseif(GRPC_FETCHCONTENT) + # Another way is to use CMake's FetchContent module to clone gRPC at + # configure time. This makes gRPC's source code available to your project, + # similar to a git submodule. + message(STATUS "Using gRPC via add_subdirectory (FetchContent).") + include(FetchContent) + FetchContent_Declare( + grpc + GIT_REPOSITORY https://github.com/grpc/grpc.git + # when using gRPC, you will actually set this to an existing tag, such as + # v1.25.0, v1.26.0 etc.. + # For the purpose of testing, we override the tag used to the commit + # that's currently under test. + GIT_TAG vGRPC_TAG_VERSION_OF_YOUR_CHOICE) + FetchContent_MakeAvailable(grpc) + + # Since FetchContent uses add_subdirectory under the hood, we can use + # the grpc targets directly from this build. + set(_PROTOBUF_LIBPROTOBUF libprotobuf) + set(_REFLECTION grpc++_reflection) + set(_PROTOBUF_PROTOC $) + set(_GRPC_GRPCPP grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +else() + # This branch assumes that gRPC and all its dependencies are already installed + # on this system, so they can be located by find_package(). + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. + option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf CONFIG REQUIRED) + message(STATUS "Using protobuf ${Protobuf_VERSION}") + + set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf) + set(_REFLECTION gRPC::grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. + find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +endif() diff --git a/common.cmake b/common.cmake new file mode 100644 index 0000000..3451d7f --- /dev/null +++ b/common.cmake @@ -0,0 +1,125 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cmake build file for C++ route_guide example. +# Assumes protobuf and gRPC have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building route_guide. + +cmake_minimum_required(VERSION 3.16) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +if(MSVC) + add_definitions(-D_WIN32_WINNT=0x600) +endif() + +find_package(Threads REQUIRED) + +if(GRPC_AS_SUBMODULE) + # One way to build a projects that uses gRPC is to just include the + # entire gRPC project tree via "add_subdirectory". + # This approach is very simple to use, but the are some potential + # disadvantages: + # * it includes gRPC's CMakeLists.txt directly into your build script + # without and that can make gRPC's internal setting interfere with your + # own build. + # * depending on what's installed on your system, the contents of submodules + # in gRPC's third_party/* might need to be available (and there might be + # additional prerequisites required to build them). Consider using + # the gRPC_*_PROVIDER options to fine-tune the expected behavior. + # + # A more robust approach to add dependency on gRPC is using + # cmake's ExternalProject_Add (see cmake_externalproject/CMakeLists.txt). + + # Include the gRPC's cmake build (normally grpc source code would live + # in a git submodule called "third_party/grpc", but this example lives in + # the same repository as gRPC sources, so we just look a few directories up) + add_subdirectory(../../.. ${CMAKE_CURRENT_BINARY_DIR}/grpc EXCLUDE_FROM_ALL) + message(STATUS "Using gRPC via add_subdirectory.") + + # After using add_subdirectory, we can now use the grpc targets directly from + # this build. + set(_PROTOBUF_LIBPROTOBUF libprotobuf) + set(_REFLECTION grpc++_reflection) + set(_ORCA_SERVICE grpcpp_orca_service) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + set(_GRPC_GRPCPP grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +elseif(GRPC_FETCHCONTENT) + # Another way is to use CMake's FetchContent module to clone gRPC at + # configure time. This makes gRPC's source code available to your project, + # similar to a git submodule. + message(STATUS "Using gRPC via add_subdirectory (FetchContent).") + include(FetchContent) + FetchContent_Declare( + grpc + GIT_REPOSITORY https://github.com/grpc/grpc.git + # when using gRPC, you will actually set this to an existing tag, such as + # v1.25.0, v1.26.0 etc.. + # For the purpose of testing, we override the tag used to the commit + # that's currently under test. + GIT_TAG vGRPC_TAG_VERSION_OF_YOUR_CHOICE) + FetchContent_MakeAvailable(grpc) + + # Since FetchContent uses add_subdirectory under the hood, we can use + # the grpc targets directly from this build. + set(_PROTOBUF_LIBPROTOBUF libprotobuf) + set(_REFLECTION grpc++_reflection) + set(_PROTOBUF_PROTOC $) + set(_GRPC_GRPCPP grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +else() + # This branch assumes that gRPC and all its dependencies are already installed + # on this system, so they can be located by find_package(). + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. + option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf CONFIG REQUIRED) + message(STATUS "Using protobuf ${Protobuf_VERSION}") + + set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf) + set(_REFLECTION gRPC::grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. + find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() +endif() diff --git a/include/Algo.hh b/include/Algo.hh new file mode 100644 index 0000000..849e2b8 --- /dev/null +++ b/include/Algo.hh @@ -0,0 +1,40 @@ +#pragma once + +// C++ Includes +#include +#include +#include +#include +#include + + +// FastInAHurry includes +#include "Publisher.hh" +#include "utils/ThreadSafeQueue.hh" +#include "utils/ThreadPool.hh" + + +class Algo { +public: + Algo(); + + void initialize(); + void generate_orders(); + void process(); + void send(trading::Order&); + bool initialized(); + void stop(); +private: + std::queue _orders; + std::atomic _initialized{false}; + std::atomic _running{false}; + std::shared_ptr _pub; + + utils::ThreadSafeQueue _order_queue; + utils::ThreadPool _thread_pool; + + std::mutex _send_mutex; + std::jthread _reader_thread; + std::vector _worker_threads; + std::vector> _futures; +}; \ No newline at end of file diff --git a/include/Controller.hh b/include/Controller.hh new file mode 100644 index 0000000..269ae9b --- /dev/null +++ b/include/Controller.hh @@ -0,0 +1,9 @@ +#include "Algo.hh" + +class Controller { +public: + explicit Controller(int, char*[]); + void start(); +private: + std::unique_ptr _algo; +}; \ No newline at end of file diff --git a/include/Publisher.hh b/include/Publisher.hh new file mode 100644 index 0000000..37e6e0e --- /dev/null +++ b/include/Publisher.hh @@ -0,0 +1,14 @@ +#pragma once + +#include "trading.grpc.pb.h" +#include +#include + +class Publisher { +public: + explicit Publisher(std::shared_ptr channel); + + grpc::Status send_order(const trading::Order& order); +private: + std::unique_ptr _stub; +}; \ No newline at end of file diff --git a/include/readers/JSONReader.hh b/include/readers/JSONReader.hh new file mode 100644 index 0000000..c5923a5 --- /dev/null +++ b/include/readers/JSONReader.hh @@ -0,0 +1,17 @@ +#pragma once + +// C++ Includes +#include +#include +#include + +// Third Party Includes +#include "trading.pb.h" +#include + +namespace readers::json { +using JSON = nlohmann::json; +using OrderQueue = std::queue; + +OrderQueue read_orders_from_json(const std::string& filename); +} // End json namespace \ No newline at end of file diff --git a/include/utils/ThreadPool.hh b/include/utils/ThreadPool.hh new file mode 100644 index 0000000..0ca8f84 --- /dev/null +++ b/include/utils/ThreadPool.hh @@ -0,0 +1,46 @@ +// C++ Includes +#include +#include +#include +#include +#include +#include +#include + +namespace utils { + +class ThreadPool { +private: + std::vector _workers; + std::queue> _tasks; + + std::mutex _queue_mutex; + std::condition_variable _cv; + bool _stop{false}; + +public: + explicit ThreadPool(std::size_t threads = std::thread::hardware_concurrency()); + ~ThreadPool(); + + template + auto enqueue(F&& f, Args&&... args) -> std::future> { + using return_type = std::invoke_result_t; + + auto task_ptr = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future result = task_ptr->get_future(); + + { + std::unique_lock lock(_queue_mutex); + if (_stop) + throw std::runtime_error("Tried to enqueue on stopped ThreadPool"); + + _tasks.emplace([task_ptr]() { (*task_ptr)(); }); + } + _cv.notify_one(); + return result; + } +}; +} // End utils namespace \ No newline at end of file diff --git a/include/utils/ThreadSafeQueue.hh b/include/utils/ThreadSafeQueue.hh new file mode 100644 index 0000000..c752ae1 --- /dev/null +++ b/include/utils/ThreadSafeQueue.hh @@ -0,0 +1,35 @@ +// C++ Includes +#include +#include +#include +#include +#include + +namespace utils { + +template +class ThreadSafeQueue { +private: + std::queue _queue; + std::mutex _mutex; + std::condition_variable _cv; + +public: + void push(T value) { + + { + std::lock_guard lock(_mutex); + _queue.push(std::move(value)); + } + _cv.notify_one(); + } + + T wait_and_pop() { + std::unique_lock lock(_mutex); + _cv.wait(lock, [&] { return !_queue.empty(); }); + T val = std::move(_queue.front()); + _queue.pop(); + return val; + } +}; +} // End utils namespace \ No newline at end of file diff --git a/main.cc b/main.cc new file mode 100644 index 0000000..9f716ca --- /dev/null +++ b/main.cc @@ -0,0 +1,19 @@ +// C++ Includes +#include +#include +#include + +// FastInAHurry Includes +#include +#include + +int main(int argc, char* argv[]) { + for (uint8_t i = 0; i < argc; i++) { + std::printf("argument[%d]: %s\n", i, argv[i]); + } + assert(2 + 2 == 4); + Controller ctlr(argc, argv); + ctlr.start(); + + return 0; +} \ No newline at end of file diff --git a/orders.json b/orders.json new file mode 100644 index 0000000..7129373 --- /dev/null +++ b/orders.json @@ -0,0 +1,18 @@ +[ + { "symbol": "AAPL", "quantity": 10, "price": 189.5, "side": "BUY" }, + { "symbol": "META", "quantity": 20, "price": 225.0, "side": "SELL" }, + { "symbol": "JAVI", "quantity": 9, "price": 100, "side": "BUY" }, + { "symbol": "EVAN", "quantity": 8, "price": 104, "side": "BUY" }, + { "symbol": "CARTI", "quantity": 7, "price": 3105.0, "side": "BUY" }, + { "symbol": "MCLOVIN", "quantity": 12, "price": 3121.0, "side": "BUY" }, + { "symbol": "XANDER", "quantity": 15, "price": 3107.0, "side": "BUY" }, + { "symbol": "ADVB", "quantity": 5, "price": 3101.0, "side": "BUY" }, + { "symbol": "fvasd", "quantity": 8, "price": 3102.0, "side": "BUY" }, + { "symbol": "asdaad", "quantity": 10, "price": 3107.0, "side": "BUY" }, + { "symbol": "JSTSXc", "quantity": 8, "price": 3105.0, "side": "BUY" }, + { "symbol": "HEDGE", "quantity": 6, "price": 3111.0, "side": "BUY" }, + { "symbol": "MAC10", "quantity": 9, "price": 3113.0, "side": "BUY" }, + { "symbol": "SAWEDOFF", "quantity": 4, "price": 3112.0, "side": "BUY" }, + { "symbol": "S&P", "quantity": 2, "price": 3104.0, "side": "BUY" }, + { "symbol": "THREEHUNNID", "quantity": 2, "price": 3105.0, "side": "BUY" } +] diff --git a/proto/msg.proto b/proto/msg.proto new file mode 100644 index 0000000..3a9702c --- /dev/null +++ b/proto/msg.proto @@ -0,0 +1,16 @@ +service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} +} +message HelloRequest { + string name = 1; +} + +message HelloReply { + string message = 1; +} + +message Ticket { + string name = 1; + int32 id = 2; + bool is_valid = 3; +} \ No newline at end of file diff --git a/proto/ticket_exchange.proto b/proto/ticket_exchange.proto new file mode 100644 index 0000000..7ce62ae --- /dev/null +++ b/proto/ticket_exchange.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package ticket_exchange; + +service StartHandshake { + rpc SayHello (HelloRequest) returns (HelloReply); +} + +service HelloRequest { + string name = 1; +} + +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/proto/trading.proto b/proto/trading.proto new file mode 100644 index 0000000..542a4d9 --- /dev/null +++ b/proto/trading.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package trading; + +service TradingService { + rpc SendOrder(Order) returns (OrderAck); + rpc StreamMarketData(MarketRequest) returns (stream MarketData); +} + +enum Side { + SIDE_UNSPECIFIED = 0; + BUY = 1; + SELL = 2; +} + +message Order { + string symbol = 1; + int32 quantity = 2; + double price = 3; + Side side = 4; // "buy" or "sell" +} + +message OrderAck { + bool success = 1; + string message = 2; +} + +message MarketRequest { + string symbol = 1; +} + +message MarketData { + string symbol = 1; + double price = 2; + int64 timestamp = 3; +} diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..c575697 --- /dev/null +++ b/shell.nix @@ -0,0 +1,18 @@ +{ pkgs ? import {} }: + +pkgs.mkShell { + name = "fastinahurry-dev"; + buildInputs = with pkgs; [ + openssl + gcc + cmake + gdb + pkg-config + boost + valgrind + zsh + grpc + protobuf + nlohmann_json + ]; +} diff --git a/src/Algo.cc b/src/Algo.cc new file mode 100644 index 0000000..1df0e98 --- /dev/null +++ b/src/Algo.cc @@ -0,0 +1,108 @@ +// C++ Includes +#include + +// FastInAHurry Includes +#include +#include + +// Third Party Includes +#include + +Algo::Algo() { + std::cout << "Algo created" << std::endl; +} + +void Algo::initialize() { + std::string server_addr = "localhost:50051"; + auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials()); + _pub = std::make_shared(channel); + + _initialized = true; +} + +void Algo::stop() { + // _running = false; + + _reader_thread.request_stop(); + for (auto& t : _worker_threads) { + t.request_stop(); + } + + for (auto& future : _futures) { + bool ok = future.get(); + if (!ok) { + std::cerr << "[Warning] Some orders failed to send" << std::endl; + } + } +} + +void Algo::generate_orders() { + _orders = readers::json::read_orders_from_json("orders.json"); +} + +bool Algo::initialized() { + return _initialized; +} + +void Algo::process() { + // Thread to read from file source and enqueue + _reader_thread = std::jthread([this](std::stop_token stoken) { + while (!stoken.stop_requested()) { + auto orders = readers::json::read_orders_from_json("orders.json"); + while(!orders.empty()) { + _order_queue.push(orders.front()); + orders.pop(); + } + } + }); + + std::size_t num_workers = 4; + for (std::size_t i = 0; i < num_workers; ++i) { + _worker_threads.emplace_back([this] (std::stop_token stoken) { + while(!stoken.stop_requested()) { + trading::Order order = _order_queue.wait_and_pop(); + + auto future = _thread_pool.enqueue([this, order]() -> bool { + grpc::Status status; + { + std::lock_guard lock(_send_mutex); + status = _pub->send_order(order); + } + + if (!status.ok()) { + std::cerr << "[Error] Failed to send order: " + << order.symbol() << "\n"; + return false; + } + + std::cout << "[Success] Sent order for " + << order.symbol() << "\n"; + return true; + }); + + if (future.wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) { + bool ok = future.get(); + if (!ok) { + std::cerr << "[Inline] Order failed immediately for " << order.symbol() << '\n'; + } + } else { + _futures.push_back(std::move(future)); + } + } + }); + } + + // while(_running) { + // this->generate_orders(); + // while (!_orders.empty()) { + // auto order = _orders.front(); + // _orders.pop(); + // send(order); + // } + // sleep(1); + // } +} + +void Algo::send(trading::Order& order) { + _pub->send_order(order); +} \ No newline at end of file diff --git a/src/Controller.cc b/src/Controller.cc new file mode 100644 index 0000000..2962930 --- /dev/null +++ b/src/Controller.cc @@ -0,0 +1,14 @@ +#include "Controller.hh" + +Controller::Controller(int argc, char* argv[]) + : _algo(new Algo()) { + std::cout << "argc: " << argc << std::endl; + for (uint16_t i = 1; i < argc; i++) { + std::cout << argv[i] << " "; + } std::cout << std::endl; +} + +void Controller::start() { + if (!_algo->initialized()) _algo->initialize(); + _algo->process(); +} diff --git a/src/Publisher.cc b/src/Publisher.cc new file mode 100644 index 0000000..fc76beb --- /dev/null +++ b/src/Publisher.cc @@ -0,0 +1,34 @@ +#include "Publisher.hh" +#include + + +Publisher::Publisher(std::shared_ptr channel) + : _stub(trading::TradingService::NewStub(channel)) {} + +grpc::Status Publisher::send_order(const trading::Order& order) { + grpc::ClientContext context; + trading::OrderAck ack; + + grpc::Status status = _stub->SendOrder(&context, order, &ack); + + if (!ack.success()) { + std::cerr << "[Error] Order rejected by server: " << order.symbol() << std::endl; + } + std::cout << "Ack received: " << ack.message() << std::endl; + return status; +} + +// void StreamMarketData() { +// MarketRequest req; +// req.set_symbol("AAPL"); + +// ClientContext context; +// std::unique_ptr> reader( +// stub_->StreamMarketData(&context, req)); + +// MarketData data; +// while (reader->Read(&data)) { +// std::cout << "Market: " << data.symbol() << " $" << data.price() +// << " @ " << data.timestamp() << std::endl; +// } +// } \ No newline at end of file diff --git a/src/grpc_client.cc b/src/grpc_client.cc new file mode 100644 index 0000000..564a6e3 --- /dev/null +++ b/src/grpc_client.cc @@ -0,0 +1,61 @@ +#include "trading.grpc.pb.h" +#include +#include + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; +using trading::TradingService; +using trading::Order; +using trading::OrderAck; +using trading::MarketRequest; +using trading::MarketData; + +class TraderClient { + public: + TraderClient(std::shared_ptr channel) + : stub_(TradingService::NewStub(channel)) {} + + // void SendOrder() { + // Order order; + // order.set_symbol("AAPL"); + // order.set_quantity(100); + // order.set_price(150.0); + // order.set_side(); + + // OrderAck ack; + // ClientContext context; + + // Status status = stub_->SendOrder(&context, order, &ack); + // if (status.ok()) { + // std::cout << "Order ack: " << ack.message() << std::endl; + // } else { + // std::cerr << "SendOrder failed: " << status.error_message() << std::endl; + // } + // } + + // void StreamMarketData() { + // MarketRequest req; + // req.set_symbol("AAPL"); + + // ClientContext context; + // std::unique_ptr> reader( + // stub_->StreamMarketData(&context, req)); + + // MarketData data; + // while (reader->Read(&data)) { + // std::cout << "Market: " << data.symbol() << " $" << data.price() + // << " @ " << data.timestamp() << std::endl; + // } + // } + + private: + std::unique_ptr stub_; +}; + +// int main() { +// TraderClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials())); +// client.SendOrder(); +// client.StreamMarketData(); +// return 0; +// } \ No newline at end of file diff --git a/src/grpc_server.cc b/src/grpc_server.cc new file mode 100644 index 0000000..403f44a --- /dev/null +++ b/src/grpc_server.cc @@ -0,0 +1,60 @@ +#include "trading.grpc.pb.h" +#include +#include +#include + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::Status; +using trading::TradingService; +using trading::Order; +using trading::OrderAck; +using trading::MarketRequest; +using trading::MarketData; + +class TradingServiceImpl final : public TradingService::Service { + Status SendOrder(ServerContext* ctx, const Order* req, + OrderAck* res) override { + std::cout << "Order: " << req->symbol() << " " << req->side() + << " " << req->quantity() << " " << req->price() << std::endl; + + res->set_success(true); + res->set_message("Order received"); + return Status::OK; + } + + Status StreamMarketData(ServerContext* ctx, const MarketRequest* req, + grpc::ServerWriter* writer) override { + + uint32_t max_market_streams = 5; + for (uint32_t i = 0; i < max_market_streams; i++) { + MarketData data; + data.set_symbol(req->symbol()); + data.set_price(100 + i); + data.set_timestamp(time(nullptr)); + writer->Write(data); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + return Status::OK; + } +}; + + void RunServer() { + std::string addr("0.0.0.0:50051"); + TradingServiceImpl service; + + ServerBuilder builder; + builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + std::unique_ptr server(builder.BuildAndStart()); + + std::cout << "Market server listening on" << addr << std::endl; + + server->Wait(); + } + +// int main() { +// RunServer(); +// return 0; +// } \ No newline at end of file diff --git a/src/readers/JSONReader.cc b/src/readers/JSONReader.cc new file mode 100644 index 0000000..3a5b0e1 --- /dev/null +++ b/src/readers/JSONReader.cc @@ -0,0 +1,46 @@ +// C++ Includes +#include +#include + +// FastInAHurry Includes +#include "readers/JSONReader.hh" +#include + +// Third Party Includes +#include "trading.pb.h" + +namespace readers::json { + +OrderQueue read_orders_from_json(const std::string& filename) { + OrderQueue orders; + + std::ifstream in_file(filename); + if (!in_file.is_open()) { + std::cerr << "Failed to open JSON file: " << filename << std::endl; + return orders; + } + + JSON j; + in_file >> j; + + for (const auto& item : j) { + trading::Order order; + order.set_symbol(item.at("symbol").get()); + order.set_quantity(item.at("quantity").get()); + order.set_price(item.at("price").get()); + + std::string side = item.at("side").get(); + if (side == "BUY") { + order.set_side(trading::BUY); + } else if (side == "SELL") { + order.set_side(trading::SELL); + } else { + order.set_side(trading::SIDE_UNSPECIFIED); + } + + orders.push(order); + } + + return orders; +} +} // End json namespace \ No newline at end of file diff --git a/src/utils/ThreadPool.cc b/src/utils/ThreadPool.cc new file mode 100644 index 0000000..e62c063 --- /dev/null +++ b/src/utils/ThreadPool.cc @@ -0,0 +1,46 @@ +// C++ Includes +#include +#include +#include +#include +#include +#include +#include +#include "utils/ThreadPool.hh" + +namespace utils { + +ThreadPool::ThreadPool(std::size_t threads) { + for (std::size_t i = 0; i < threads; ++i) { + _workers.emplace_back([this] { + while (true) { + std::function task; + { + std::unique_lock lock(_queue_mutex); + _cv.wait(lock, [this] { + return _stop || !_tasks.empty(); + }); + if (_stop && _tasks.empty()) return; + task = std::move(_tasks.front()); + _tasks.pop(); + } + task(); + } + }); + } +} + +ThreadPool::~ThreadPool() { + { + std::unique_lock lock(_queue_mutex); + _stop = true; + } + _cv.notify_all(); + + for (auto& worker : _workers) { + if (worker.joinable()) worker.join(); + } + +} + +} // End utils namespace \ No newline at end of file diff --git a/src/utils/ThreadSafeQueue.cc b/src/utils/ThreadSafeQueue.cc new file mode 100644 index 0000000..e3cac34 --- /dev/null +++ b/src/utils/ThreadSafeQueue.cc @@ -0,0 +1,11 @@ +// C++ Includes +// #include +// #include +// #include +// #include +// #include +// #include "utils/ThreadSafeQueue.hh" + +// namespace utils { + +// } // End utils namespace \ No newline at end of file