diff --git a/.gitignore b/.gitignore index 950f9c5..9a43eaf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ build/ .direnv/ -.vscode \ No newline at end of file +.vscode +html \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index d2f2a63..da186fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,12 @@ set_target_properties(fastinahurry PROPERTIES CXX_STANDARD_REQUIRED YES ) +add_executable(server ${SRC_FILES} server_main.cc) +set_target_properties(server PROPERTIES + CXX_STANDARD 23 + CXX_STANDARD_REQUIRED YES +) + # add_executable(fastinahurry ${SRC_FILES} main.cc) # add_executable(fastinahurry ${SRC_FILES} main.cc) @@ -62,6 +68,21 @@ target_include_directories(fastinahurry PRIVATE ${CMAKE_CURRENT_BINARY_DIR} # For generated *.pb.h ) +target_link_libraries(server PRIVATE + trading_grpc_proto + nlohmann_json::nlohmann_json + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF} +) + +target_include_directories(server PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR} # For generated *.pb.h +) + +target_compile_features(server PRIVATE cxx_std_23) + add_library(trading_grpc_proto ${trading_grpc_srcs} ${trading_grpc_hdrs} diff --git a/include/Algo.hh b/include/Algo.hh index f5f6222..03503c9 100644 --- a/include/Algo.hh +++ b/include/Algo.hh @@ -22,6 +22,7 @@ public: void generate_orders(); void process(); void start_background_processing(); + void start_market_data_streaming(); grpc::Status send(trading::Order&); bool initialized(); void stop(); diff --git a/include/Publisher.hh b/include/Publisher.hh index f617aad..5967f40 100644 --- a/include/Publisher.hh +++ b/include/Publisher.hh @@ -3,12 +3,22 @@ #include "trading.grpc.pb.h" #include #include +#include +#include class Publisher { public: explicit Publisher(std::shared_ptr channel); grpc::Status send_order(const trading::Order order); + void start_market_data_stream(const std::string& symbol); + void stop_market_data_stream(); + private: std::unique_ptr _stub; + std::atomic _streaming{false}; + std::thread _stream_thread; + + void stream_market_data(const std::string& symbol); + trading::Order generate_order_from_market_data(const trading::MarketData& data); }; \ No newline at end of file diff --git a/main.cc b/main.cc index 15687d2..b0f55b8 100644 --- a/main.cc +++ b/main.cc @@ -11,7 +11,6 @@ int main(int argc, char* argv[]) { for (uint8_t i = 0; i < argc; i++) { std::printf("argument[%d]: %s\n", i, argv[i]); } - assert(argc == 2); Controller ctlr(argc, argv); ctlr.start(); diff --git a/server_main.cc b/server_main.cc new file mode 100644 index 0000000..b59ce6e --- /dev/null +++ b/server_main.cc @@ -0,0 +1,20 @@ +#include +#include + +void RunServer(std::string); +void print_help() { + std::ostringstream ss; + ss << "Usage: ./build/server " << std::endl; + std::cout << ss.str(); +} + +int main(int argc, char** argv) { + std::cout << "Starting gRPC market data server..." << std::endl; + if (argc != 2) { + print_help(); + return 1; + }; + RunServer(std::string(argv[1])); + + return 0; +} \ No newline at end of file diff --git a/src/Algo.cc b/src/Algo.cc index 1698b81..e25a1c3 100644 --- a/src/Algo.cc +++ b/src/Algo.cc @@ -29,6 +29,10 @@ void Algo::stop() { _stopping.store(true, std::memory_order_release); _running.store(false, std::memory_order_release); + if (_pub) { + _pub->stop_market_data_stream(); + } + _reader_thread.request_stop(); for (auto& t : _worker_threads) { t.request_stop(); @@ -84,6 +88,15 @@ void Algo::process() { } } +void Algo::start_market_data_streaming() { + if (!_pub) { + std::cerr << "[Error] Publisher not initialized. Call initialize() first." << std::endl; + return; + } + + _pub->start_market_data_stream("AAPL"); +} + void Algo::start_background_processing() { // Thread to read from file source and enqueue _reader_thread = std::jthread([this](std::stop_token stoken) { @@ -96,7 +109,7 @@ void Algo::start_background_processing() { } }); - std::size_t num_workers = 2; + std::size_t num_workers = 1; for (std::size_t i = 0; i < num_workers; ++i) { _worker_threads.emplace_back([this] (std::stop_token stoken) { while(!stoken.stop_requested()) { diff --git a/src/Controller.cc b/src/Controller.cc index dae6858..f6178f3 100644 --- a/src/Controller.cc +++ b/src/Controller.cc @@ -11,10 +11,12 @@ Controller::Controller(int argc, char* argv[]) void Controller::start() { if (!_algo->initialized()) _algo->initialize(); + + _algo->start_market_data_streaming(); + while(_algo->is_running()) { - _algo->process(); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(1)); _count++; - if (_count > 5) _algo->stop(); + if (_count >= 10) _algo->stop(); } } diff --git a/src/Publisher.cc b/src/Publisher.cc index 17fa21a..0be9781 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -1,7 +1,7 @@ #include "Publisher.hh" #include #include - +#include Publisher::Publisher(std::shared_ptr channel) : _stub(trading::TradingService::NewStub(channel)) {} @@ -19,17 +19,78 @@ grpc::Status Publisher::send_order(const trading::Order order) { return status; } -// void StreamMarketData() { -// MarketRequest req; -// req.set_symbol("AAPL"); +void Publisher::start_market_data_stream(const std::string& symbol) { + if (_streaming.load()) { + std::cout << "Market data stream already running" << std::endl; + return; + } + + _streaming.store(true); + _stream_thread = std::thread(&Publisher::stream_market_data, this, symbol); + std::cout << "Started market data stream for " << symbol << std::endl; +} -// ClientContext context; -// std::unique_ptr> reader( -// stub_->StreamMarketData(&context, req)); +void Publisher::stop_market_data_stream() { + if (!_streaming.load()) { + return; + } + + _streaming.store(false); + if (_stream_thread.joinable()) { + _stream_thread.join(); + } + std::cout << "Stopped market data stream" << std::endl; +} -// MarketData data; -// while (reader->Read(&data)) { -// std::cout << "Market: " << data.symbol() << " $" << data.price() -// << " @ " << data.timestamp() << std::endl; -// } -// } \ No newline at end of file +void Publisher::stream_market_data(const std::string& symbol) { + trading::MarketRequest req; + req.set_symbol(symbol); + + grpc::ClientContext context; + std::unique_ptr> reader( + _stub->StreamMarketData(&context, req)); + + trading::MarketData data; + while (reader->Read(&data) && _streaming.load()) { + std::cout << "Received market data: " << data.symbol() + << " @ $" << data.price() + << " (timestamp: " << data.timestamp() << ")" << std::endl; + + trading::Order order = generate_order_from_market_data(data); + grpc::Status status = send_order(order); + + if (!status.ok()) { + std::cerr << "[Error] Failed to send order: " << status.error_message() << std::endl; + } + } + + grpc::Status status = reader->Finish(); + if (!status.ok()) { + std::cerr << "[Error] Market data stream ended with error: " << status.error_message() << std::endl; + } + + _streaming.store(false); +} + +trading::Order Publisher::generate_order_from_market_data(const trading::MarketData& data) { + trading::Order order; + order.set_symbol(data.symbol()); + + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution<> quantity_dist(10, 100); + static std::uniform_int_distribution<> side_dist(1, 2); + static std::uniform_real_distribution<> price_adjustment(-2.0, 2.0); + + order.set_quantity(quantity_dist(gen)); + order.set_side(static_cast(side_dist(gen))); + + double adjusted_price = data.price() + price_adjustment(gen); + order.set_price(adjusted_price); + + std::cout << "Generated order: " << (order.side() == trading::BUY ? "BUY" : "SELL") + << " " << order.quantity() << " " << order.symbol() + << " @ $" << order.price() << std::endl; + + return order; +} \ No newline at end of file diff --git a/src/grpc_server.cc b/src/grpc_server.cc index 403f44a..6c0e4f1 100644 --- a/src/grpc_server.cc +++ b/src/grpc_server.cc @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include using grpc::Server; using grpc::ServerBuilder; @@ -26,30 +29,48 @@ class TradingServiceImpl final : public TradingService::Service { Status StreamMarketData(ServerContext* ctx, const MarketRequest* req, grpc::ServerWriter* writer) override { + std::cout << "Starting market data stream for symbol: " << req->symbol() << std::endl; - uint32_t max_market_streams = 5; - for (uint32_t i = 0; i < max_market_streams; i++) { - MarketData data; - data.set_symbol(req->symbol()); - data.set_price(100 + i); - data.set_timestamp(time(nullptr)); - writer->Write(data); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + uint32_t counter = 0; + while (!ctx->IsCancelled()) { + MarketData data; + data.set_symbol(req->symbol()); + + double base_price = 100.0; + double price_variation = 5.0 * sin(counter * 0.1) + (rand() % 10 - 5) * 0.1; + data.set_price(base_price + price_variation); + data.set_timestamp(time(nullptr)); + + if (!writer->Write(data)) { + break; } - return Status::OK; + + std::cout << "Sent market data: " << req->symbol() + << " @ $" << data.price() << std::endl; + + counter++; + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + std::cout << "Market data stream ended for: " << req->symbol() << std::endl; + return Status::OK; } }; - void RunServer() { - std::string addr("0.0.0.0:50051"); + void RunServer(std::string port) { + std::string addr("0.0.0.0"); + std::string socket = addr; + std::ostringstream ss; + ss << addr << ":" << port; TradingServiceImpl service; ServerBuilder builder; - builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); + + builder.AddListeningPort(ss.str(), grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - std::cout << "Market server listening on" << addr << std::endl; + std::cout << "Market server listening on " << addr << ":" << port << std::endl; server->Wait(); }