From b2ca2d9cc2ccbafb7e05c8599190113cbc95512c Mon Sep 17 00:00:00 2001 From: xbazzi Date: Sun, 17 Aug 2025 15:34:13 -0600 Subject: [PATCH] Fix data races --- .gitea/workflows/mirror-to-github.yaml | 16 ++++ include/Algo.hh | 6 +- include/Controller.hh | 1 + shell.nix | 16 +++- src/Algo.cc | 100 ++++++++++++++++++------- src/Controller.cc | 10 ++- src/utils/ThreadSafeQueue.cc | 11 --- 7 files changed, 117 insertions(+), 43 deletions(-) create mode 100644 .gitea/workflows/mirror-to-github.yaml delete mode 100644 src/utils/ThreadSafeQueue.cc diff --git a/.gitea/workflows/mirror-to-github.yaml b/.gitea/workflows/mirror-to-github.yaml new file mode 100644 index 0000000..9e17622 --- /dev/null +++ b/.gitea/workflows/mirror-to-github.yaml @@ -0,0 +1,16 @@ +name: Simple Mirror to GitHub + +on: + push: + branches: + - master + +jobs: + mirror: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Push to GitHub + run: | + git remote add github https://${{ secrets.GH_TOKEN }}@github.com/xbazzi/fastinahurry.git + git push github master --force \ No newline at end of file diff --git a/include/Algo.hh b/include/Algo.hh index 7ee3f2c..f5f6222 100644 --- a/include/Algo.hh +++ b/include/Algo.hh @@ -21,9 +21,12 @@ public: void initialize(); void generate_orders(); void process(); - void send(trading::Order&); + void start_background_processing(); + grpc::Status send(trading::Order&); bool initialized(); void stop(); + bool is_running(); + void cleanup_completed_futures(); private: std::queue _orders; std::atomic _initialized{false}; @@ -34,6 +37,7 @@ private: utils::ThreadSafeQueue _order_queue; utils::ThreadPool _thread_pool; + std::mutex _orders_mutex; std::mutex _send_mutex; std::mutex _futures_mutex; std::jthread _reader_thread; diff --git a/include/Controller.hh b/include/Controller.hh index 269ae9b..88aa8c5 100644 --- a/include/Controller.hh +++ b/include/Controller.hh @@ -6,4 +6,5 @@ public: void start(); private: std::unique_ptr _algo; + uint32_t _count; }; \ No newline at end of file diff --git a/shell.nix b/shell.nix index 43cf1e8..c1e6484 100644 --- a/shell.nix +++ b/shell.nix @@ -1,5 +1,12 @@ -{ pkgs ? import {} }: - +let + config = { + allowUnfree = true; + # allowUnfreePredicate = pkg: builtins.elem (builtins.parseDrvName pkg.name).name [ + # "claude-code" + # ]; + }; + pkgs = import { inherit config; }; +in pkgs.mkShell { name = "fastinahurry-dev"; buildInputs = with pkgs; [ @@ -16,6 +23,9 @@ pkgs.mkShell { nlohmann_json doxygen graphviz - claude-code ]; + + # nixpkgs.config.allowUnfreePredicate = pkg: builtins.elem (lib.getName pkg) [ + # "claude-code" + # ]; } diff --git a/src/Algo.cc b/src/Algo.cc index 981bea0..1698b81 100644 --- a/src/Algo.cc +++ b/src/Algo.cc @@ -17,34 +17,74 @@ void Algo::initialize() { auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials()); _pub = std::make_shared(channel); - _initialized = true; + _initialized.store(true, std::memory_order_release); + _running.store(true, std::memory_order_release); +} + +bool Algo::is_running() { + return _running; } void Algo::stop() { - // _running = false; + _stopping.store(true, std::memory_order_release); + _running.store(false, std::memory_order_release); _reader_thread.request_stop(); for (auto& t : _worker_threads) { t.request_stop(); } - for (auto& future : _futures) { - bool ok = future.get(); - if (!ok) { - std::cerr << "[Warning] Some orders failed to send" << std::endl; + if (_reader_thread.joinable()) { + _reader_thread.join(); + } + for (auto& t : _worker_threads) { + if (t.joinable()) { + t.join(); } } + + { + std::lock_guard lk(_futures_mutex); + for (auto& future : _futures) { + bool ok = future.get(); + if (!ok) { + std::cerr << "[Warning] Some orders failed to send" << std::endl; + } + } + _futures.clear(); + } } void Algo::generate_orders() { + std::lock_guard lock(_orders_mutex); _orders = readers::json::read_orders_from_json("orders.json"); } bool Algo::initialized() { - return _initialized; + return _initialized.load(std::memory_order_acquire); } void Algo::process() { + this->generate_orders(); + std::lock_guard lock(_orders_mutex); + while (!_orders.empty()) { + auto order = _orders.front(); + _orders.pop(); + auto status = send(order); + if (status.ok()) { + std::ostringstream ss; + ss << "[Success] Sent order to buy " << order.quantity() + << " shares of " << order.symbol() << " at $" + << order.price() << std::endl; + std::cout << ss.str(); + } else { + std::cout << "[Error] Failed to send order for " << order.symbol() + << " - " << status.error_message() << std::endl; + } + } +} + +void Algo::start_background_processing() { // Thread to read from file source and enqueue _reader_thread = std::jthread([this](std::stop_token stoken) { while (!stoken.stop_requested()) { @@ -64,11 +104,11 @@ void Algo::process() { auto future = _thread_pool.enqueue([this, order]() -> bool { grpc::Status status; - // { - // std::lock_guard lock(_send_mutex); - // status = _pub->send_order(order); - // } - status = _pub->send_order(order); + { + std::lock_guard lock(_send_mutex); + status = _pub->send_order(order); + std:: cout << "WOAH!" << std::endl; + } if (!status.ok()) { std::cerr << "[Error] Failed to send order: " @@ -92,25 +132,33 @@ void Algo::process() { if (!ok) std::cerr << "[Late] Order failed for " << order.symbol() << std::endl; } else { std::cout << "future got pushed brah" << std::endl; - std::scoped_lock lk(_futures_mutex); - _futures.emplace_back(std::move(future)); + { + std::lock_guard lk(_futures_mutex); + _futures.emplace_back(std::move(future)); + } } } } }); } - - // while(_running) { - // this->generate_orders(); - // while (!_orders.empty()) { - // auto order = _orders.front(); - // _orders.pop(); - // send(order); - // } - // sleep(1); - // } } -void Algo::send(trading::Order& order) { - _pub->send_order(order); +void Algo::cleanup_completed_futures() { + std::lock_guard lk(_futures_mutex); + auto it = _futures.begin(); + while (it != _futures.end()) { + if (it->wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + bool ok = it->get(); + if (!ok) { + std::cerr << "[Background] Future failed" << std::endl; + } + it = _futures.erase(it); + } else { + ++it; + } + } +} + +grpc::Status Algo::send(trading::Order& order) { + return _pub->send_order(order); } \ No newline at end of file diff --git a/src/Controller.cc b/src/Controller.cc index 2962930..dae6858 100644 --- a/src/Controller.cc +++ b/src/Controller.cc @@ -1,7 +1,8 @@ #include "Controller.hh" +// #include Controller::Controller(int argc, char* argv[]) - : _algo(new Algo()) { + : _algo(new Algo()), _count(0) { std::cout << "argc: " << argc << std::endl; for (uint16_t i = 1; i < argc; i++) { std::cout << argv[i] << " "; @@ -10,5 +11,10 @@ Controller::Controller(int argc, char* argv[]) void Controller::start() { if (!_algo->initialized()) _algo->initialize(); - _algo->process(); + while(_algo->is_running()) { + _algo->process(); + std::this_thread::sleep_for(std::chrono::seconds(3)); + _count++; + if (_count > 5) _algo->stop(); + } } diff --git a/src/utils/ThreadSafeQueue.cc b/src/utils/ThreadSafeQueue.cc deleted file mode 100644 index e3cac34..0000000 --- a/src/utils/ThreadSafeQueue.cc +++ /dev/null @@ -1,11 +0,0 @@ -// C++ Includes -// #include -// #include -// #include -// #include -// #include -// #include "utils/ThreadSafeQueue.hh" - -// namespace utils { - -// } // End utils namespace \ No newline at end of file