Initial commit 🚀

This commit is contained in:
xbazzi 2025-07-27 21:47:49 -06:00
commit dbfc015d72
26 changed files with 1016 additions and 0 deletions

1
.envrc Normal file
View File

@ -0,0 +1 @@
use nix

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build/
.direnv/
.vscode

96
CMakeLists.txt Normal file
View File

@ -0,0 +1,96 @@
cmake_minimum_required(VERSION 3.25)
project(grpc_trading CXX)
include(cmake/common.cmake)
find_package(gRPC REQUIRED)
find_package(Protobuf REQUIRED)
find_package(nlohmann_json REQUIRED)
include_directories(${Protobuf_INCLUDE_DIRS})
# Proto file
get_filename_component(trading_proto "proto/trading.proto" ABSOLUTE)
get_filename_component(trading_proto_path "${trading_proto}" PATH)
# Generate sources
set(trading_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/trading.pb.cc")
set(trading_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/trading.pb.h")
set(trading_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/trading.grpc.pb.cc")
set(trading_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/trading.grpc.pb.h")
add_custom_command(
OUTPUT "${trading_proto_srcs}" "${trading_proto_hdrs}" "${trading_grpc_srcs}" "${trading_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${trading_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${trading_proto}"
DEPENDS "${trading_proto}"
)
# Include generated *.pb.h files
include_directories("${CMAKE_SOURCE_DIR}/include")
# Glob all source files in src/
file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/src/*.cc")
set(CMAKE_CXX_STANDARD 23)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(fastinahurry ${SRC_FILES} main.cc)
set_target_properties(fastinahurry PROPERTIES
CXX_STANDARD 23
CXX_STANDARD_REQUIRED YES
)
# add_executable(fastinahurry ${SRC_FILES} main.cc)
# add_executable(fastinahurry ${SRC_FILES} main.cc)
target_link_libraries(fastinahurry PRIVATE
trading_grpc_proto
nlohmann_json::nlohmann_json
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF}
)
target_include_directories(fastinahurry PRIVATE
${CMAKE_SOURCE_DIR}/include
${CMAKE_CURRENT_BINARY_DIR} # For generated *.pb.h
)
add_library(trading_grpc_proto
${trading_grpc_srcs}
${trading_grpc_hdrs}
${trading_proto_srcs}
${trading_proto_hdrs})
target_compile_features(fastinahurry PRIVATE cxx_std_23)
# target_link_libraries(trading_grpc_proto
# # absl::check
# ${_REFLECTION}
# ${_GRPC_GRPCPP}
# ${_PROTOBUF_LIBPROTOBUF})
# add_executable(server src/grpc_server.cc)
# target_link_libraries(server trading_grpc_proto gRPC::grpc++)
# target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
# add_executable(client src/grpc_client.cc)
# target_link_libraries(client trading_grpc_proto gRPC::grpc++)
# target_include_directories(client PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
# foreach(_target
# greeter_client greeter_server
# greeter_callback_client greeter_callback_server
# greeter_async_client greeter_async_client2 greeter_async_server)
# add_executable(${_target} "{CMAKE_BUILD_DIR}${_target}.cc")
# target_link_libraries(${_target}
# trading_grpc_proto
# ${_REFLECTION}
# ${_GRPC_GRPCPP}
# ${_PROTOBUF_LIBPROTOBUF})
# endforeach()

3
Makefile Normal file
View File

@ -0,0 +1,3 @@
all:
mkdir -p build
cd build && cmake .. && make -j$(nproc)

125
cmake/common.cmake Normal file
View File

@ -0,0 +1,125 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ route_guide example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building route_guide.
cmake_minimum_required(VERSION 3.16)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED True)
if(MSVC)
add_definitions(-D_WIN32_WINNT=0x600)
endif()
find_package(Threads REQUIRED)
if(GRPC_AS_SUBMODULE)
# One way to build a projects that uses gRPC is to just include the
# entire gRPC project tree via "add_subdirectory".
# This approach is very simple to use, but the are some potential
# disadvantages:
# * it includes gRPC's CMakeLists.txt directly into your build script
# without and that can make gRPC's internal setting interfere with your
# own build.
# * depending on what's installed on your system, the contents of submodules
# in gRPC's third_party/* might need to be available (and there might be
# additional prerequisites required to build them). Consider using
# the gRPC_*_PROVIDER options to fine-tune the expected behavior.
#
# A more robust approach to add dependency on gRPC is using
# cmake's ExternalProject_Add (see cmake_externalproject/CMakeLists.txt).
# Include the gRPC's cmake build (normally grpc source code would live
# in a git submodule called "third_party/grpc", but this example lives in
# the same repository as gRPC sources, so we just look a few directories up)
add_subdirectory(../../.. ${CMAKE_CURRENT_BINARY_DIR}/grpc EXCLUDE_FROM_ALL)
message(STATUS "Using gRPC via add_subdirectory.")
# After using add_subdirectory, we can now use the grpc targets directly from
# this build.
set(_PROTOBUF_LIBPROTOBUF libprotobuf)
set(_REFLECTION grpc++_reflection)
set(_ORCA_SERVICE grpcpp_orca_service)
if(CMAKE_CROSSCOMPILING)
find_program(_PROTOBUF_PROTOC protoc)
else()
set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
endif()
set(_GRPC_GRPCPP grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:grpc_cpp_plugin>)
endif()
elseif(GRPC_FETCHCONTENT)
# Another way is to use CMake's FetchContent module to clone gRPC at
# configure time. This makes gRPC's source code available to your project,
# similar to a git submodule.
message(STATUS "Using gRPC via add_subdirectory (FetchContent).")
include(FetchContent)
FetchContent_Declare(
grpc
GIT_REPOSITORY https://github.com/grpc/grpc.git
# when using gRPC, you will actually set this to an existing tag, such as
# v1.25.0, v1.26.0 etc..
# For the purpose of testing, we override the tag used to the commit
# that's currently under test.
GIT_TAG vGRPC_TAG_VERSION_OF_YOUR_CHOICE)
FetchContent_MakeAvailable(grpc)
# Since FetchContent uses add_subdirectory under the hood, we can use
# the grpc targets directly from this build.
set(_PROTOBUF_LIBPROTOBUF libprotobuf)
set(_REFLECTION grpc++_reflection)
set(_PROTOBUF_PROTOC $<TARGET_FILE:protoc>)
set(_GRPC_GRPCPP grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:grpc_cpp_plugin>)
endif()
else()
# This branch assumes that gRPC and all its dependencies are already installed
# on this system, so they can be located by find_package().
# Find Protobuf installation
# Looks for protobuf-config.cmake file installed by Protobuf's cmake installation.
option(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${Protobuf_VERSION}")
set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
set(_REFLECTION gRPC::grpc++_reflection)
if(CMAKE_CROSSCOMPILING)
find_program(_PROTOBUF_PROTOC protoc)
else()
set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
endif()
# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")
set(_GRPC_GRPCPP gRPC::grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
endif()
endif()

125
common.cmake Normal file
View File

@ -0,0 +1,125 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ route_guide example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building route_guide.
cmake_minimum_required(VERSION 3.16)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED True)
if(MSVC)
add_definitions(-D_WIN32_WINNT=0x600)
endif()
find_package(Threads REQUIRED)
if(GRPC_AS_SUBMODULE)
# One way to build a projects that uses gRPC is to just include the
# entire gRPC project tree via "add_subdirectory".
# This approach is very simple to use, but the are some potential
# disadvantages:
# * it includes gRPC's CMakeLists.txt directly into your build script
# without and that can make gRPC's internal setting interfere with your
# own build.
# * depending on what's installed on your system, the contents of submodules
# in gRPC's third_party/* might need to be available (and there might be
# additional prerequisites required to build them). Consider using
# the gRPC_*_PROVIDER options to fine-tune the expected behavior.
#
# A more robust approach to add dependency on gRPC is using
# cmake's ExternalProject_Add (see cmake_externalproject/CMakeLists.txt).
# Include the gRPC's cmake build (normally grpc source code would live
# in a git submodule called "third_party/grpc", but this example lives in
# the same repository as gRPC sources, so we just look a few directories up)
add_subdirectory(../../.. ${CMAKE_CURRENT_BINARY_DIR}/grpc EXCLUDE_FROM_ALL)
message(STATUS "Using gRPC via add_subdirectory.")
# After using add_subdirectory, we can now use the grpc targets directly from
# this build.
set(_PROTOBUF_LIBPROTOBUF libprotobuf)
set(_REFLECTION grpc++_reflection)
set(_ORCA_SERVICE grpcpp_orca_service)
if(CMAKE_CROSSCOMPILING)
find_program(_PROTOBUF_PROTOC protoc)
else()
set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
endif()
set(_GRPC_GRPCPP grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:grpc_cpp_plugin>)
endif()
elseif(GRPC_FETCHCONTENT)
# Another way is to use CMake's FetchContent module to clone gRPC at
# configure time. This makes gRPC's source code available to your project,
# similar to a git submodule.
message(STATUS "Using gRPC via add_subdirectory (FetchContent).")
include(FetchContent)
FetchContent_Declare(
grpc
GIT_REPOSITORY https://github.com/grpc/grpc.git
# when using gRPC, you will actually set this to an existing tag, such as
# v1.25.0, v1.26.0 etc..
# For the purpose of testing, we override the tag used to the commit
# that's currently under test.
GIT_TAG vGRPC_TAG_VERSION_OF_YOUR_CHOICE)
FetchContent_MakeAvailable(grpc)
# Since FetchContent uses add_subdirectory under the hood, we can use
# the grpc targets directly from this build.
set(_PROTOBUF_LIBPROTOBUF libprotobuf)
set(_REFLECTION grpc++_reflection)
set(_PROTOBUF_PROTOC $<TARGET_FILE:protoc>)
set(_GRPC_GRPCPP grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:grpc_cpp_plugin>)
endif()
else()
# This branch assumes that gRPC and all its dependencies are already installed
# on this system, so they can be located by find_package().
# Find Protobuf installation
# Looks for protobuf-config.cmake file installed by Protobuf's cmake installation.
option(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${Protobuf_VERSION}")
set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
set(_REFLECTION gRPC::grpc++_reflection)
if(CMAKE_CROSSCOMPILING)
find_program(_PROTOBUF_PROTOC protoc)
else()
set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
endif()
# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")
set(_GRPC_GRPCPP gRPC::grpc++)
if(CMAKE_CROSSCOMPILING)
find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin)
else()
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
endif()
endif()

40
include/Algo.hh Normal file
View File

@ -0,0 +1,40 @@
#pragma once
// C++ Includes
#include <memory>
#include <queue>
#include <atomic>
#include <thread>
#include <future>
// FastInAHurry includes
#include "Publisher.hh"
#include "utils/ThreadSafeQueue.hh"
#include "utils/ThreadPool.hh"
class Algo {
public:
Algo();
void initialize();
void generate_orders();
void process();
void send(trading::Order&);
bool initialized();
void stop();
private:
std::queue<trading::Order> _orders;
std::atomic<bool> _initialized{false};
std::atomic<bool> _running{false};
std::shared_ptr<Publisher> _pub;
utils::ThreadSafeQueue<trading::Order> _order_queue;
utils::ThreadPool _thread_pool;
std::mutex _send_mutex;
std::jthread _reader_thread;
std::vector<std::jthread> _worker_threads;
std::vector<std::future<bool>> _futures;
};

9
include/Controller.hh Normal file
View File

@ -0,0 +1,9 @@
#include "Algo.hh"
class Controller {
public:
explicit Controller(int, char*[]);
void start();
private:
std::unique_ptr<Algo> _algo;
};

14
include/Publisher.hh Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include "trading.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <memory>
class Publisher {
public:
explicit Publisher(std::shared_ptr<grpc::Channel> channel);
grpc::Status send_order(const trading::Order& order);
private:
std::unique_ptr<trading::TradingService::Stub> _stub;
};

View File

@ -0,0 +1,17 @@
#pragma once
// C++ Includes
#include <queue>
#include <string>
#include <memory>
// Third Party Includes
#include "trading.pb.h"
#include <nlohmann/json.hpp>
namespace readers::json {
using JSON = nlohmann::json;
using OrderQueue = std::queue<trading::Order>;
OrderQueue read_orders_from_json(const std::string& filename);
} // End json namespace

View File

@ -0,0 +1,46 @@
// C++ Includes
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>
namespace utils {
class ThreadPool {
private:
std::vector<std::thread> _workers;
std::queue<std::function<void()>> _tasks;
std::mutex _queue_mutex;
std::condition_variable _cv;
bool _stop{false};
public:
explicit ThreadPool(std::size_t threads = std::thread::hardware_concurrency());
~ThreadPool();
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> {
using return_type = std::invoke_result_t<F, Args...>;
auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task_ptr->get_future();
{
std::unique_lock lock(_queue_mutex);
if (_stop)
throw std::runtime_error("Tried to enqueue on stopped ThreadPool");
_tasks.emplace([task_ptr]() { (*task_ptr)(); });
}
_cv.notify_one();
return result;
}
};
} // End utils namespace

View File

@ -0,0 +1,35 @@
// C++ Includes
#include <queue>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
namespace utils {
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> _queue;
std::mutex _mutex;
std::condition_variable _cv;
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(std::move(value));
}
_cv.notify_one();
}
T wait_and_pop() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [&] { return !_queue.empty(); });
T val = std::move(_queue.front());
_queue.pop();
return val;
}
};
} // End utils namespace

19
main.cc Normal file
View File

@ -0,0 +1,19 @@
// C++ Includes
#include <cstdint>
#include <cassert>
#include <iostream>
// FastInAHurry Includes
#include <Controller.hh>
#include <Algo.hh>
int main(int argc, char* argv[]) {
for (uint8_t i = 0; i < argc; i++) {
std::printf("argument[%d]: %s\n", i, argv[i]);
}
assert(2 + 2 == 4);
Controller ctlr(argc, argv);
ctlr.start();
return 0;
}

18
orders.json Normal file
View File

@ -0,0 +1,18 @@
[
{ "symbol": "AAPL", "quantity": 10, "price": 189.5, "side": "BUY" },
{ "symbol": "META", "quantity": 20, "price": 225.0, "side": "SELL" },
{ "symbol": "JAVI", "quantity": 9, "price": 100, "side": "BUY" },
{ "symbol": "EVAN", "quantity": 8, "price": 104, "side": "BUY" },
{ "symbol": "CARTI", "quantity": 7, "price": 3105.0, "side": "BUY" },
{ "symbol": "MCLOVIN", "quantity": 12, "price": 3121.0, "side": "BUY" },
{ "symbol": "XANDER", "quantity": 15, "price": 3107.0, "side": "BUY" },
{ "symbol": "ADVB", "quantity": 5, "price": 3101.0, "side": "BUY" },
{ "symbol": "fvasd", "quantity": 8, "price": 3102.0, "side": "BUY" },
{ "symbol": "asdaad", "quantity": 10, "price": 3107.0, "side": "BUY" },
{ "symbol": "JSTSXc", "quantity": 8, "price": 3105.0, "side": "BUY" },
{ "symbol": "HEDGE", "quantity": 6, "price": 3111.0, "side": "BUY" },
{ "symbol": "MAC10", "quantity": 9, "price": 3113.0, "side": "BUY" },
{ "symbol": "SAWEDOFF", "quantity": 4, "price": 3112.0, "side": "BUY" },
{ "symbol": "S&P", "quantity": 2, "price": 3104.0, "side": "BUY" },
{ "symbol": "THREEHUNNID", "quantity": 2, "price": 3105.0, "side": "BUY" }
]

16
proto/msg.proto Normal file
View File

@ -0,0 +1,16 @@
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
message Ticket {
string name = 1;
int32 id = 2;
bool is_valid = 3;
}

View File

@ -0,0 +1,15 @@
syntax = "proto3";
package ticket_exchange;
service StartHandshake {
rpc SayHello (HelloRequest) returns (HelloReply);
}
service HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}

36
proto/trading.proto Normal file
View File

@ -0,0 +1,36 @@
syntax = "proto3";
package trading;
service TradingService {
rpc SendOrder(Order) returns (OrderAck);
rpc StreamMarketData(MarketRequest) returns (stream MarketData);
}
enum Side {
SIDE_UNSPECIFIED = 0;
BUY = 1;
SELL = 2;
}
message Order {
string symbol = 1;
int32 quantity = 2;
double price = 3;
Side side = 4; // "buy" or "sell"
}
message OrderAck {
bool success = 1;
string message = 2;
}
message MarketRequest {
string symbol = 1;
}
message MarketData {
string symbol = 1;
double price = 2;
int64 timestamp = 3;
}

18
shell.nix Normal file
View File

@ -0,0 +1,18 @@
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
name = "fastinahurry-dev";
buildInputs = with pkgs; [
openssl
gcc
cmake
gdb
pkg-config
boost
valgrind
zsh
grpc
protobuf
nlohmann_json
];
}

108
src/Algo.cc Normal file
View File

@ -0,0 +1,108 @@
// C++ Includes
#include <sstream>
// FastInAHurry Includes
#include <Algo.hh>
#include <readers/JSONReader.hh>
// Third Party Includes
#include <nlohmann/json.hpp>
Algo::Algo() {
std::cout << "Algo created" << std::endl;
}
void Algo::initialize() {
std::string server_addr = "localhost:50051";
auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials());
_pub = std::make_shared<Publisher>(channel);
_initialized = true;
}
void Algo::stop() {
// _running = false;
_reader_thread.request_stop();
for (auto& t : _worker_threads) {
t.request_stop();
}
for (auto& future : _futures) {
bool ok = future.get();
if (!ok) {
std::cerr << "[Warning] Some orders failed to send" << std::endl;
}
}
}
void Algo::generate_orders() {
_orders = readers::json::read_orders_from_json("orders.json");
}
bool Algo::initialized() {
return _initialized;
}
void Algo::process() {
// Thread to read from file source and enqueue
_reader_thread = std::jthread([this](std::stop_token stoken) {
while (!stoken.stop_requested()) {
auto orders = readers::json::read_orders_from_json("orders.json");
while(!orders.empty()) {
_order_queue.push(orders.front());
orders.pop();
}
}
});
std::size_t num_workers = 4;
for (std::size_t i = 0; i < num_workers; ++i) {
_worker_threads.emplace_back([this] (std::stop_token stoken) {
while(!stoken.stop_requested()) {
trading::Order order = _order_queue.wait_and_pop();
auto future = _thread_pool.enqueue([this, order]() -> bool {
grpc::Status status;
{
std::lock_guard lock(_send_mutex);
status = _pub->send_order(order);
}
if (!status.ok()) {
std::cerr << "[Error] Failed to send order: "
<< order.symbol() << "\n";
return false;
}
std::cout << "[Success] Sent order for "
<< order.symbol() << "\n";
return true;
});
if (future.wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
bool ok = future.get();
if (!ok) {
std::cerr << "[Inline] Order failed immediately for " << order.symbol() << '\n';
}
} else {
_futures.push_back(std::move(future));
}
}
});
}
// while(_running) {
// this->generate_orders();
// while (!_orders.empty()) {
// auto order = _orders.front();
// _orders.pop();
// send(order);
// }
// sleep(1);
// }
}
void Algo::send(trading::Order& order) {
_pub->send_order(order);
}

14
src/Controller.cc Normal file
View File

@ -0,0 +1,14 @@
#include "Controller.hh"
Controller::Controller(int argc, char* argv[])
: _algo(new Algo()) {
std::cout << "argc: " << argc << std::endl;
for (uint16_t i = 1; i < argc; i++) {
std::cout << argv[i] << " ";
} std::cout << std::endl;
}
void Controller::start() {
if (!_algo->initialized()) _algo->initialize();
_algo->process();
}

34
src/Publisher.cc Normal file
View File

@ -0,0 +1,34 @@
#include "Publisher.hh"
#include <iostream>
Publisher::Publisher(std::shared_ptr<grpc::Channel> channel)
: _stub(trading::TradingService::NewStub(channel)) {}
grpc::Status Publisher::send_order(const trading::Order& order) {
grpc::ClientContext context;
trading::OrderAck ack;
grpc::Status status = _stub->SendOrder(&context, order, &ack);
if (!ack.success()) {
std::cerr << "[Error] Order rejected by server: " << order.symbol() << std::endl;
}
std::cout << "Ack received: " << ack.message() << std::endl;
return status;
}
// void StreamMarketData() {
// MarketRequest req;
// req.set_symbol("AAPL");
// ClientContext context;
// std::unique_ptr<grpc::ClientReader<MarketData>> reader(
// stub_->StreamMarketData(&context, req));
// MarketData data;
// while (reader->Read(&data)) {
// std::cout << "Market: " << data.symbol() << " $" << data.price()
// << " @ " << data.timestamp() << std::endl;
// }
// }

61
src/grpc_client.cc Normal file
View File

@ -0,0 +1,61 @@
#include "trading.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <iostream>
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using trading::TradingService;
using trading::Order;
using trading::OrderAck;
using trading::MarketRequest;
using trading::MarketData;
class TraderClient {
public:
TraderClient(std::shared_ptr<Channel> channel)
: stub_(TradingService::NewStub(channel)) {}
// void SendOrder() {
// Order order;
// order.set_symbol("AAPL");
// order.set_quantity(100);
// order.set_price(150.0);
// order.set_side();
// OrderAck ack;
// ClientContext context;
// Status status = stub_->SendOrder(&context, order, &ack);
// if (status.ok()) {
// std::cout << "Order ack: " << ack.message() << std::endl;
// } else {
// std::cerr << "SendOrder failed: " << status.error_message() << std::endl;
// }
// }
// void StreamMarketData() {
// MarketRequest req;
// req.set_symbol("AAPL");
// ClientContext context;
// std::unique_ptr<grpc::ClientReader<MarketData>> reader(
// stub_->StreamMarketData(&context, req));
// MarketData data;
// while (reader->Read(&data)) {
// std::cout << "Market: " << data.symbol() << " $" << data.price()
// << " @ " << data.timestamp() << std::endl;
// }
// }
private:
std::unique_ptr<TradingService::Stub> stub_;
};
// int main() {
// TraderClient client(grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()));
// client.SendOrder();
// client.StreamMarketData();
// return 0;
// }

60
src/grpc_server.cc Normal file
View File

@ -0,0 +1,60 @@
#include "trading.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <iostream>
#include <thread>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using trading::TradingService;
using trading::Order;
using trading::OrderAck;
using trading::MarketRequest;
using trading::MarketData;
class TradingServiceImpl final : public TradingService::Service {
Status SendOrder(ServerContext* ctx, const Order* req,
OrderAck* res) override {
std::cout << "Order: " << req->symbol() << " " << req->side()
<< " " << req->quantity() << " " << req->price() << std::endl;
res->set_success(true);
res->set_message("Order received");
return Status::OK;
}
Status StreamMarketData(ServerContext* ctx, const MarketRequest* req,
grpc::ServerWriter<MarketData>* writer) override {
uint32_t max_market_streams = 5;
for (uint32_t i = 0; i < max_market_streams; i++) {
MarketData data;
data.set_symbol(req->symbol());
data.set_price(100 + i);
data.set_timestamp(time(nullptr));
writer->Write(data);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return Status::OK;
}
};
void RunServer() {
std::string addr("0.0.0.0:50051");
TradingServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Market server listening on" << addr << std::endl;
server->Wait();
}
// int main() {
// RunServer();
// return 0;
// }

46
src/readers/JSONReader.cc Normal file
View File

@ -0,0 +1,46 @@
// C++ Includes
#include <fstream>
#include <iostream>
// FastInAHurry Includes
#include "readers/JSONReader.hh"
#include <Algo.hh>
// Third Party Includes
#include "trading.pb.h"
namespace readers::json {
OrderQueue read_orders_from_json(const std::string& filename) {
OrderQueue orders;
std::ifstream in_file(filename);
if (!in_file.is_open()) {
std::cerr << "Failed to open JSON file: " << filename << std::endl;
return orders;
}
JSON j;
in_file >> j;
for (const auto& item : j) {
trading::Order order;
order.set_symbol(item.at("symbol").get<std::string>());
order.set_quantity(item.at("quantity").get<int>());
order.set_price(item.at("price").get<double>());
std::string side = item.at("side").get<std::string>();
if (side == "BUY") {
order.set_side(trading::BUY);
} else if (side == "SELL") {
order.set_side(trading::SELL);
} else {
order.set_side(trading::SIDE_UNSPECIFIED);
}
orders.push(order);
}
return orders;
}
} // End json namespace

46
src/utils/ThreadPool.cc Normal file
View File

@ -0,0 +1,46 @@
// C++ Includes
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>
#include "utils/ThreadPool.hh"
namespace utils {
ThreadPool::ThreadPool(std::size_t threads) {
for (std::size_t i = 0; i < threads; ++i) {
_workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock lock(_queue_mutex);
_cv.wait(lock, [this] {
return _stop || !_tasks.empty();
});
if (_stop && _tasks.empty()) return;
task = std::move(_tasks.front());
_tasks.pop();
}
task();
}
});
}
}
ThreadPool::~ThreadPool() {
{
std::unique_lock lock(_queue_mutex);
_stop = true;
}
_cv.notify_all();
for (auto& worker : _workers) {
if (worker.joinable()) worker.join();
}
}
} // End utils namespace

View File

@ -0,0 +1,11 @@
// C++ Includes
// #include <queue>
// #include <thread>
// #include <functional>
// #include <mutex>
// #include <condition_variable>
// #include "utils/ThreadSafeQueue.hh"
// namespace utils {
// } // End utils namespace