Fix data races
Some checks failed
Simple Mirror to GitHub / mirror (push) Failing after 7s

This commit is contained in:
xbazzi 2025-08-17 15:34:13 -06:00
parent 59d0867e21
commit b2ca2d9cc2
7 changed files with 117 additions and 43 deletions

View File

@ -0,0 +1,16 @@
name: Simple Mirror to GitHub
on:
push:
branches:
- master
jobs:
mirror:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Push to GitHub
run: |
git remote add github https://${{ secrets.GH_TOKEN }}@github.com/xbazzi/fastinahurry.git
git push github master --force

View File

@ -21,9 +21,12 @@ public:
void initialize(); void initialize();
void generate_orders(); void generate_orders();
void process(); void process();
void send(trading::Order&); void start_background_processing();
grpc::Status send(trading::Order&);
bool initialized(); bool initialized();
void stop(); void stop();
bool is_running();
void cleanup_completed_futures();
private: private:
std::queue<trading::Order> _orders; std::queue<trading::Order> _orders;
std::atomic<bool> _initialized{false}; std::atomic<bool> _initialized{false};
@ -34,6 +37,7 @@ private:
utils::ThreadSafeQueue<trading::Order> _order_queue; utils::ThreadSafeQueue<trading::Order> _order_queue;
utils::ThreadPool _thread_pool; utils::ThreadPool _thread_pool;
std::mutex _orders_mutex;
std::mutex _send_mutex; std::mutex _send_mutex;
std::mutex _futures_mutex; std::mutex _futures_mutex;
std::jthread _reader_thread; std::jthread _reader_thread;

View File

@ -6,4 +6,5 @@ public:
void start(); void start();
private: private:
std::unique_ptr<Algo> _algo; std::unique_ptr<Algo> _algo;
uint32_t _count;
}; };

View File

