5 Commits
v0.2.2 ... main

Author SHA1 Message Date
fa98283df6 version 2026-01-19 20:03:17 -04:00
d5fe22cc14 bal server route / 2025-11-03 09:36:56 -04:00
83749afddd update version 2025-11-03 08:53:58 -04:00
dd075508b7 fix bal-pusher zmq connection stability 2025-11-03 08:46:13 -04:00
4ac492ba79 fix bal-pusher zmq connection stability 2025-11-03 07:42:09 -04:00
6 changed files with 141 additions and 61 deletions

12
Cargo.lock generated
View File

@@ -124,8 +124,8 @@ dependencies = [
] ]
[[package]] [[package]]
name = "bal-server" name = "bal_server"
version = "0.1.0" version = "0.2.3"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bitcoin", "bitcoin",
@@ -1315,9 +1315,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.81" version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.60" version = "2.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@@ -1,33 +1,33 @@
[package] [package]
name = "bal-server" name = "bal_server"
version = "0.1.0" version = "0.2.3"
edition = "2021" edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
base64 = "0.22.1" base64 = { version = "0.22.1" }
bs58 = "0.4.0" bs58 = { version = "0.4.0" }
bytes = "1.2" bytes = { version = "1.2" }
bitcoin = { version = "0.32.5" } bitcoin = { version = "0.32.5" }
bitcoincore-rpc = "0.19.0" bitcoincore-rpc = { version = "0.19.0" }
bitcoincore-rpc-json = "0.19.0" bitcoincore-rpc-json = { version = "0.19.0" }
byteorder = "1.5.0" byteorder = { version = "1.5.0" }
confy = "0.6.1" confy = { version = "0.6.1" }
chrono = "0.4.40" chrono = { version = "0.4.40" }
env_logger = "0.11.5" env_logger = { version = "0.11.5" }
hex = "0.4.3" hex = { version = "0.4.3" }
hex-conservative = "0.1.1" hex-conservative = { version = "0.1.1" }
hyper = { version = "1.3.1", features = ["http1","server"] } hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] } hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1" http-body-util = { version = "0.1" }
log = "0.4.21" log = { version = "0.4.21" }
openssl = { version = "0.10.74", features = ["vendored"] } openssl = { version = "0.10.74", features = ["vendored"] }
sha2 = "0.10.8" sha2 = { version = "0.10.8" }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116" serde_json = { version = "1.0.116" }
sqlite = "0.34.0" sqlite = { version = "0.34.0" }
regex = "1.10.4" regex = { version = "1.10.4" }
reqwest = { version = "0.12.24", features = ["json","socks"] } reqwest = { version = "0.12.24", features = ["json","socks"] }
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
zmq = "0.10.0" zmq = { version = "0.10.0" }

View File

@@ -79,7 +79,7 @@ To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) s
Once the application is installed and configured, you can start `bal-pusher` by running the following command: Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash ```bash
$ bal-pusher $ bal-pusher [bitcoin|testnet|regtest|]
``` ```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past. This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

View File

@@ -12,5 +12,5 @@ BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain. #your server domain. do not final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life" BAL_SERVER_URL="https://we.bitcoin-after.life"

View File

@@ -11,12 +11,12 @@ use serde::Deserialize;
use serde_json::json; use serde_json::json;
use std::env; use std::env;
use log::{info,warn,error,trace,debug}; use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket}; use zmq::{Context, Socket, DEALER, DONTWAIT};
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use std::collections::HashMap; use std::collections::HashMap;
//use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
//use std::io::Cursor; use std::io::Cursor;
use hex; use hex;
use std::error::Error as StdError; use std::error::Error as StdError;
@@ -27,12 +27,16 @@ use openssl::sign::Signer;
use openssl::sign::Verifier; use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use std::fs; use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD:i64 = 5000000; const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.1"; const VERSION:&str = "0.0.2";
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig { struct MyConfig {
zmq_listener: String, zmq_listener: String,
@@ -312,8 +316,10 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
let client = rClient::new(); let client = rClient::new();
let url = format!("{}/ping",welist_url); let url = format!("{}/ping",welist_url);
debug!("welist url: {}",url);
let chain=bcinfo.chain.to_string().to_lowercase(); let chain=bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash); let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
trace!("message to be sent: {}",message);
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str()); let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
let response = client.post(url) let response = client.post(url)
.header("User-Agent", format!("bal-pusher/{}",VERSION)) .header("User-Agent", format!("bal-pusher/{}",VERSION))
@@ -326,9 +332,13 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
"last_block_hash": bcinfo.best_block_hash, "last_block_hash": bcinfo.best_block_hash,
"signature": sign, "signature": sign,
})) }))
.send().await?; .send().await?;
if !response.status().is_success() {
warn!("Non-success response: {} {}", response.status(), response.status().canonical_reason().unwrap_or(""));
}
let body = &(response.text().await?); let body = &(response.text().await?);
trace!("Body: {}", body); info!("Report to welist({})\tSent: {}", welist_url,body);
}else { }else {
debug!("Not sending stats"); debug!("Not sending stats");
} }
@@ -448,6 +458,74 @@ fn get_default_config()-> MyConfig {
info!("Default configuration file path is: {:#?}", file); info!("Default configuration file path is: {:#?}", file);
confy::load("bal-pusher",None).expect("cant_load") confy::load("bal-pusher",None).expect("cant_load")
} }
fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main] #[tokio::main]
async fn main()-> std::io::Result<()>{ async fn main()-> std::io::Result<()>{
env_logger::init(); env_logger::init();
@@ -488,7 +566,6 @@ async fn main()-> std::io::Result<()>{
info!("Network: {}",arg_network); info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network); let network_params = get_network_params(&cfg,network);
let context = Context::new(); let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
@@ -501,28 +578,28 @@ async fn main()-> std::io::Result<()>{
let _ = main_result(&cfg,network_params).await; let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks.."); info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec(); let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop { loop {
let message = socket.recv_multipart(0).unwrap(); let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone(); let topic = message[0].clone();
let body = message[1].clone(); let body = message[1].clone();
let seq = message[2].clone(); let seq = message[2].clone();
if last_seq >= seq {
continue
}
last_seq = seq; last_seq = seq;
//let mut sequence_str = "Unknown".to_string();
/*if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
sequence_str = sequence.to_string();
}*/
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body)); trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body)); info!("NEW BLOCK: {}", hex::encode(&body));
//let cfg = cfg.clone();
let _ = main_result(&cfg,network_params).await; let _ = main_result(&cfg,network_params).await;
} }
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
} }
} }
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

View File

@@ -26,15 +26,10 @@ use log::{ info, error, trace, debug};
use serde_json; use serde_json;
use chrono::Utc; use chrono::Utc;
#[path = "../db.rs"] use bal_server::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
mod db; use bal_server::xpub::new_address_from_xpub;
use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
const VERSION:&str=env!("CARGO_PKG_VERSION");
#[path = "../xpub.rs"]
mod xpub;
use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.1";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"]; const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig { struct NetConfig {
@@ -112,6 +107,11 @@ async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION))) Ok(Response::new(full(VERSION)))
} }
async fn echo_home(cfg: &MyConfig
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info );
Ok(Response::new(full(cfg.info.clone())))
}
async fn echo_pub_key( async fn echo_pub_key(
cfg: &MyConfig, cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
@@ -454,6 +454,9 @@ async fn echo(
if uri=="/.pub_key.pem" { if uri=="/.pub_key.pem" {
ret = echo_pub_key(cfg).await; ret = echo_pub_key(cfg).await;
} }
if uri=="/"{
ret = echo_home(cfg).await;
}
ret ret
} }