16 Commits

Author SHA1 Message Date
dd075508b7 fix bal-pusher zmq connection stability 2025-11-03 08:46:13 -04:00
4ac492ba79 fix bal-pusher zmq connection stability 2025-11-03 07:42:09 -04:00
ae52b2b4e5 . 2025-11-01 19:02:15 -04:00
7c8ed123aa send times to welist
ED25519 times signing
2025-11-01 18:34:35 -04:00
d937ee9364 send time to welist server 2025-10-31 16:01:15 -04:00
3018e64fb7 release: bal-server-0.2.2 2025-10-31 15:13:09 -04:00
2234bb9147 bugdfix 2025-09-23 13:55:00 -04:00
fe1c4ee2c8 bal-pusher.env 2025-07-19 07:15:53 -04:00
ca10284479 logs about current block 2025-07-19 07:12:18 -04:00
afd21a5f2a bal-pusher readme 2025-07-16 04:50:40 -04:00
d154567aeb release: bal-server-0.2.1 2025-07-16 04:26:02 -04:00
8965a06dbe release: bal-server-0.2.1 2025-07-16 04:25:17 -04:00
134504e870 release: bal-server-0.2.1 2025-07-16 03:58:42 -04:00
66f34cb29f pusher fix 2025-07-16 03:47:28 -04:00
36edfcd073 fixed network db_field 2025-07-15 15:18:37 -04:00
9e89ae884e fixed db field network 2025-07-15 15:12:21 -04:00
10 changed files with 1458 additions and 109 deletions

1071
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
base64 = "0.22.1"
bs58 = "0.4.0" bs58 = "0.4.0"
bytes = "1.2" bytes = "1.2"
bitcoin = { version = "0.32.5" } bitcoin = { version = "0.32.5" }
@@ -20,11 +21,13 @@ hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] } hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1" http-body-util = "0.1"
log = "0.4.21" log = "0.4.21"
openssl = { version = "0.10.74", features = ["vendored"] }
sha2 = "0.10.8" sha2 = "0.10.8"
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116" serde_json = "1.0.116"
sqlite = "0.34.0" sqlite = "0.34.0"
regex = "1.10.4" regex = "1.10.4"
reqwest = { version = "0.12.24", features = ["json","socks"] }
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
zmq = "0.10.0" zmq = "0.10.0"

View File