@ -1,5 +1,12 @@
{ pkgs ? import <nixpkgs> {} }: let
config = {
allowUnfree = true;
# allowUnfreePredicate = pkg: builtins.elem (builtins.parseDrvName pkg.name).name [
# "claude-code"
# ];
};
pkgs = import <nixpkgs> { inherit config; };
in
pkgs.mkShell { pkgs.mkShell {
name = "fastinahurry-dev"; name = "fastinahurry-dev";
buildInputs = with pkgs; [ buildInputs = with pkgs; [
@ -16,6 +23,9 @@ pkgs.mkShell {
nlohmann_json nlohmann_json
doxygen doxygen
graphviz graphviz
claude-code
]; ];
# nixpkgs.config.allowUnfreePredicate = pkg: builtins.elem (lib.getName pkg) [
# "claude-code"
# ];
} }

View File

@ -17,34 +17,74 @@ void Algo::initialize() {
auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials()); auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials());
_pub = std::make_shared<Publisher>(channel); _pub = std::make_shared<Publisher>(channel);
_initialized = true; _initialized.store(true, std::memory_order_release);
_running.store(true, std::memory_order_release);
}
bool Algo::is_running() {
return _running;
} }
void Algo::stop() { void Algo::stop() {
// _running = false; _stopping.store(true, std::memory_order_release);
_running.store(false, std::memory_order_release);
_reader_thread.request_stop(); _reader_thread.request_stop();
for (auto& t : _worker_threads) { for (auto& t : _worker_threads) {
t.request_stop(); t.request_stop();
} }
for (auto& future : _futures) { if (_reader_thread.joinable()) {
bool ok = future.get(); _reader_thread.join();
if (!ok) { }
std::cerr << "[Warning] Some orders failed to send" << std::endl; for (auto& t : _worker_threads) {
if (t.joinable()) {
t.join();
} }
} }
{
std::lock_guard<std::mutex> lk(_futures_mutex);
for (auto& future : _futures) {
bool ok = future.get();
if (!ok) {
std::cerr << "[Warning] Some orders failed to send" << std::endl;
}
}
_futures.clear();
}
} }
void Algo::generate_orders() { void Algo::generate_orders() {
std::lock_guard<std::mutex> lock(_orders_mutex);
_orders = readers::json::read_orders_from_json("orders.json"); _orders = readers::json::read_orders_from_json("orders.json");
} }
bool Algo::initialized() { bool Algo::initialized() {
return _initialized; return _initialized.load(std::memory_order_acquire);
} }
void Algo::process() { void Algo::process() {
this->generate_orders();
std::lock_guard<std::mutex> lock(_orders_mutex);
while (!_orders.empty()) {
auto order = _orders.front();
_orders.pop();
auto status = send(order);
if (status.ok()) {
std::ostringstream ss;
ss << "[Success] Sent order to buy " << order.quantity()
<< " shares of " << order.symbol() << " at $"
<< order.price() << std::endl;
std::cout << ss.str();
} else {
std::cout << "[Error] Failed to send order for " << order.symbol()
<< " - " << status.error_message() << std::endl;
}
}
}
void Algo::start_background_processing() {
// Thread to read from file source and enqueue // Thread to read from file source and enqueue
_reader_thread = std::jthread([this](std::stop_token stoken) { _reader_thread = std::jthread([this](std::stop_token stoken) {
while (!stoken.stop_requested()) { while (!stoken.stop_requested()) {
@ -64,11 +104,11 @@ void Algo::process() {
auto future = _thread_pool.enqueue([this, order]() -> bool { auto future = _thread_pool.enqueue([this, order]() -> bool {
grpc::Status status; grpc::Status status;
// { {
// std::lock_guard lock(_send_mutex); std::lock_guard lock(_send_mutex);
// status = _pub->send_order(order); status = _pub->send_order(order);
// } std:: cout << "WOAH!" << std::endl;
status = _pub->send_order(order); }
if (!status.ok()) { if (!status.ok()) {
std::cerr << "[Error] Failed to send order: " std::cerr << "[Error] Failed to send order: "
@ -92,25 +132,33 @@ void Algo::process() {
if (!ok) std::cerr << "[Late] Order failed for " << order.symbol() << std::endl; if (!ok) std::cerr << "[Late] Order failed for " << order.symbol() << std::endl;
} else { } else {
std::cout << "future got pushed brah" << std::endl; std::cout << "future got pushed brah" << std::endl;
std::scoped_lock lk(_futures_mutex); {
_futures.emplace_back(std::move(future)); std::lock_guard<std::mutex> lk(_futures_mutex);
_futures.emplace_back(std::move(future));
}
} }
} }
} }
}); });
} }
// while(_running) {
// this->generate_orders();
// while (!_orders.empty()) {
// auto order = _orders.front();
// _orders.pop();
// send(order);
// }
// sleep(1);
// }
} }
void Algo::send(trading::Order& order) { void Algo::cleanup_completed_futures() {
_pub->send_order(order); std::lock_guard<std::mutex> lk(_futures_mutex);
auto it = _futures.begin();
while (it != _futures.end()) {
if (it->wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
bool ok = it->get();
if (!ok) {
std::cerr << "[Background] Future failed" << std::endl;
}
it = _futures.erase(it);
} else {
++it;
}
}
}
grpc::Status Algo::send(trading::Order& order) {
return _pub->send_order(order);
} }

View File

@ -1,7 +1,8 @@
#include "Controller.hh" #include "Controller.hh"
// #include <thread>
Controller::Controller(int argc, char* argv[]) Controller::Controller(int argc, char* argv[])
: _algo(new Algo()) { : _algo(new Algo()), _count(0) {
std::cout << "argc: " << argc << std::endl; std::cout << "argc: " << argc << std::endl;
for (uint16_t i = 1; i < argc; i++) { for (uint16_t i = 1; i < argc; i++) {
std::cout << argv[i] << " "; std::cout << argv[i] << " ";
@ -10,5 +11,10 @@ Controller::Controller(int argc, char* argv[])
void Controller::start() { void Controller::start() {
if (!_algo->initialized()) _algo->initialize(); if (!_algo->initialized()) _algo->initialize();
_algo->process(); while(_algo->is_running()) {
_algo->process();
std::this_thread::sleep_for(std::chrono::seconds(3));
_count++;
if (_count > 5) _algo->stop();
}
} }

View File

@ -1,11 +0,0 @@
// C++ Includes
// #include <queue>
// #include <thread>
// #include <functional>
// #include <mutex>
// #include <condition_variable>
// #include "utils/ThreadSafeQueue.hh"
// namespace utils {
// } // End utils namespace