Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
fa98283df6
|
|||
|
d5fe22cc14
|
|||
|
83749afddd
|
|||
|
dd075508b7
|
|||
|
4ac492ba79
|
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -124,8 +124,8 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bal-server"
|
name = "bal_server"
|
||||||
version = "0.1.0"
|
version = "0.2.3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.22.1",
|
"base64 0.22.1",
|
||||||
"bitcoin",
|
"bitcoin",
|
||||||
@@ -1315,9 +1315,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.81"
|
version = "1.0.103"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
|
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
@@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "2.0.60"
|
version = "2.0.108"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
|
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|||||||
56
Cargo.toml
56
Cargo.toml
@@ -1,33 +1,33 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "bal-server"
|
name = "bal_server"
|
||||||
version = "0.1.0"
|
version = "0.2.3"
|
||||||
edition = "2021"
|
edition = "2024"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
[dependencies]
|
[dependencies]
|
||||||
base64 = "0.22.1"
|
base64 = { version = "0.22.1" }
|
||||||
bs58 = "0.4.0"
|
bs58 = { version = "0.4.0" }
|
||||||
bytes = "1.2"
|
bytes = { version = "1.2" }
|
||||||
bitcoin = { version = "0.32.5" }
|
bitcoin = { version = "0.32.5" }
|
||||||
bitcoincore-rpc = "0.19.0"
|
bitcoincore-rpc = { version = "0.19.0" }
|
||||||
bitcoincore-rpc-json = "0.19.0"
|
bitcoincore-rpc-json = { version = "0.19.0" }
|
||||||
byteorder = "1.5.0"
|
byteorder = { version = "1.5.0" }
|
||||||
confy = "0.6.1"
|
confy = { version = "0.6.1" }
|
||||||
chrono = "0.4.40"
|
chrono = { version = "0.4.40" }
|
||||||
env_logger = "0.11.5"
|
env_logger = { version = "0.11.5" }
|
||||||
hex = "0.4.3"
|
hex = { version = "0.4.3" }
|
||||||
hex-conservative = "0.1.1"
|
hex-conservative = { version = "0.1.1" }
|
||||||
hyper = { version = "1.3.1", features = ["http1","server"] }
|
hyper = { version = "1.3.1", features = ["http1","server"] }
|
||||||
hyper-util = { version = "0.1.3", features = ["tokio"] }
|
hyper-util = { version = "0.1.3", features = ["tokio"] }
|
||||||
http-body-util = "0.1"
|
http-body-util = { version = "0.1" }
|
||||||
log = "0.4.21"
|
log = { version = "0.4.21" }
|
||||||
openssl = { version = "0.10.74", features = ["vendored"] }
|
openssl = { version = "0.10.74", features = ["vendored"] }
|
||||||
sha2 = "0.10.8"
|
sha2 = { version = "0.10.8" }
|
||||||
serde = { version = "1.0.152", features = ["derive"] }
|
serde = { version = "1.0.152", features = ["derive"] }
|
||||||
serde_json = "1.0.116"
|
serde_json = { version = "1.0.116" }
|
||||||
sqlite = "0.34.0"
|
sqlite = { version = "0.34.0" }
|
||||||
regex = "1.10.4"
|
regex = { version = "1.10.4" }
|
||||||
reqwest = { version = "0.12.24", features = ["json","socks"] }
|
reqwest = { version = "0.12.24", features = ["json","socks"] }
|
||||||
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
|
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
|
||||||
zmq = "0.10.0"
|
zmq = { version = "0.10.0" }
|
||||||
|
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) s
|
|||||||
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
|
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ bal-pusher
|
$ bal-pusher [bitcoin|testnet|regtest|]
|
||||||
```
|
```
|
||||||
|
|
||||||
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.
|
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.
|
||||||
|
|||||||
@@ -12,5 +12,5 @@ BAL_PUSHER_SEND_STATS=true
|
|||||||
WELIST_SERVER_URL=http://welist.bitcoin-after.life
|
WELIST_SERVER_URL=http://welist.bitcoin-after.life
|
||||||
SSL_KEY_PATH=/home/bal/privkey.pem
|
SSL_KEY_PATH=/home/bal/privkey.pem
|
||||||
|
|
||||||
#your server domain. do not add https or final / only domain.
|
#your server domain. do not final / only domain.
|
||||||
BAL_SERVER_URL="https://we.bitcoin-after.life"
|
BAL_SERVER_URL="https://we.bitcoin-after.life"
|
||||||
|
|||||||
@@ -11,12 +11,12 @@ use serde::Deserialize;
|
|||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::env;
|
use std::env;
|
||||||
use log::{info,warn,error,trace,debug};
|
use log::{info,warn,error,trace,debug};
|
||||||
use zmq::{Context, Socket};
|
use zmq::{Context, Socket, DEALER, DONTWAIT};
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::{thread, time::Duration};
|
use std::{thread, time::Duration};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
//use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
//use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use hex;
|
use hex;
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
|
|
||||||
@@ -27,12 +27,16 @@ use openssl::sign::Signer;
|
|||||||
use openssl::sign::Verifier;
|
use openssl::sign::Verifier;
|
||||||
use base64::{engine::general_purpose, Engine as _};
|
use base64::{engine::general_purpose, Engine as _};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
const LOCKTIME_THRESHOLD:i64 = 5000000;
|
const LOCKTIME_THRESHOLD:i64 = 5000000;
|
||||||
const VERSION:&str = "0.0.1";
|
const VERSION:&str = "0.0.2";
|
||||||
#[derive(Debug, Clone,Serialize, Deserialize)]
|
#[derive(Debug, Clone,Serialize, Deserialize)]
|
||||||
struct MyConfig {
|
struct MyConfig {
|
||||||
zmq_listener: String,
|
zmq_listener: String,
|
||||||
@@ -312,8 +316,10 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
|
|||||||
|
|
||||||
let client = rClient::new();
|
let client = rClient::new();
|
||||||
let url = format!("{}/ping",welist_url);
|
let url = format!("{}/ping",welist_url);
|
||||||
|
debug!("welist url: {}",url);
|
||||||
let chain=bcinfo.chain.to_string().to_lowercase();
|
let chain=bcinfo.chain.to_string().to_lowercase();
|
||||||
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
|
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
|
||||||
|
trace!("message to be sent: {}",message);
|
||||||
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
|
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
|
||||||
let response = client.post(url)
|
let response = client.post(url)
|
||||||
.header("User-Agent", format!("bal-pusher/{}",VERSION))
|
.header("User-Agent", format!("bal-pusher/{}",VERSION))
|
||||||
@@ -326,9 +332,13 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
|
|||||||
"last_block_hash": bcinfo.best_block_hash,
|
"last_block_hash": bcinfo.best_block_hash,
|
||||||
"signature": sign,
|
"signature": sign,
|
||||||
}))
|
}))
|
||||||
.send().await?;
|
.send().await?;
|
||||||
|
if !response.status().is_success() {
|
||||||
|
warn!("Non-success response: {} {}", response.status(), response.status().canonical_reason().unwrap_or(""));
|
||||||
|
}
|
||||||
|
|
||||||
let body = &(response.text().await?);
|
let body = &(response.text().await?);
|
||||||
trace!("Body: {}", body);
|
info!("Report to welist({})\tSent: {}", welist_url,body);
|
||||||
}else {
|
}else {
|
||||||
debug!("Not sending stats");
|
debug!("Not sending stats");
|
||||||
}
|
}
|
||||||
@@ -448,6 +458,74 @@ fn get_default_config()-> MyConfig {
|
|||||||
info!("Default configuration file path is: {:#?}", file);
|
info!("Default configuration file path is: {:#?}", file);
|
||||||
confy::load("bal-pusher",None).expect("cant_load")
|
confy::load("bal-pusher",None).expect("cant_load")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_zmq_connection(endpoint: &str) -> bool {
|
||||||
|
trace!("check zmq connection");
|
||||||
|
let context = Context::new();
|
||||||
|
let socket = match context.socket(DEALER) {
|
||||||
|
Ok(sock) => sock,
|
||||||
|
Err(_) => return false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if socket.connect(endpoint).is_err() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to send an empty message non-blocking
|
||||||
|
socket.send("", DONTWAIT).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add this struct to monitor connection health
|
||||||
|
struct ConnectionMonitor {
|
||||||
|
last_message_time: Instant,
|
||||||
|
timeout: Duration,
|
||||||
|
consecutive_timeouts: u32,
|
||||||
|
max_consecutive_timeouts: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionMonitor {
|
||||||
|
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
last_message_time: Instant::now(),
|
||||||
|
timeout: Duration::from_secs(timeout_secs),
|
||||||
|
consecutive_timeouts: 0,
|
||||||
|
max_consecutive_timeouts: max_timeouts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self) {
|
||||||
|
self.last_message_time = Instant::now();
|
||||||
|
self.consecutive_timeouts = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_connection(&mut self) -> ConnectionStatus {
|
||||||
|
let elapsed = self.last_message_time.elapsed();
|
||||||
|
|
||||||
|
if elapsed > self.timeout {
|
||||||
|
self.consecutive_timeouts += 1;
|
||||||
|
|
||||||
|
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
|
||||||
|
ConnectionStatus::Lost(elapsed)
|
||||||
|
} else {
|
||||||
|
ConnectionStatus::Warning(elapsed)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ConnectionStatus::Healthy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.consecutive_timeouts = 0;
|
||||||
|
self.last_message_time = Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ConnectionStatus {
|
||||||
|
Healthy,
|
||||||
|
Warning(Duration),
|
||||||
|
Lost(Duration),
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main()-> std::io::Result<()>{
|
async fn main()-> std::io::Result<()>{
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
@@ -488,7 +566,6 @@ async fn main()-> std::io::Result<()>{
|
|||||||
info!("Network: {}",arg_network);
|
info!("Network: {}",arg_network);
|
||||||
let network_params = get_network_params(&cfg,network);
|
let network_params = get_network_params(&cfg,network);
|
||||||
|
|
||||||
|
|
||||||
let context = Context::new();
|
let context = Context::new();
|
||||||
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
||||||
|
|
||||||
@@ -501,28 +578,28 @@ async fn main()-> std::io::Result<()>{
|
|||||||
let _ = main_result(&cfg,network_params).await;
|
let _ = main_result(&cfg,network_params).await;
|
||||||
info!("waiting new blocks..");
|
info!("waiting new blocks..");
|
||||||
let mut last_seq:Vec<u8>=[0;4].to_vec();
|
let mut last_seq:Vec<u8>=[0;4].to_vec();
|
||||||
|
let mut counter=0;
|
||||||
|
let max=100;
|
||||||
loop {
|
loop {
|
||||||
let message = socket.recv_multipart(0).unwrap();
|
let message = socket.recv_multipart(0).unwrap();
|
||||||
let topic = message[0].clone();
|
let topic = message[0].clone();
|
||||||
let body = message[1].clone();
|
let body = message[1].clone();
|
||||||
let seq = message[2].clone();
|
let seq = message[2].clone();
|
||||||
if last_seq >= seq {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
last_seq = seq;
|
last_seq = seq;
|
||||||
//let mut sequence_str = "Unknown".to_string();
|
|
||||||
/*if seq.len()==4{
|
|
||||||
let mut rdr = Cursor::new(seq);
|
|
||||||
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
|
||||||
sequence_str = sequence.to_string();
|
|
||||||
}*/
|
|
||||||
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
||||||
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
||||||
if topic == b"hashblock" {
|
if topic == b"hashblock" {
|
||||||
info!("NEW BLOCK: {}", hex::encode(&body));
|
info!("NEW BLOCK: {}", hex::encode(&body));
|
||||||
//let cfg = cfg.clone();
|
|
||||||
let _ = main_result(&cfg,network_params).await;
|
let _ = main_result(&cfg,network_params).await;
|
||||||
}
|
}
|
||||||
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn seq_to_str(seq:&Vec<u8>) -> String{
|
||||||
|
if seq.len()==4{
|
||||||
|
let mut rdr = Cursor::new(seq);
|
||||||
|
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
||||||
|
return sequence.to_string();
|
||||||
|
}
|
||||||
|
"Unknown".to_string()
|
||||||
|
}
|
||||||
|
|||||||
@@ -26,15 +26,10 @@ use log::{ info, error, trace, debug};
|
|||||||
use serde_json;
|
use serde_json;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
|
||||||
#[path = "../db.rs"]
|
use bal_server::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
|
||||||
mod db;
|
use bal_server::xpub::new_address_from_xpub;
|
||||||
use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
|
|
||||||
|
|
||||||
|
const VERSION:&str=env!("CARGO_PKG_VERSION");
|
||||||
#[path = "../xpub.rs"]
|
|
||||||
mod xpub;
|
|
||||||
use crate::xpub::new_address_from_xpub;
|
|
||||||
const VERSION:&str="0.2.1";
|
|
||||||
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
|
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
|
||||||
#[derive(Debug, Clone,Serialize, Deserialize)]
|
#[derive(Debug, Clone,Serialize, Deserialize)]
|
||||||
struct NetConfig {
|
struct NetConfig {
|
||||||
@@ -112,6 +107,11 @@ async fn echo_version(
|
|||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
Ok(Response::new(full(VERSION)))
|
Ok(Response::new(full(VERSION)))
|
||||||
}
|
}
|
||||||
|
async fn echo_home(cfg: &MyConfig
|
||||||
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
|
debug!("echo_home: {}", cfg.info );
|
||||||
|
Ok(Response::new(full(cfg.info.clone())))
|
||||||
|
}
|
||||||
async fn echo_pub_key(
|
async fn echo_pub_key(
|
||||||
cfg: &MyConfig,
|
cfg: &MyConfig,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
@@ -454,6 +454,9 @@ async fn echo(
|
|||||||
if uri=="/.pub_key.pem" {
|
if uri=="/.pub_key.pem" {
|
||||||
ret = echo_pub_key(cfg).await;
|
ret = echo_pub_key(cfg).await;
|
||||||
}
|
}
|
||||||
|
if uri=="/"{
|
||||||
|
ret = echo_home(cfg).await;
|
||||||
|
}
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user