@@ -1,10 +1,11 @@
# bal-server # bal-server
## Installation ## Installation
```bash ```bash
$ git clone .... $ git clone ....
$ cd bal-server $ cd bal-server
$ openssl genpkey -algorithm ED25519 -out private_key.pem
$ openssl pkey -in private_key.pem -pubout -out public_key.pem
$ cargo build --release $ cargo build --release
$ sudo cp target/release/bal-server /usr/local/bin $ sudo cp target/release/bal-server /usr/local/bin
$ bal-server $ bal-server
@@ -20,6 +21,7 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` | | `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` |
| `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` | | `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` |
| `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` | | `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` |
| `BAL_SERVER_PUB_KEY_PATH` | WillExecutor Ed25519 public key | `public_key.pem` |
| `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - | | `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - |
| `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 | | `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 |
| `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - | | `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - |
@@ -28,3 +30,56 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 | | `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 |
| `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - | | `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - |
| `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 | | `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 |
# bal-pusher
`bal-pusher` is a tool that retrieves Bitcoin transactions from a database and pushes them to the Bitcoin network when their **locktime** exceeds the **median time past** (MTP). It listens for Bitcoin block updates via ZMQ.
## Installation
To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) support enabled. Then, configure your Bitcoin node and `bal-pusher` to push the transactions.
### Prerequisites
1. **Bitcoin with ZMQ Support**:
Ensure that Bitcoin is compiled with ZMQ support. Add the following line to your `bitcoin.conf` file:
```
zmqpubhashblock=tcp://127.0.0.1:28332
```
2. **Install Rust and Cargo**:
If you haven't already installed Rust and Cargo, you can follow the official instructions to do so: [Rust Installation](https://www.rust-lang.org/tools/install).
## Configuration
`bal-pusher` can be configured using environment variables. If no configuration file is provided, a default configuration file will be created.
### Available Configuration Variables
| Variable | Description | Default |
|---------------------------------------|------------------------------------------|----------------------------------------------|
| `BAL_PUSHER_CONFIG_FILE` | Path to the configuration file. If the file does not exist, it will be created. | `$HOME/.config/bal-pusher/default-config.toml` |
| `BAL_PUSHER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, it will be created. | `bal.db` |
| `BAL_PUSHER_ZMQ_LISTENER` | ZMQ listener for Bitcoin updates. | `tcp://127.0.0.1:28332` |
| `BAL_PUSHER_BITCOIN_HOST` | Bitcoin server host for RPC connections. | `http://127.0.0.1` |
| `BAL_PUSHER_BITCOIN_PORT` | Bitcoin RPC server port. | `8332` |
| `BAL_PUSHER_BITCOIN_COOKIE_FILE` | Path to Bitcoin RPC cookie file. | `$HOME/.bitcoin/.cookie` |
| `BAL_PUSHER_BITCOIN_RPC_USER` | Bitcoin RPC username. | - |
| `BAL_PUSHER_BITCOIN_RPC_PASSWORD` | Bitcoin RPC password. | - |
| `BAL_PUSHER_SEND_STATS` | Contact welist to provide times | false |
| `WELIST_SERVER_URL` | welist server url to provide times | https://welist.bitcoin-afer.life |
| `BAL_SERVER_URL` | WillExecutor server url | - |
| `SSL_KEY_PATH` | Ed25519 private key pem file | `private_key.pem` |
## Running `bal-pusher`
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash
$ bal-pusher
```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

16
bal-pusher.env Normal file
View File

@@ -0,0 +1,16 @@
RUST_LOG=info
BAL_PUSHER_DB_FILE=/home/bal/bal.db
BAL_PUSHER_BITCOIN_COOKIE_FILE=/home/bitcoin/.bitcoin/.cookie
BAL_PUSHER_REGTEST_COOKIE_FILE=/home/bitcoin/.bitcoin/regtest/.cookie
BAL_PUSHER_TESTNET_COOKIE_FILE=/home/bitcoin/.bitcoin/testnet3/.cookie
BAL_PUSHER_SIGNET_COOKIE_FILE=/home/bitcoin/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life"

14
bal-pusher.sh Normal file
View File

@@ -0,0 +1,14 @@
RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133"
export SSL_KEY_PATH="$(pwd)/private_key.pem"
cargo run --bin=bal-pusher regtest

View File

@@ -1,10 +1,11 @@
RUST_LOG=trace RUST_LOG=info
BAL_SERVER_DB_FILE=/home/bal/bal.db BAL_SERVER_DB_FILE="/home/bal/bal.db"
BAL_SERVER_INFO="BAL server test willexecutor" BAL_SERVER_INFO="BAL server test willexecutor"
BAL_SERVER_BIND_ADDRESS=127.0.0.1 BAL_SERVER_BIND_ADDRESS=127.0.0.1
BAL_SERVER_BIND_PORT=9133 BAL_SERVER_BIND_PORT=9133
BAL_SERVER_BITCOIN_ADDRESS="your bitcoin to recive payments here" BAL_SERVER_BITCOIN_ADDRESS="your bitcoin or xpub to recive payments here"
BAL_SERVER_BITCOIN_FIXED_FEE=50000 BAL_SERVER_BITCOIN_FIXED_FEE=50000
BAL_SERVER_PUB_KEY_PATH="/home/bal/public_key.pem"
BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4" BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
BAL_SERVER_REGTEST_FEE=5000 BAL_SERVER_REGTEST_FEE=5000

24
bal-server.sh Normal file
View File

@@ -0,0 +1,24 @@
WORKING_DIR=$(pwd)
if [ ! -f "$WORKING_DIR/public_key.pem" ]; then
echo "creating keypairs"
openssl genpkey -algorithm ED25519 -out private_key.pem
openssl pkey -in private_key.pem -pubout -out public_key.pem
fi
export RUST_LOG="trace"
export BAL_SERVER_DB_FILE="$WORKING_DIR/bal.db"
export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000
export BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
export BAL_SERVER_REGTEST_FEE=5000
#export BAL_SERVER_TESTNET_ADDRESS=
#export BAL_SERVER_TESTNET_FEE=100000
#export BAL_SERVER_SIGNET_ADDRESS=
#export BAL_SERVER_SIGNET_FEE=100000
cargo run --bin=bal-server

View File

@@ -8,21 +8,35 @@ use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value}; use sqlite::{Value};
use serde::Serialize; use serde::Serialize;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json;
use std::env; use std::env;
use std::fs::OpenOptions; use log::{info,warn,error,trace,debug};
use std::io::{ Write}; use zmq::{Context, Socket, DEALER, DONTWAIT};
use log::{info,debug,warn,error};
use zmq::{Context, Socket};
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use std::collections::HashMap; use std::collections::HashMap;
//use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
//use std::io::Cursor; use std::io::Cursor;
use hex; use hex;
use std::error::Error as StdError; use std::error::Error as StdError;
const LOCKTIME_THRESHOLD:i64 = 5000000; use reqwest::Client as rClient;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey};
use openssl::sign::Signer;
use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _};
use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.1";
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig { struct MyConfig {
zmq_listener: String, zmq_listener: String,
@@ -33,8 +47,10 @@ struct MyConfig {
testnet: NetworkParams, testnet: NetworkParams,
signet: NetworkParams, signet: NetworkParams,
mainnet: NetworkParams, mainnet: NetworkParams,
send_stats: bool,
url: String,
secret_code: String,
ssl_key_path: String
} }
impl Default for MyConfig { impl Default for MyConfig {
@@ -48,6 +64,10 @@ impl Default for MyConfig {
testnet: get_network_params_default(Network::Testnet), testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet), signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin), mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false,
url: "http://localhost/".to_string(),
secret_code: "xxx".to_string(),
ssl_key_path: "privkey.pem".to_string(),
} }
} }
} }
@@ -73,7 +93,7 @@ fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
fn get_network_params_default(network:Network) -> NetworkParams{ fn get_network_params_default(network:Network) -> NetworkParams{
match network { match network {
Network::Testnet => NetworkParams{ Network::Testnet => NetworkParams{
host: "http://localhost".to_string(), host: "http://i27.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "testnet3/".to_string(), dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(), db_field: "testnet".to_string(),
@@ -82,7 +102,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Signet => NetworkParams{ Network::Signet => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "signet/".to_string(), dir_path: "signet/".to_string(),
db_field: "signet".to_string(), db_field: "signet".to_string(),
@@ -91,7 +111,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Regtest => NetworkParams{ Network::Regtest => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18443, port: 18443,
dir_path: "regtest/".to_string(), dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(), db_field: "regtest".to_string(),
@@ -100,7 +120,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
_ => NetworkParams{ _ => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 8332, port: 8332,
dir_path: "".to_string(), dir_path: "".to_string(),
db_field: "bitcoin".to_string(), db_field: "bitcoin".to_string(),
@@ -117,7 +137,6 @@ fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError
}else{ }else{
match env::var_os("HOME") { match env::var_os("HOME") {
Some(home) => { Some(home) => {
info!("some home {}",home.to_str().unwrap());
match home.to_str(){ match home.to_str(){
Some(home_str) => { Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path); let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
@@ -148,9 +167,15 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
match get_cookie_filename(network){ match get_cookie_filename(network){
Ok(cookie) => { Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) { match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => match client.get_blockchain_info(){ Ok(client) => {
Ok(bcinfo) => Ok((client,bcinfo)), match client.get_blockchain_info(){
Err(err) => Err(err.into()) Ok(bcinfo) => {
Ok((client,bcinfo))
},
Err(err) => {
Err(err.into())
}
}
}, },
Err(err)=>Err(err.into()) Err(err)=>Err(err.into())
@@ -160,7 +185,7 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
} }
} }
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
let url = format!("{}:{}",network.host,&network.port); let url = format!("{}:{}/",network.host,&network.port);
match get_client_from_username(&url,network){ match get_client_from_username(&url,network){
Ok(client) =>{Ok(client)}, Ok(client) =>{Ok(client)},
Err(_) =>{ Err(_) =>{
@@ -173,7 +198,7 @@ fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult
} }
} }
} }
fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> { async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>"); /*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
@@ -200,10 +225,22 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
//} //}
//let average_time = time_sum/11; //let average_time = time_sum/11;
info!("median time: {}",bcinfo.median_time); info!("median time: {}",bcinfo.median_time);
//info!("height time: {}",bcinfo.median_time);
info!("blocks: {}",bcinfo.blocks);
debug!("best block hash: {}",bcinfo.best_block_hash);
let average_time = bcinfo.median_time; let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap(); let db = sqlite::open(&cfg.db_file).unwrap();
let query_tx = db.prepare("SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);").unwrap().into_iter();
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter();
trace!("query_tx: {}",sqlquery);
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD );
trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0);
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter(); //let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs:Vec<String> = Vec::new(); let mut pushed_txs:Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new(); let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
@@ -223,25 +260,26 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
info!("to be pushed: {}: {}",txid, locktime); info!("to be pushed: {}: {}",txid, locktime);
match rpc.send_raw_transaction(tx){ match rpc.send_raw_transaction(tx){
Ok(o) => { Ok(o) => {
let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
.create(true) // Create the file if it doesn't exist .create(true) // Create the file if it doesn't exist
.open("valid_txs")?; .open("valid_txs")?;
let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime); let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime);
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/
info!("tx: {} pusshata PUSHED\n{}",txid,o); info!("tx: {} pusshata PUSHED\n{}",txid,o);
pushed_txs.push(txid.to_string()); pushed_txs.push(txid.to_string());
}, },
Err(err) => { Err(err) => {
let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
.create(true) // Create the file if it doesn't exist .create(true) // Create the file if it doesn't exist
.open("invalid_txs")?; .open("/home/bal/invalid_txs")?;
let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime); let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime);
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/
warn!("Error: {}\n{}",err,txid); warn!("Error: {}\n{}",err,txid);
//store err in invalid_txs //store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string()); invalid_txs.insert(txid.to_string(), err.to_string());
@@ -251,21 +289,70 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
} }
if pushed_txs.len() > 0 { if pushed_txs.len() > 0 {
let _ = db.execute(format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"))); let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
trace!("sqlok: {}",&sql);
let _ = db.execute(&sql);
} }
if invalid_txs.len() > 0 { if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{ for (txid,txerr) in &invalid_txs{
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','"))); //let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let _ = db.execute(format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'")); let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
trace!("sqlerror: {}",&sql);
let _ = db.execute(&sql);
} }
} }
let _ = send_stats_report(cfg, bcinfo).await;
} }
Err(_)=>{ Err(erx)=>{
panic!("impossible to get client") panic!("impossible to get client {}",erx)
} }
} }
Ok(()) Ok(())
} }
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{
if cfg.send_stats {
debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new();
let url = format!("{}/ping",welist_url);
let chain=bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
let response = client.post(url)
.header("User-Agent", format!("bal-pusher/{}",VERSION))
.json(&json!(
{
"url": cfg.url,
"chain": chain,
"height": bcinfo.blocks,
"median_time": bcinfo.median_time,
"last_block_hash": bcinfo.best_block_hash,
"signature": sign,
}))
.send().await?;
let body = &(response.text().await?);
info!("Report to welist({})\tSent: {}", welist_url,body);
}else {
debug!("Not sending stats");
}
Ok(())
}
fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap();
let private_key = PKey::private_key_from_pem(&key_data).unwrap();
let mut signer = Signer::new_without_digest(&private_key).unwrap();
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64
}
fn parse_env(cfg: &mut MyConfig){ fn parse_env(cfg: &mut MyConfig){
match env::var("BAL_PUSHER_ZMQ_LISTENER") { match env::var("BAL_PUSHER_ZMQ_LISTENER") {
@@ -288,6 +375,27 @@ fn parse_env(cfg: &mut MyConfig){
cfg.bitcoin_dir = value;}, cfg.bitcoin_dir = value;},
Err(_) => {}, Err(_) => {},
} }
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg,"regtest"); cfg.regtest = parse_env_netconfig(cfg,"regtest");
cfg.signet = parse_env_netconfig(cfg,"signet"); cfg.signet = parse_env_netconfig(cfg,"signet");
cfg.testnet = parse_env_netconfig(cfg,"testnet"); cfg.testnet = parse_env_netconfig(cfg,"testnet");
@@ -345,7 +453,75 @@ fn get_default_config()-> MyConfig {
confy::load("bal-pusher",None).expect("cant_load") confy::load("bal-pusher",None).expect("cant_load")
} }
fn main(){ fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main]
async fn main()-> std::io::Result<()>{
env_logger::init(); env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") { let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => { Ok(value) => {
@@ -381,41 +557,43 @@ fn main(){
}; };
debug!("Network: {}",arg_network); info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network); let network_params = get_network_params(&cfg,network);
let context = Context::new(); let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone(); let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address);
socket.connect(&zmq_address).unwrap(); socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap(); socket.set_subscribe(b"").unwrap();
let _ = main_result(&cfg,network_params); let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks.."); info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec(); let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop { loop {
let message = socket.recv_multipart(0).unwrap(); let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone(); let topic = message[0].clone();
let body = message[1].clone(); let body = message[1].clone();
let seq = message[2].clone(); let seq = message[2].clone();
if last_seq >= seq {
continue
}
last_seq = seq; last_seq = seq;
//let mut sequence_str = "Unknown".to_string(); debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
/*if seq.len()==4{ trace!("ZMQ:GET BODY: {}", hex::encode(&body));
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
sequence_str = sequence.to_string();
}*/
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK{}", hex::encode(body)); info!("NEW BLOCK: {}", hex::encode(&body));
//let cfg = cfg.clone(); let _ = main_result(&cfg,network_params).await;
let _ = main_result(&cfg,network_params);
} }
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
} }
} }
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

View File

@@ -11,6 +11,7 @@ use std::net::IpAddr;
use std::env; use std::env;
//use std::time::{SystemTime,UNIX_EPOCH}; //use std::time::{SystemTime,UNIX_EPOCH};
use std::fs;
use std::sync::{ Arc, Mutex, MutexGuard }; use std::sync::{ Arc, Mutex, MutexGuard };
//use std::net::SocketAddr; //use std::net::SocketAddr;
use std::collections::HashMap; use std::collections::HashMap;
@@ -33,7 +34,7 @@ use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_
#[path = "../xpub.rs"] #[path = "../xpub.rs"]
mod xpub; mod xpub;
use crate::xpub::new_address_from_xpub; use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.0"; const VERSION:&str="0.2.1";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"]; const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig { struct NetConfig {
@@ -68,6 +69,7 @@ struct MyConfig {
bind_address: String, bind_address: String,
bind_port: u16, // Changed to u16 for port numbers bind_port: u16, // Changed to u16 for port numbers
db_file: String, db_file: String,
pub_key_path: String,
} }
#[derive(Debug,Serialize, Deserialize)] #[derive(Debug,Serialize, Deserialize)]
@@ -90,7 +92,8 @@ impl Default for MyConfig {
bind_address: "127.0.0.1".to_string(), bind_address: "127.0.0.1".to_string(),
bind_port: 9137, bind_port: 9137,
db_file: "bal.db".to_string(), db_file: "bal.db".to_string(),
info:"Will Executor Server".to_string() info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(),
} }
} }
} }
@@ -109,6 +112,13 @@ async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION))) Ok(Response::new(full(VERSION)))
} }
async fn echo_pub_key(
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let pub_key = fs::read_to_string(&cfg.pub_key_path)
.expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key)))
}
async fn echo_info( async fn echo_info(
param: &str, param: &str,
cfg: &MyConfig, cfg: &MyConfig,
@@ -117,13 +127,13 @@ async fn echo_info(
info!("echo info!!!{}",param); info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param); let netconfig=MyConfig::get_net_config(cfg,param);
if !netconfig.enabled { if !netconfig.enabled {
trace!("network disabled"); debug!("network disabled {}",param);
return Ok(Response::new(full("network disabled"))); return Ok(Response::new(full("network disabled")));
} }
let address = match netconfig.xpub{ let address = match netconfig.xpub{
false => { false => {
let address = netconfig.address.to_string(); let address = netconfig.address.to_string();
info!("is address: {}",&address); trace!("is address: {}",&address);
address address
}, },
true => { true => {
@@ -134,8 +144,8 @@ async fn echo_info(
let next = get_next_address_index(&db,&netconfig.name,&netconfig.address); let next = get_next_address_index(&db,&netconfig.name,&netconfig.address);
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap(); let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap();
save_new_address(&db,next.0,&address.0,&address.1,&remote_addr); save_new_address(&db,next.0,&address.0,&address.1,&remote_addr);
debug!("address {} {}",address.0,address.1); debug!("save new address {} {}",address.0,address.1);
debug!("next {} {}",next.0,next.1); trace!("next {} {}",next.0,next.1);
address.0 address.0
} }
} }
@@ -151,7 +161,10 @@ async fn echo_info(
}; };
trace!("address: {:#?}",info); trace!("address: {:#?}",info);
match serde_json::to_string(&info){ match serde_json::to_string(&info){
Ok(json_data) => Ok(Response::new(full(json_data))), Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err)))) Err(err) => Ok(Response::new(full(format!("error:{}",err))))
} }
@@ -369,7 +382,7 @@ async fn echo_push(whole_body: &Bytes,
ptx.push((linenum+3,Value::String(line.to_string()))); ptx.push((linenum+3,Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string()))); ptx.push((linenum+4,Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string()))); ptx.push((linenum+5,Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(param.to_string()))); ptx.push((linenum+6,Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string()))); ptx.push((linenum+7,Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string()))); ptx.push((linenum+8,Value::String(our_fees.to_string())));
linenum += 9; linenum += 9;
@@ -438,6 +451,9 @@ async fn echo(
if uri=="/version"{ if uri=="/version"{
ret= echo_version().await; ret= echo_version().await;
} }
if uri=="/.pub_key.pem" {
ret = echo_pub_key(cfg).await;
}
ret ret
} }
@@ -458,23 +474,35 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed() .boxed()
} }
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){ fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
for (key, value) in std::env::vars() {
debug!("ENVIRONMENT {key}: {value}");
}
let mut cfg_lock = cfg.lock().unwrap(); let mut cfg_lock = cfg.lock().unwrap();
if let Ok(value) = env::var("BAL_SERVER_DB_FILE") { if let Ok(value) = env::var("BAL_SERVER_DB_FILE") {
debug!("BAL_SERVER_DB_FILE: {}",value);
cfg_lock.db_file = value; cfg_lock.db_file = value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") { if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") {
debug!("BAL_SERVER_BIND_ADDRESS: {}",value);
cfg_lock.bind_address= value; cfg_lock.bind_address= value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") { if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") {
debug!("BAL_SERVER_BIND_PORT: {}",value);
if let Ok(v) = value.parse::<u16>(){ if let Ok(v) = value.parse::<u16>(){
cfg_lock.bind_port = v; cfg_lock.bind_port = v;
} }
} }
if let Ok(value) = env::var("BAL_SERVER_INFO"){ if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") {
cfg_lock.info = value; debug!("BAL_SERVER_PUB_KEY_PATH: {}",value);
cfg_lock.pub_key_path = value;
} }
if let Ok(value) = env::var("BAL_SERVER_INFO"){
debug!("BAL_SERVER_INFO: {}",value);
cfg_lock.info = value;
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest"); cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet"); cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet"); cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
@@ -489,6 +517,7 @@ fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a st
&_ => &mut cfg_lock.mainnet, &_ => &mut cfg_lock.mainnet,
}; };
if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value);
cfg.address = value; cfg.address = value;
if cfg.address.len() > 5 { if cfg.address.len() > 5 {
if cfg.address[1..4] == *"pub" { if cfg.address[1..4] == *"pub" {
@@ -499,7 +528,8 @@ fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a st
} }
} }
if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXE_FEE",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value);
if let Ok(v) = value.parse::<u64>(){ if let Ok(v) = value.parse::<u64>(){
cfg.fixed_fee = v; cfg.fixed_fee = v;
} }

View File

@@ -17,6 +17,8 @@ pub fn create_database(db: &Connection){
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)"); let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
} }
/* /*
@@ -129,4 +131,17 @@ pub fn execute_insert(db: &Connection,
Ok(()) Ok(())
} }
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64,Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row)=>{
Ok(stmt.read::<i64,_>("total_number").unwrap())
},
Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err)
}
}