From dcaab2ed6d791a6ef6e119c6101e3c1135cb9543 Mon Sep 17 00:00:00 2001 From: svatantrya Date: Wed, 21 Jan 2026 00:08:47 -0400 Subject: [PATCH] expose stats --- bal-pusher.sh | 4 +- bal-server.sh | 1 + src/bin/bal-pusher.rs | 681 ++++++++++++++++++++++-------------------- src/bin/bal-server.rs | 579 ++++++++++++++++++++--------------- src/db.rs | 160 +++++----- src/xpub.rs | 56 ++-- 6 files changed, 814 insertions(+), 667 deletions(-) diff --git a/bal-pusher.sh b/bal-pusher.sh index fb90a88..b1169ca 100644 --- a/bal-pusher.sh +++ b/bal-pusher.sh @@ -1,12 +1,12 @@ RUST_LOG=trace -BAL_PUSHER_DB_FILE="$(pwd)/bal.db" +export BAL_PUSHER_DB_FILE="$(pwd)/bal.db" #export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie #export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie #export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie #export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie -BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332 +export BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332 export BAL_PUSHER_SEND_STATS=true export WELIST_SERVER_URL=http://localhost:8085 export BAL_SERVER_URL="http://127.0.0.1:9133" diff --git a/bal-server.sh b/bal-server.sh index 3342820..bf535cb 100644 --- a/bal-server.sh +++ b/bal-server.sh @@ -11,6 +11,7 @@ export BAL_SERVER_INFO="BAL devel willexecutor server" export BAL_SERVER_BIND_ADDRESS="127.0.0.1" export BAL_SERVER_BIND_PORT=9133 export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem" +export BAL_SERVER_EXPOSE_STATS=true; #export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here" #export BAL_SERVER_BITCOIN_FIXED_FEE=50000 diff --git a/src/bin/bal-pusher.rs b/src/bin/bal-pusher.rs index f0b545b..65edd14 100644 --- a/src/bin/bal-pusher.rs +++ b/src/bin/bal-pusher.rs @@ -2,212 +2,197 @@ extern crate bitcoincore_rpc; extern crate zmq; use bitcoin::Network; -use bitcoincore_rpc::{bitcoin, Auth, Client, Error, RpcApi}; +use bitcoincore_rpc::{Auth, Client, Error, RpcApi, bitcoin}; use bitcoincore_rpc_json::GetBlockchainInfoResult; -use sqlite::{Value}; -use serde::Serialize; +use byteorder::{LittleEndian, ReadBytesExt}; +use hex; +use log::{debug, error, info, trace, warn}; use serde::Deserialize; +use serde::Serialize; use serde_json::json; +use sqlite::{Value, Connection}; +use std::collections::HashMap; use std::env; -use log::{info,warn,error,trace,debug}; -use zmq::{Context, Socket, DEALER, DONTWAIT}; +use std::error::Error as StdError; +use std::io::Cursor; use std::str; use std::{thread, time::Duration}; -use std::collections::HashMap; -use byteorder::{LittleEndian, ReadBytesExt}; -use std::io::Cursor; -use hex; -use std::error::Error as StdError; +use zmq::{Context, DEALER, DONTWAIT, Socket}; -use reqwest::Client as rClient; +use base64::{Engine as _, engine::general_purpose}; use openssl::hash::MessageDigest; -use openssl::pkey::{PKey}; +use openssl::pkey::PKey; use openssl::sign::Signer; use openssl::sign::Verifier; -use base64::{engine::general_purpose, Engine as _}; +use reqwest::Client as rClient; use std::fs; use std::time::Instant; - - - - - - -const LOCKTIME_THRESHOLD:i64 = 5000000; -const VERSION:&str = "0.0.2"; -#[derive(Debug, Clone,Serialize, Deserialize)] +const LOCKTIME_THRESHOLD: i64 = 5000000; +const VERSION: &str = "0.0.2"; +#[derive(Debug, Clone, Serialize, Deserialize)] struct MyConfig { - zmq_listener: String, - requests_file: String, - db_file: String, - bitcoin_dir: String, - regtest: NetworkParams, - testnet: NetworkParams, - signet: NetworkParams, - mainnet: NetworkParams, - send_stats: bool, - url: String, - secret_code: String, - ssl_key_path: String + zmq_listener: String, + db_file: String, + bitcoin_dir: String, + regtest: NetworkParams, + testnet: NetworkParams, + signet: NetworkParams, + mainnet: NetworkParams, + send_stats: bool, + url: String, + ssl_key_path: String, } impl Default for MyConfig { fn default() -> Self { MyConfig { - zmq_listener: "tcp://127.0.0.1:28332".to_string(), - requests_file: "rawrequests.log".to_string(), - db_file: "../bal.db".to_string(), - bitcoin_dir: "".to_string(), - regtest: get_network_params_default(Network::Regtest), - testnet: get_network_params_default(Network::Testnet), - signet: get_network_params_default(Network::Signet), - mainnet: get_network_params_default(Network::Bitcoin), - send_stats: false, - url: "http://localhost/".to_string(), - secret_code: "xxx".to_string(), - ssl_key_path: "privkey.pem".to_string(), + zmq_listener: env::var("BAL_PUSHER_ZMQ_LISTENER").unwrap_or("tcp://127.0.0.1:28332".to_string()), + db_file: env::var("BAL_PUSHER_DB_FILE").unwrap_or("bal.db".to_string()), + bitcoin_dir: env::var("BAL_PUSHER_BITCOIN_DIR").unwrap_or("".to_string()), + regtest: get_network_params_default(Network::Regtest), + testnet: get_network_params_default(Network::Testnet), + signet: get_network_params_default(Network::Signet), + mainnet: get_network_params_default(Network::Bitcoin), + send_stats: env::var("BAL_PUSHER_SEND_STATS").unwrap_or("false".to_string()).parse::().unwrap(), + url: env::var("BAL_SERVER_URL").unwrap_or("http://localhost/".to_string()), + ssl_key_path: env::var("SSL_KEY_PATH").unwrap_or("privkey.pem".to_string()), } } } #[derive(Debug, Clone, Serialize, Deserialize)] struct NetworkParams { - host: String, - port: u16, - dir_path: String, - db_field: String, - cookie_file: String, - rpc_user: String, - rpc_pass: String, + host: String, + port: u16, + dir_path: String, + db_field: String, + cookie_file: String, + rpc_user: String, + rpc_pass: String, } -fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{ - match network{ +fn get_network_params(cfg: &MyConfig, network: Network) -> &NetworkParams { + match network { Network::Testnet => &cfg.testnet, Network::Signet => &cfg.signet, Network::Regtest => &cfg.regtest, - _ => &cfg.mainnet + _ => &cfg.mainnet, } } -fn get_network_params_default(network:Network) -> NetworkParams{ +fn get_network_params_default(network: Network) -> NetworkParams { match network { - Network::Testnet => NetworkParams{ - host: "http://i27.0.0.1".to_string(), - port: 18332, - dir_path: "testnet3/".to_string(), - db_field: "testnet".to_string(), - cookie_file: "".to_string(), - rpc_user: "".to_string(), - rpc_pass: "".to_string(), + Network::Testnet => NetworkParams { + host: "http://i27.0.0.1".to_string(), + port: 18332, + dir_path: "testnet3/".to_string(), + db_field: "testnet".to_string(), + cookie_file: "".to_string(), + rpc_user: "".to_string(), + rpc_pass: "".to_string(), }, - Network::Signet => NetworkParams{ - host: "http://127.0.0.1".to_string(), - port: 18332, - dir_path: "signet/".to_string(), - db_field: "signet".to_string(), - cookie_file: "".to_string(), - rpc_user: "".to_string(), - rpc_pass: "".to_string(), + Network::Signet => NetworkParams { + host: "http://127.0.0.1".to_string(), + port: 18332, + dir_path: "signet/".to_string(), + db_field: "signet".to_string(), + cookie_file: "".to_string(), + rpc_user: "".to_string(), + rpc_pass: "".to_string(), }, - Network::Regtest => NetworkParams{ - host: "http://127.0.0.1".to_string(), - port: 18443, - dir_path: "regtest/".to_string(), - db_field: "regtest".to_string(), - cookie_file: "".to_string(), - rpc_user: "".to_string(), - rpc_pass: "".to_string(), + Network::Regtest => NetworkParams { + host: "http://127.0.0.1".to_string(), + port: 18443, + dir_path: "regtest/".to_string(), + db_field: "regtest".to_string(), + cookie_file: "".to_string(), + rpc_user: "".to_string(), + rpc_pass: "".to_string(), }, - _ => NetworkParams{ - host: "http://127.0.0.1".to_string(), - port: 8332, - dir_path: "".to_string(), - db_field: "bitcoin".to_string(), - cookie_file: "".to_string(), - rpc_user: "".to_string(), - rpc_pass: "".to_string(), + _ => NetworkParams { + host: "http://127.0.0.1".to_string(), + port: 8332, + dir_path: "".to_string(), + db_field: "bitcoin".to_string(), + cookie_file: "".to_string(), + rpc_user: "".to_string(), + rpc_pass: "".to_string(), }, } } -fn get_cookie_filename(network: &NetworkParams) ->Result>{ - if network.cookie_file !=""{ +fn get_cookie_filename(network: &NetworkParams) -> Result> { + if network.cookie_file != "" { Ok(network.cookie_file.clone()) - }else{ + } else { match env::var_os("HOME") { - Some(home) => { - match home.to_str(){ - Some(home_str) => { - let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path); - - Ok(cookie_file_path) - }, - None => Err("wrong HOME value".into()) + Some(home) => match home.to_str() { + Some(home_str) => { + let cookie_file_path = + format!("{}/.bitcoin/{}.cookie", home_str, network.dir_path); + + Ok(cookie_file_path) } + None => Err("wrong HOME value".into()), }, - None => Err("Please Set HOME environment variable".into()) + None => Err("Please Set HOME environment variable".into()), } } } -fn get_client_from_username(url: &String, network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box>{ +fn get_client_from_username( + url: &String, + network: &NetworkParams, +) -> Result<(Client, GetBlockchainInfoResult), Box> { if network.rpc_user != "" { - match Client::new(&url[..],Auth::UserPass(network.rpc_user.to_string(),network.rpc_pass.to_string())){ - Ok(client) => match client.get_blockchain_info(){ - Ok(bcinfo) => Ok((client,bcinfo)), - Err(err) => Err(err.into()) - } - Err(err)=>Err(err.into()) + match Client::new( + &url[..], + Auth::UserPass(network.rpc_user.to_string(), network.rpc_pass.to_string()), + ) { + Ok(client) => match client.get_blockchain_info() { + Ok(bcinfo) => Ok((client, bcinfo)), + Err(err) => Err(err.into()), + }, + Err(err) => Err(err.into()), } - }else{ + } else { Err("Failed".into()) } } -fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,GetBlockchainInfoResult),Box>{ - match get_cookie_filename(network){ - Ok(cookie) => { - match Client::new(&url[..], Auth::CookieFile(cookie.into())) { - Ok(client) => { - match client.get_blockchain_info(){ - Ok(bcinfo) => { - Ok((client,bcinfo)) - }, - Err(err) => { - Err(err.into()) - } - } - }, - Err(err)=>Err(err.into()) - - } +fn get_client_from_cookie( + url: &String, + network: &NetworkParams, +) -> Result<(Client, GetBlockchainInfoResult), Box> { + match get_cookie_filename(network) { + Ok(cookie) => match Client::new(&url[..], Auth::CookieFile(cookie.into())) { + Ok(client) => match client.get_blockchain_info() { + Ok(bcinfo) => Ok((client, bcinfo)), + Err(err) => Err(err.into()), + }, + Err(err) => Err(err.into()), }, - Err(err)=>Err(err.into()) + Err(err) => Err(err.into()), } } -fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box>{ - let url = format!("{}:{}/",network.host,&network.port); - match get_client_from_username(&url,network){ - Ok(client) =>{Ok(client)}, - Err(_) =>{ - match get_client_from_cookie(&url,&network){ - Ok(client)=>{ - Ok(client) - }, - Err(err)=> Err(err.into()) - } - } +fn get_client( + network: &NetworkParams, +) -> Result<(Client, GetBlockchainInfoResult), Box> { + let url = format!("{}:{}/", network.host, &network.port); + match get_client_from_username(&url, network) { + Ok(client) => Ok(client), + Err(_) => match get_client_from_cookie(&url, &network) { + Ok(client) => Ok(client), + Err(err) => Err(err.into()), + }, } } async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> { - - /*let url = args.next().expect("Usage: "); let user = args.next().expect("no user given"); let pass = args.next().expect("no pass given"); */ //let network = Network::Regtest - match get_client(network_params){ - Ok((rpc,bcinfo)) => { + match get_client(network_params) { + Ok((rpc, bcinfo)) => { info!("connected"); //let best_block_hash = rpc.get_best_block_hash()?; //info!("best block hash: {}", best_block_hash); @@ -224,41 +209,44 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<( // time_sum += >::into(block.header.time); //} //let average_time = time_sum/11; - info!("median time: {}",bcinfo.median_time); + info!("median time: {}", bcinfo.median_time); //info!("height time: {}",bcinfo.median_time); - info!("blocks: {}",bcinfo.blocks); - debug!("best block hash: {}",bcinfo.best_block_hash); + info!("blocks: {}", bcinfo.blocks); + debug!("best block hash: {}", bcinfo.best_block_hash); let average_time = bcinfo.median_time; let db = sqlite::open(&cfg.db_file).unwrap(); - + info!("db open {}",&cfg.db_file); let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);"; let query_tx = db.prepare(sqlquery).unwrap().into_iter(); - trace!("query_tx: {}",sqlquery); - trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD ); + trace!("query_tx: {}", sqlquery); + trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD); trace!(":bestblock_time: {}", average_time); trace!(":bestblock_height: {}", bcinfo.blocks); trace!(":network: {}", network_params.db_field.clone()); trace!(":status: {}", 0); //let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter(); - let mut pushed_txs:Vec = Vec::new(); + let mut pushed_txs: Vec = Vec::new(); let mut invalid_txs: std::collections::HashMap = HashMap::new(); - for row in query_tx.bind::<&[(_, Value)]>(&[ - (":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()), - (":bestblock_time", (average_time as i64).into()), - (":bestblock_height", (bcinfo.blocks as i64).into()), - (":network", network_params.db_field.clone().into()), - (":status", 0.into()), - ][..]) - .unwrap() - .map(|row| row.unwrap()) + for row in query_tx + .bind::<&[(_, Value)]>( + &[ + (":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()), + (":bestblock_time", (average_time as i64).into()), + (":bestblock_height", (bcinfo.blocks as i64).into()), + (":network", network_params.db_field.clone().into()), + (":status", 0.into()), + ][..], + ) + .unwrap() + .map(|row| row.unwrap()) { let tx = row.read::<&str, _>("tx"); let txid = row.read::<&str, _>("txid"); - let locktime = row.read::("locktime"); - info!("to be pushed: {}: {}",txid, locktime); - match rpc.send_raw_transaction(tx){ + let locktime = row.read::("locktime"); + info!("to be pushed: {}: {}", txid, locktime); + match rpc.send_raw_transaction(tx) { Ok(o) => { /*let mut file = OpenOptions::new() .append(true) // Set the append option @@ -268,9 +256,9 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<( file.write_all(data.as_bytes())?; drop(file); */ - info!("tx: {} pusshata PUSHED\n{}",txid,o); + info!("tx: {} pusshata PUSHED\n{}", txid, o); pushed_txs.push(txid.to_string()); - }, + } Err(err) => { /*let mut file = OpenOptions::new() .append(true) // Set the append option @@ -280,184 +268,233 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<( file.write_all(data.as_bytes())?; drop(file); */ - warn!("Error: {}\n{}",err,txid); + warn!("Error: {}\n{}", err, txid); //store err in invalid_txs invalid_txs.insert(txid.to_string(), err.to_string()); - - }, + } }; } - + if pushed_txs.len() > 0 { - let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','")); - trace!("sqlok: {}",&sql); + let sql = format!( + "UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');", + pushed_txs.join("','") + ); + trace!("sqlok: {}", &sql); let _ = db.execute(&sql); } if invalid_txs.len() > 0 { - for (txid,txerr) in &invalid_txs{ + for (txid, txerr) in &invalid_txs { //let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','"))); - let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'"); - trace!("sqlerror: {}",&sql); + let sql = format!( + "UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'" + ); + trace!("sqlerror: {}", &sql); let _ = db.execute(&sql); } } let _ = send_stats_report(cfg, bcinfo).await; + let _ = calculate_stats(&db,network_params.db_field.clone()).await; } - Err(erx)=>{ - panic!("impossible to get client {}",erx) + Err(erx) => { + panic!("impossible to get client {}", erx) } } Ok(()) } -async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{ +async fn calculate_stats(db: &Connection,chain: String) -> Result<(), reqwest::Error> { + //let sql = "drop table if exists tbl_stats;"; + let sql = "DELETE FROM tbl_stats WHERE chain = '{chain}';"; + if let Err(err) = db.execute(&sql){ + error!("error deleting from tbl_stats where chain:{chain} error: {err}"); + } + let sql = format!("INSERT INTO tbl_stats ( + report_date, chain, totals, waiting, sent, failed, + waiting_profit, sent_profit, missed_profit, unique_inputs +) +VALUES ( + CURRENT_TIMESTAMP, + '{chain}', + (SELECT COUNT(*) FROM tbl_tx WHERE network = '{chain}'), + (SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network = '{chain}'), + (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network = '{chain}'), + (SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network = '{chain}'), + (SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 0 AND network = '{chain}'), + (SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 1 AND network = '{chain}'), + (SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 2 AND network = '{chain}'), + (SELECT COUNT(DISTINCT tbl_inp.in_txid) -- or appropriate input identifier + FROM tbl_inp + JOIN tbl_tx ON tbl_inp.txid = tbl_tx.txid + WHERE tbl_tx.status = 0 AND tbl_tx.network = '{chain}') +) +ON CONFLICT(chain) DO UPDATE SET + report_date = excluded.report_date, + totals = excluded.totals, + waiting = excluded.waiting, + sent = excluded.sent, + failed = excluded.failed, + waiting_profit = excluded.waiting_profit, + sent_profit = excluded.sent_profit, + missed_profit = excluded.missed_profit, + unique_inputs = excluded.unique_inputs; + "); + + /* + let sql = format!("CREATE TABLE tbl_stats AS + SELECT + CURRENT_TIMESTAMP AS report_date, + '{chain}' as chain, + (SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}') AS totals, + (SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting, + (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent, + (SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS failed, + (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting_profit, + (SELECT SUM(our_fees) OR 0 FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent_profit, + (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS missed_profit, + (SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}') AS unique_inputs; + "); + let sql = "UPDATE tbl_stats set + totals = (SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}'), + waiting = (SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}'), + sent = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'), + failed = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'), + waiting_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'), + sent_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'), + missed_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}') + unique_inputs = (SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}') + WHERE chain = '{chain}' + */ + if let Err(err) = db.execute(&sql){ + error!("error inserting creating stats table {err}"); + } + else{ + info!("tbl_stats creation success"); + } + Ok(()) +} +async fn send_stats_report( + cfg: &MyConfig, + bcinfo: GetBlockchainInfoResult, +) -> Result<(), reqwest::Error> { if cfg.send_stats { debug!("sending report to welist"); - let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string()); - + let welist_url = env::var("WELIST_SERVER_URL") + .unwrap_or("https://welist.bitcoin-after.life".to_string()); + let client = rClient::new(); - let url = format!("{}/ping",welist_url); - debug!("welist url: {}",url); - let chain=bcinfo.chain.to_string().to_lowercase(); - let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash); - trace!("message to be sent: {}",message); - let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str()); - let response = client.post(url) - .header("User-Agent", format!("bal-pusher/{}",VERSION)) + let url = format!("{}/ping", welist_url); + debug!("welist url: {}", url); + let chain = bcinfo.chain.to_string().to_lowercase(); + let message = format!( + "{0}{1}{2}{3}{4}", + cfg.url, chain, bcinfo.blocks, bcinfo.median_time, bcinfo.best_block_hash + ); + trace!("message to be sent: {}", message); + let sign = sign_message(cfg.ssl_key_path.as_str(), &message.as_str()); + let response = client + .post(url) + .header("User-Agent", format!("bal-pusher/{}", VERSION)) .json(&json!( - { - "url": cfg.url, - "chain": chain, - "height": bcinfo.blocks, - "median_time": bcinfo.median_time, - "last_block_hash": bcinfo.best_block_hash, - "signature": sign, - })) - .send().await?; + { + "url": cfg.url, + "chain": chain, + "height": bcinfo.blocks, + "median_time": bcinfo.median_time, + "last_block_hash": bcinfo.best_block_hash, + "signature": sign, + })) + .send() + .await?; if !response.status().is_success() { - warn!("Non-success response: {} {}", response.status(), response.status().canonical_reason().unwrap_or("")); + warn!( + "Non-success response: {} {}", + response.status(), + response.status().canonical_reason().unwrap_or("") + ); } let body = &(response.text().await?); - info!("Report to welist({})\tSent: {}", welist_url,body); - }else { + info!("Report to welist({})\tSent: {}", welist_url, body); + } else { debug!("Not sending stats"); } Ok(()) - - } fn sign_message(private_key_path: &str, message: &str) -> String { - let key_data = fs::read(private_key_path).unwrap(); + let key_data = fs::read(private_key_path).unwrap(); let private_key = PKey::private_key_from_pem(&key_data).unwrap(); - let mut signer = Signer::new_without_digest(&private_key).unwrap(); + let mut signer = Signer::new_without_digest(&private_key).unwrap(); let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap(); - let signature_b64 = general_purpose::STANDARD.encode(&signature); signature_b64 } -fn parse_env(cfg: &mut MyConfig){ - match env::var("BAL_PUSHER_ZMQ_LISTENER") { - Ok(value) => { - cfg.zmq_listener = value;}, - Err(_) => {}, - } - match env::var("BAL_PUSHER_REQUEST_FILE") { - Ok(value) => { - cfg.requests_file = value;}, - Err(_) => {}, - } - match env::var("BAL_PUSHER_DB_FILE") { - Ok(value) => { - cfg.db_file = value;}, - Err(_) => {}, - } - match env::var("BAL_PUSHER_BITCOIN_DIR") { - Ok(value) => { - cfg.bitcoin_dir = value;}, - Err(_) => {}, - } - match env::var("BAL_PUSHER_SEND_STATS") { - Ok(value) => { - cfg.send_stats = value.parse::().unwrap(); - }, - Err(_) => {}, - } - match env::var("BAL_SERVER_URL") { - Ok(value) => { - cfg.url= value;}, - Err(_) => {}, - } - match env::var("WELIST_SECRET_CODE") { - Ok(value) => { - cfg.secret_code = value;}, - Err(_) => {}, - } - match env::var("SSL_KEY_PATH") { - Ok(value) => { - cfg.ssl_key_path = value;}, - Err(_) => {}, - } - cfg.regtest = parse_env_netconfig(cfg,"regtest"); - cfg.signet = parse_env_netconfig(cfg,"signet"); - cfg.testnet = parse_env_netconfig(cfg,"testnet"); - drop(parse_env_netconfig(cfg,"bitcoin")); - +fn parse_env(cfg: &mut MyConfig) { + cfg.regtest = parse_env_netconfig(cfg, "regtest"); + cfg.signet = parse_env_netconfig(cfg, "signet"); + cfg.testnet = parse_env_netconfig(cfg, "testnet"); + drop(parse_env_netconfig(cfg, "bitcoin")); } -fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams{ -//fn parse_env_netconfig(cfg_lock: &MutexGuard, chain: &str) -> &NetworkParams{ - let cfg = match chain{ +fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams { + //fn parse_env_netconfig(cfg_lock: &MutexGuard, chain: &str) -> &NetworkParams{ + let cfg = match chain { "regtest" => &mut cfg_lock.regtest, "signet" => &mut cfg_lock.signet, "testnet" => &mut cfg_lock.testnet, &_ => &mut cfg_lock.mainnet, }; - match env::var(format!("BAL_PUSHER_{}_HOST",chain.to_uppercase())) { - Ok(value) => { cfg.host= value; }, - Err(_) => {}, - } - match env::var(format!("BAL_PUSHER_{}_PORT",chain.to_uppercase())) { + match env::var(format!("BAL_PUSHER_{}_HOST", chain.to_uppercase())) { Ok(value) => { - match value.parse::(){ - Ok(value) =>{ cfg.port = value.try_into().unwrap(); }, - Err(_) => {}, - } + cfg.host = value; } - Err(_) => {}, + Err(_) => {} } - match env::var(format!("BAL_PUSHER_{}_DIR_PATH",chain.to_uppercase())) { - Ok(value) => { cfg.dir_path = value; }, - Err(_) => {}, + match env::var(format!("BAL_PUSHER_{}_PORT", chain.to_uppercase())) { + Ok(value) => match value.parse::() { + Ok(value) => { + cfg.port = value.try_into().unwrap(); + } + Err(_) => {} + }, + Err(_) => {} } - match env::var(format!("BAL_PUSHER_{}_DB_FIELD",chain.to_uppercase())) { - Ok(value) => { cfg.db_field = value; }, - Err(_) => {}, + match env::var(format!("BAL_PUSHER_{}_DIR_PATH", chain.to_uppercase())) { + Ok(value) => { + cfg.dir_path = value; + } + Err(_) => {} } - match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE",chain.to_uppercase())) { - Ok(value) => { - cfg.cookie_file = value; }, - Err(_) => {}, + match env::var(format!("BAL_PUSHER_{}_DB_FIELD", chain.to_uppercase())) { + Ok(value) => { + cfg.db_field = value; + } + Err(_) => {} } - match env::var(format!("BAL_PUSHER_{}_RPC_USER",chain.to_uppercase())) { - Ok(value) => { cfg.rpc_user = value; }, - Err(_) => {}, + match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE", chain.to_uppercase())) { + Ok(value) => { + cfg.cookie_file = value; + } + Err(_) => {} } - match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD",chain.to_uppercase())) { - Ok(value) => { cfg.rpc_pass = value; }, - Err(_) => {}, + match env::var(format!("BAL_PUSHER_{}_RPC_USER", chain.to_uppercase())) { + Ok(value) => { + cfg.rpc_user = value; + } + Err(_) => {} + } + match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD", chain.to_uppercase())) { + Ok(value) => { + cfg.rpc_pass = value; + } + Err(_) => {} } cfg.clone() } -fn get_default_config()-> MyConfig { - let file = confy::get_configuration_file_path("bal-pusher",None).expect("Error while getting path"); - info!("Default configuration file path is: {:#?}", file); - confy::load("bal-pusher",None).expect("cant_load") -} fn check_zmq_connection(endpoint: &str) -> bool { trace!("check zmq connection"); @@ -466,11 +503,11 @@ fn check_zmq_connection(endpoint: &str) -> bool { Ok(sock) => sock, Err(_) => return false, }; - + if socket.connect(endpoint).is_err() { return false; } - + // Try to send an empty message non-blocking socket.send("", DONTWAIT).is_ok() } @@ -492,18 +529,18 @@ impl ConnectionMonitor { max_consecutive_timeouts: max_timeouts, } } - + fn update(&mut self) { self.last_message_time = Instant::now(); self.consecutive_timeouts = 0; } - + fn check_connection(&mut self) -> ConnectionStatus { let elapsed = self.last_message_time.elapsed(); - + if elapsed > self.timeout { self.consecutive_timeouts += 1; - + if self.consecutive_timeouts >= self.max_consecutive_timeouts { ConnectionStatus::Lost(elapsed) } else { @@ -513,7 +550,7 @@ impl ConnectionMonitor { ConnectionStatus::Healthy } } - + fn reset(&mut self) { self.consecutive_timeouts = 0; self.last_message_time = Instant::now(); @@ -527,78 +564,66 @@ enum ConnectionStatus { } #[tokio::main] -async fn main()-> std::io::Result<()>{ +async fn main() -> std::io::Result<()> { env_logger::init(); - let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") { - Ok(value) => { - match confy::load_path(&value){ - Ok(val) => { - info!("The configuration file path is: {:#?}", value); - val - }, - Err(err) => { - error!("{}",err); - get_default_config() - } - } - }, - Err(_) => { - get_default_config() - }, - }; + let mut cfg = MyConfig::default(); + let dbfile = env::var("BAL_PUSHER_DB_FILE").unwrap(); parse_env(&mut cfg); let mut args = std::env::args(); let _exe_name = args.next().unwrap(); - let arg_network = match args.next(){ + let arg_network = match args.next() { Some(nargs) => nargs, - None => "bitcoin".to_string() + None => "bitcoin".to_string(), }; - let network = match arg_network.as_str(){ - + let network = match arg_network.as_str() { "testnet" => Network::Testnet, "signet" => Network::Signet, "regtest" => Network::Regtest, _ => Network::Bitcoin, }; - - info!("Network: {}",arg_network); - let network_params = get_network_params(&cfg,network); + info!("Network: {}", arg_network); + let network_params = get_network_params(&cfg, network); let context = Context::new(); let socket: Socket = context.socket(zmq::SUB).unwrap(); let zmq_address = cfg.zmq_listener.clone(); - info!("zmq listening on: {}",zmq_address); + info!("zmq listening on: {}", zmq_address); socket.connect(&zmq_address).unwrap(); - socket.set_subscribe(b"").unwrap(); + socket.set_subscribe(b"").unwrap(); - let _ = main_result(&cfg,network_params).await; + let _ = main_result(&cfg, network_params).await; info!("waiting new blocks.."); - let mut last_seq:Vec=[0;4].to_vec(); - let mut counter=0; - let max=100; + let mut last_seq: Vec = [0; 4].to_vec(); + let mut counter = 0; + let max = 100; loop { let message = socket.recv_multipart(0).unwrap(); let topic = message[0].clone(); let body = message[1].clone(); let seq = message[2].clone(); last_seq = seq; - debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); + debug!( + "ZMQ:GET TOPIC: {}", + String::from_utf8(topic.clone()).expect("invalid topic") + ); trace!("ZMQ:GET BODY: {}", hex::encode(&body)); if topic == b"hashblock" { - info!("NEW BLOCK: {}", hex::encode(&body)); - let _ = main_result(&cfg,network_params).await; + info!("NEW BLOCK: {}", hex::encode(&body)); + let _ = main_result(&cfg, network_params).await; } thread::sleep(Duration::from_millis(100)); // Sleep for 100ms } } -fn seq_to_str(seq:&Vec) -> String{ - if seq.len()==4{ +fn seq_to_str(seq: &Vec) -> String { + if seq.len() == 4 { let mut rdr = Cursor::new(seq); - let sequence = rdr.read_u32::().expect("Failed to read integer"); + let sequence = rdr + .read_u32::() + .expect("Failed to read integer"); return sequence.to_string(); } "Unknown".to_string() diff --git a/src/bin/bal-server.rs b/src/bin/bal-server.rs index bce152f..fe6c053 100644 --- a/src/bin/bal-server.rs +++ b/src/bin/bal-server.rs @@ -1,37 +1,39 @@ use bytes::Bytes; -use http_body_util::{ combinators::BoxBody, BodyExt, Empty, Full }; +use http_body_util::{BodyExt, Empty, Full, combinators::BoxBody}; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{ Method, Request, Response, StatusCode }; -use tokio::net::TcpListener; +use hyper::{Method, Request, Response, StatusCode}; use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; - -use std::net::IpAddr; use std::env; +use std::net::IpAddr; //use std::time::{SystemTime,UNIX_EPOCH}; use std::fs; -use std::sync::{ Arc, Mutex, MutexGuard }; +use std::sync::{Arc, Mutex, MutexGuard}; //use std::net::SocketAddr; +use sqlite::{Connection, State, Value}; use std::collections::HashMap; -use sqlite::{ State, Value, Connection }; -use bitcoin::{ consensus, Transaction, Network }; +use bitcoin::{Network, Transaction, consensus}; -use hex_conservative::FromHex; -use regex::Regex; -use serde::{ Serialize, Deserialize}; -use log::{ info, error, trace, debug}; -use serde_json; use chrono::Utc; +use hex_conservative::FromHex; +use log::{debug, error, info, trace}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json; -use bal_server::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert }; +use bal_server::db::{ + create_database, execute_insert, get_last_used_address_by_ip, get_next_address_index, + insert_xpub, save_new_address, +}; use bal_server::xpub::new_address_from_xpub; -const VERSION:&str=env!("CARGO_PKG_VERSION"); -const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"]; -#[derive(Debug, Clone,Serialize, Deserialize)] +const VERSION: &str = env!("CARGO_PKG_VERSION"); +const NETWORKS: [&str; 4] = ["bitcoin", "testnet", "signet", "regtest"]; +#[derive(Debug, Clone, Serialize, Deserialize)] struct NetConfig { address: String, fixed_fee: u64, @@ -42,146 +44,220 @@ struct NetConfig { } impl NetConfig { - fn default_network(name:String, network: Network) -> Self { + fn default_network(name: String, network: Network) -> Self { NetConfig { - address: "".to_string(), - fixed_fee: 50000, - xpub: false, + address: "".to_string(), + fixed_fee: 50000, + xpub: false, name, network, - enabled: false, + enabled: false, } } } -#[derive(Debug, Serialize, Deserialize,Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] struct MyConfig { - regtest: NetConfig, - signet: NetConfig, - testnet: NetConfig, - mainnet: NetConfig, - info: String, - bind_address: String, - bind_port: u16, // Changed to u16 for port numbers - db_file: String, - pub_key_path: String, + regtest: NetConfig, + signet: NetConfig, + testnet: NetConfig, + mainnet: NetConfig, + info: String, + bind_address: String, + bind_port: u16, // Changed to u16 for port numbers + db_file: String, + pub_key_path: String, + expose_stats: bool, } -#[derive(Debug,Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Info { - pub address: String, - pub base_fee: u64, - pub chain: String, - pub info: String, - pub version: String + pub address: String, + pub base_fee: u64, + pub chain: String, + pub info: String, + pub version: String, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct Stats { + pub report_date: String, + pub chain: String, + pub totals:i64, + pub waiting:i64, + pub sent:i64, + pub failed:i64, + pub waiting_profit:i64, + pub sent_profit:i64, + pub missed_profit:i64, + pub unique_inputs:i64, } - impl Default for MyConfig { fn default() -> Self { MyConfig { - regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest), - signet: NetConfig::default_network("signet".to_string(), Network::Signet), - testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet), - mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin), - bind_address: "127.0.0.1".to_string(), - bind_port: 9137, - db_file: "bal.db".to_string(), - info: "Will Executor Server".to_string(), - pub_key_path: "public_key.pem".to_string(), + regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest), + signet: NetConfig::default_network("signet".to_string(), Network::Signet), + testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet), + mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin), + bind_address: "127.0.0.1".to_string(), + bind_port: 9137, + db_file: "bal.db".to_string(), + info: "Will Executor Server".to_string(), + pub_key_path: "public_key.pem".to_string(), + expose_stats:env::var("BAL_SERVER_EXPOSE_STATS").unwrap_or("false".to_string()).parse::().unwrap(), } } } impl MyConfig { - fn get_net_config(&self, param: &str) -> &NetConfig{ + fn get_net_config(&self, param: &str) -> &NetConfig { match param { "regtest" => &self.regtest, "testnet" => &self.testnet, "signet" => &self.signet, - _ => &self.mainnet, + _ => &self.mainnet, } } } -async fn echo_version( -) -> Result>, hyper::Error> { +async fn echo_version() -> Result>, hyper::Error> { Ok(Response::new(full(VERSION))) } -async fn echo_home(cfg: &MyConfig -) -> Result>, hyper::Error> { - debug!("echo_home: {}", cfg.info ); +async fn echo_home(cfg: &MyConfig) -> Result>, hyper::Error> { + debug!("echo_home: {}", cfg.info); Ok(Response::new(full(cfg.info.clone()))) } async fn echo_pub_key( cfg: &MyConfig, ) -> Result>, hyper::Error> { let pub_key = fs::read_to_string(&cfg.pub_key_path) - .expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str()); + .expect(format!("Failed to read public key file {}", cfg.pub_key_path).as_str()); Ok(Response::new(full(pub_key))) } -async fn echo_info( - param: &str, - cfg: &MyConfig, - remote_addr: String, +async fn echo_stats( + param: &str, + cfg: &MyConfig, + remote_addr: &String, ) -> Result>, hyper::Error> { - info!("echo info!!!{}",param); - let netconfig=MyConfig::get_net_config(cfg,param); - if !netconfig.enabled { - debug!("network disabled {}",param); - return Ok(Response::new(full("network disabled"))); + info!("echo stats!!! {} - {}", param,cfg.expose_stats); + let netconfig = MyConfig::get_net_config(cfg, param); + if !netconfig.enabled { + debug!("network disabled {}", param); + return Ok(Response::new(full("network disabled"))); + } + let sql = format!("SELECT + report_date, + chain, + totals, + waiting, + sent, + failed, + waiting_profit, + sent_profit, + missed_profit, + unique_inputs FROM tbl_stats where chain = '{}' + ", netconfig.name); + let mut stats:Vec=vec![]; + let db = sqlite::open(&cfg.db_file).unwrap(); + db.iterate(&sql,|pairs|{ + let row: HashMap<_, _> = pairs.into_iter().map(|(k,v)| (k.to_string(), v.map(|s| s))).collect(); + //let row:HashMap<_,_>= pairs.into_iter().collect(); + println!("row report date {}",row["report_date"].clone().unwrap()); + + dbg!(&row); + stats.push(Stats{ + report_date: row["report_date"].clone().unwrap().to_string(), + chain: row["chain"].clone().unwrap().to_string(), + totals: row["totals"].clone().unwrap().parse::().unwrap(), + waiting: row["waiting"].clone().unwrap().parse::().unwrap(), + sent: row["sent"].clone().unwrap().parse::().unwrap(), + failed: row["failed"].clone().unwrap().parse::().unwrap(), + waiting_profit: row["waiting_profit"].clone().unwrap().parse::().unwrap(), + sent_profit: row["sent_profit"].clone().unwrap().parse::().unwrap(), + missed_profit:row["missed_profit"].clone().unwrap().parse::().unwrap(), + unique_inputs: row["unique_inputs"].clone().unwrap().parse::().unwrap(), + }); + true + }); + match serde_json::to_string(&stats) { + Ok(json_data) => { + debug!("echo info reply: {}", json_data); + return Ok(Response::new(full(json_data))); } - let address = match netconfig.xpub{ - false => { - let address = netconfig.address.to_string(); - trace!("is address: {}",&address); - address - }, - true => { - let db = sqlite::open(&cfg.db_file).unwrap(); - match get_last_used_address_by_ip(&db,&netconfig.name,&netconfig.address,&remote_addr){ - Some(address)=>address, - None => { - let next = get_next_address_index(&db,&netconfig.name,&netconfig.address); - let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap(); - save_new_address(&db,next.0,&address.0,&address.1,&remote_addr); - debug!("save new address {} {}",address.0,address.1); - trace!("next {} {}",next.0,next.1); - address.0 - } + Err(err) => Ok(Response::new(full(format!("error:{}", err)))), + } +} + + +async fn echo_info( + param: &str, + cfg: &MyConfig, + remote_addr: &String, +) -> Result>, hyper::Error> { + info!("echo info!!!{}", param); + let netconfig = MyConfig::get_net_config(cfg, param); + if !netconfig.enabled { + debug!("network disabled {}", param); + return Ok(Response::new(full("network disabled"))); + } + let address = match netconfig.xpub { + false => { + let address = netconfig.address.to_string(); + trace!("is address: {}", &address); + address + } + true => { + let db = sqlite::open(&cfg.db_file).unwrap(); + match get_last_used_address_by_ip( + &db, + &netconfig.name, + &netconfig.address, + &remote_addr, + ) { + Some(address) => address, + None => { + let next = get_next_address_index(&db, &netconfig.name, &netconfig.address); + let address = + new_address_from_xpub(&netconfig.address, next.1, netconfig.network) + .unwrap(); + save_new_address(&db, next.0, &address.0, &address.1, &remote_addr); + debug!("save new address {} {}", address.0, address.1); + trace!("next {} {}", next.0, next.1); + address.0 } } - }; - let info = Info{ - address, - base_fee: netconfig.fixed_fee, - chain: netconfig.network.to_string(), - info: cfg.info.to_string(), - version: VERSION.to_string() - - }; - trace!("address: {:#?}",info); - match serde_json::to_string(&info){ - Ok(json_data) => { - debug!("echo info reply: {}", json_data); - return Ok(Response::new(full(json_data))); - }, - Err(err) => Ok(Response::new(full(format!("error:{}",err)))) } - - + }; + let info = Info { + address, + base_fee: netconfig.fixed_fee, + chain: netconfig.network.to_string(), + info: cfg.info.to_string(), + version: VERSION.to_string(), + }; + trace!("address: {:#?}", info); + match serde_json::to_string(&info) { + Ok(json_data) => { + debug!("echo info reply: {}", json_data); + return Ok(Response::new(full(json_data))); + } + Err(err) => Ok(Response::new(full(format!("error:{}", err)))), + } } -async fn echo_search(whole_body: &Bytes, - cfg: &MyConfig, +async fn echo_search( + whole_body: &Bytes, + cfg: &MyConfig, ) -> Result>, hyper::Error> { info!("echo search!!!"); let strbody = std::str::from_utf8(whole_body).unwrap(); - info!("{}",strbody); + info!("{}", strbody); let mut response = Response::new(full("Bad data received".to_owned())); - *response.status_mut() = StatusCode::BAD_REQUEST; - if !strbody.is_empty() && strbody.len()<=70 { + *response.status_mut() = StatusCode::BAD_REQUEST; + if !strbody.is_empty() && strbody.len() <= 70 { let db = sqlite::open(&cfg.db_file).unwrap(); - let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1").unwrap(); + let mut statement = db + .prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1") + .unwrap(); statement.bind((1, strbody)).unwrap(); if let Ok(State::Row) = statement.next() { @@ -232,36 +308,35 @@ async fn echo_search(whole_body: &Bytes, None } }; - response = match serde_json::to_string(&response_data){ + response = match serde_json::to_string(&response_data) { Ok(json_data) => Response::new(full(json_data)), - Err(_) => { response } + Err(_) => response, }; return Ok(response); } } Ok(response) - - } -async fn echo_push(whole_body: &Bytes, - cfg: &MyConfig, - param: &str, +async fn echo_push( + whole_body: &Bytes, + cfg: &MyConfig, + param: &str, ) -> Result>, hyper::Error> { //let whole_body = req.collect().await?.to_bytes(); let strbody = std::str::from_utf8(whole_body).unwrap(); let mut response = Response::new(full("Bad data received".to_owned())); let mut response_not_enable = Response::new(full("Network not enabled".to_owned())); - *response.status_mut() = StatusCode::BAD_REQUEST; - *response_not_enable.status_mut()=StatusCode::BAD_REQUEST; - let netconfig = MyConfig::get_net_config(cfg,param); - if !netconfig.enabled{ - return Ok(response_not_enable); + *response.status_mut() = StatusCode::BAD_REQUEST; + *response_not_enable.status_mut() = StatusCode::BAD_REQUEST; + let netconfig = MyConfig::get_net_config(cfg, param); + if !netconfig.enabled { + return Ok(response_not_enable); } let req_time = Utc::now().timestamp_nanos_opt().unwrap(); // Returns i64 - + let db = sqlite::open(&cfg.db_file).unwrap(); - + let lines = strbody.split("\n"); let sqltxshead = "INSERT INTO tbl_tx (txid, wtxid, ntxid, tx, locktime, reqid, network, our_address, our_fees)".to_string(); let mut sqltxs = "".to_string(); @@ -273,75 +348,86 @@ async fn echo_push(whole_body: &Bytes, let mut union_inps = true; let mut union_outs = true; let mut already_present = false; - let mut ptx:Vec<(usize, Value)> = vec![]; - let mut pinps:Vec<(usize, Value)> = vec![]; - let mut pouts:Vec<(usize, Value)> = vec![]; + let mut ptx: Vec<(usize, Value)> = vec![]; + let mut pinps: Vec<(usize, Value)> = vec![]; + let mut pouts: Vec<(usize, Value)> = vec![]; let mut linenum = 1; let mut lineinp = 1; let mut lineout = 1; for line in lines { - if line.is_empty(){ - trace!("line len is: {}",line.len()); - continue + if line.is_empty() { + trace!("line len is: {}", line.len()); + continue; } let linea = format!("{req_time}:{line}"); info!("New Tx: {}", linea); - let raw_tx = match Vec::::from_hex(line) { + let raw_tx = match Vec::::from_hex(line) { Ok(raw_tx) => raw_tx, Err(err) => { - error!("rawtx error: {}",err); - continue + error!("rawtx error: {}", err); + continue; } }; - if !raw_tx.is_empty(){ - trace!("len: {}",raw_tx.len()); - let tx: Transaction = match consensus::deserialize(&raw_tx){ + if !raw_tx.is_empty() { + trace!("len: {}", raw_tx.len()); + let tx: Transaction = match consensus::deserialize(&raw_tx) { Ok(tx) => tx, - Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue} + Err(err) => { + error!("error: unable to parse tx: {}\n{}", line, err); + continue; + } }; let txid = tx.compute_txid().to_string(); - trace!("txid: {}",txid); + trace!("txid: {}", txid); let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap(); - statement.bind((1,&txid[..])).unwrap(); + statement.bind((1, &txid[..])).unwrap(); if let Ok(State::Row) = statement.next() { trace!("already present"); - already_present=true; + already_present = true; continue; } let ntxid = tx.compute_ntxid(); let wtxid = tx.compute_wtxid(); let mut found = false; let locktime = tx.lock_time; - let mut our_address:String = "".to_string(); - let mut our_fees:u64 = 0; - for input in tx.input{ + let mut our_address: String = "".to_string(); + let mut our_fees: u64 = 0; + for input in tx.input { if !union_inps { sqlinps = format!("{sqlinps} UNION ALL"); - }else{ + } else { union_inps = false; } sqlinps = format!("{sqlinps} SELECT ?, ?, ?"); - pinps.push((lineinp,Value::String(txid.to_string()))); - pinps.push((lineinp+1,Value::String(input.previous_output.txid.to_string()))); - pinps.push((lineinp+2,Value::String(input.previous_output.vout.to_string()))); + pinps.push((lineinp, Value::String(txid.to_string()))); + pinps.push(( + lineinp + 1, + Value::String(input.previous_output.txid.to_string()), + )); + pinps.push(( + lineinp + 2, + Value::String(input.previous_output.vout.to_string()), + )); lineinp += 3; - } - if netconfig.fixed_fee ==0 { + if netconfig.fixed_fee == 0 { found = true; } - for (idx,output) in tx.output.into_iter().enumerate(){ + for (idx, output) in tx.output.into_iter().enumerate() { let script_pubkey = output.script_pubkey; - let address = match bitcoin::Address::from_script(script_pubkey.as_script(), netconfig.network){ + let address = match bitcoin::Address::from_script( + script_pubkey.as_script(), + netconfig.network, + ) { Ok(address) => address.to_string(), Err(_) => String::new(), }; let amount = output.value; - our_fees = netconfig.fixed_fee;//search wllexecutor output - if netconfig.xpub{ - let sql="select * from tbl_address where address=?"; + our_fees = netconfig.fixed_fee; //search wllexecutor output + if netconfig.xpub { + let sql = "select * from tbl_address where address=?"; let mut stmt = db.prepare(sql).expect("failed to fetch addresses"); - stmt.bind((1,Value::String(address.to_string()))).unwrap(); + stmt.bind((1, Value::String(address.to_string()))).unwrap(); if let Ok(State::Row) = stmt.next() { our_address = address.to_string(); } @@ -352,58 +438,61 @@ async fn echo_push(whole_body: &Bytes, our_fees = amount.to_sat(); our_address = netconfig.address.to_string(); found = true; - trace!("address and fees are correct {}: {}",our_address,our_fees); + trace!("address and fees are correct {}: {}", our_address, our_fees); } if !union_outs { sqlouts = format!("{sqlouts} UNION ALL"); - }else{ + } else { union_outs = false; } sqlouts = format!("{sqlouts} SELECT ?, ?, ?, ?"); - pouts.push((lineout,Value::String(txid.to_string()))); - pouts.push((lineout+1,Value::Integer(idx.try_into().unwrap()))); - pouts.push((lineout+2,Value::String(script_pubkey.to_string()))); - pouts.push((lineout+3,Value::Integer(amount.to_sat().try_into().unwrap()))); + pouts.push((lineout, Value::String(txid.to_string()))); + pouts.push((lineout + 1, Value::Integer(idx.try_into().unwrap()))); + pouts.push((lineout + 2, Value::String(script_pubkey.to_string()))); + pouts.push(( + lineout + 3, + Value::Integer(amount.to_sat().try_into().unwrap()), + )); lineout += 4; } if !found { error!("willexecutor output not found "); - return Ok(response) + return Ok(response); } else { if !union_tx { sqltxs = format!("{sqltxs} UNION ALL"); - }else{ + } else { union_tx = false; } sqltxs = format!("{sqltxs} SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?"); - ptx.push((linenum,Value::String(txid))); - ptx.push((linenum+1,Value::String(wtxid.to_string()))); - ptx.push((linenum+2,Value::String(ntxid.to_string()))); - ptx.push((linenum+3,Value::String(line.to_string()))); - ptx.push((linenum+4,Value::String(locktime.to_string()))); - ptx.push((linenum+5,Value::String(req_time.to_string()))); - ptx.push((linenum+6,Value::String(netconfig.name.to_string()))); - ptx.push((linenum+7,Value::String(our_address.to_string()))); - ptx.push((linenum+8,Value::String(our_fees.to_string()))); + ptx.push((linenum, Value::String(txid))); + ptx.push((linenum + 1, Value::String(wtxid.to_string()))); + ptx.push((linenum + 2, Value::String(ntxid.to_string()))); + ptx.push((linenum + 3, Value::String(line.to_string()))); + ptx.push((linenum + 4, Value::String(locktime.to_string()))); + ptx.push((linenum + 5, Value::String(req_time.to_string()))); + ptx.push((linenum + 6, Value::String(netconfig.name.to_string()))); + ptx.push((linenum + 7, Value::String(our_address.to_string()))); + ptx.push((linenum + 8, Value::String(our_fees.to_string()))); linenum += 9; } - }else{ - trace!("rawTx len is: {}",raw_tx.len()); - debug!("{}",&sqltxs); + } else { + trace!("rawTx len is: {}", raw_tx.len()); + debug!("{}", &sqltxs); } - } + } if sqltxs.is_empty() && already_present { - return Ok(Response::new(full("already present"))) + return Ok(Response::new(full("already present"))); } let sqltxs = format!("{}{};", sqltxshead, sqltxs); let sqlinps = format!("{}{};", sqlinpshead, sqlinps); let sqlouts = format!("{}{};", sqloutshead, sqlouts); - if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts){ - debug!("{}",err); + if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts) { + debug!("{}", err); return Ok(response); - } + } Ok(Response::new(full("thx"))) -} +} fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> { let re = Regex::new(path).unwrap(); @@ -415,53 +504,60 @@ fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> { None } - async fn echo( req: Request, cfg: &MyConfig, - ip: &String + ip: &String, ) -> Result>, hyper::Error> { - let mut not_found = Response::new(empty()); *not_found.status_mut() = StatusCode::NOT_FOUND; let mut ret: Result>, hyper::Error> = Ok(not_found); let uri = req.uri().path().to_string(); - let remote_addr = req.headers().get("X-Real-IP").and_then(|value| value.to_str().ok()).and_then(|xff| xff.split(',').next()).map(|ip| ip.trim().to_string()).unwrap_or_else(|| ip.to_string()); - trace!("{}: {}",remote_addr,uri); + let remote_addr = req + .headers() + .get("X-Real-IP") + .and_then(|value| value.to_str().ok()) + .and_then(|xff| xff.split(',').next()) + .map(|ip| ip.trim().to_string()) + .unwrap_or_else(|| ip.to_string()); + trace!("{}: {}", remote_addr, uri); match *req.method() { // Serve some instructions at / Method::POST => { let whole_body = req.collect().await?.to_bytes(); - if let Some(param) = match_uri(r"^?/?(?P[^/]?+)?/pushtxs$",uri.as_str()) { + if let Some(param) = match_uri(r"^?/?(?P[^/]?+)?/pushtxs$", uri.as_str()) { //let whole_body = collect_body(req,512_000).await?; - ret = echo_push(&whole_body,cfg,param).await; + ret = echo_push(&whole_body, cfg, param).await; } - if uri=="/searchtx"{ + if uri == "/searchtx" { //let whole_body = collect_body(req,64).await?; - ret = echo_search(&whole_body,cfg).await; + ret = echo_search(&whole_body, cfg).await; } ret } Method::GET => { - if let Some(param) = match_uri(r"^?/?(?P[^/]?+)?/info$",uri.as_str()) { - ret = echo_info(param,cfg,remote_addr).await; + if let Some(param) = match_uri(r"^?/?(?P[^/]?+)?/stats$", uri.as_str()) { + ret = echo_stats(param, cfg, &remote_addr).await; } - if uri=="/version"{ - ret= echo_version().await; + if let Some(param) = match_uri(r"^?/?(?P[^/]?+)?/info$", uri.as_str()) { + ret = echo_info(param, cfg, &remote_addr).await; } - if uri=="/.pub_key.pem" { + if uri == "/version" { + ret = echo_version().await; + } + if uri == "/.pub_key.pem" { ret = echo_pub_key(cfg).await; } - if uri=="/"{ + if uri == "/" { ret = echo_home(cfg).await; } ret } // Return the 404 Not Found for other routes. - _ => ret + _ => ret, } } @@ -476,113 +572,120 @@ fn full>(chunk: T) -> BoxBody { .map_err(|never| match never {}) .boxed() } -fn parse_env(cfg: &Arc>){ +fn parse_env(cfg: &Arc>) { for (key, value) in std::env::vars() { debug!("ENVIRONMENT {key}: {value}"); } let mut cfg_lock = cfg.lock().unwrap(); if let Ok(value) = env::var("BAL_SERVER_DB_FILE") { - debug!("BAL_SERVER_DB_FILE: {}",value); - cfg_lock.db_file = value; + debug!("BAL_SERVER_DB_FILE: {}", value); + cfg_lock.db_file = value; } if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") { - debug!("BAL_SERVER_BIND_ADDRESS: {}",value); - cfg_lock.bind_address= value; + debug!("BAL_SERVER_BIND_ADDRESS: {}", value); + cfg_lock.bind_address = value; } if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") { - debug!("BAL_SERVER_BIND_PORT: {}",value); - if let Ok(v) = value.parse::(){ + debug!("BAL_SERVER_BIND_PORT: {}", value); + if let Ok(v) = value.parse::() { cfg_lock.bind_port = v; } } - + if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") { - debug!("BAL_SERVER_PUB_KEY_PATH: {}",value); + debug!("BAL_SERVER_PUB_KEY_PATH: {}", value); cfg_lock.pub_key_path = value; } - - if let Ok(value) = env::var("BAL_SERVER_INFO"){ - debug!("BAL_SERVER_INFO: {}",value); - cfg_lock.info = value; + if let Ok(value) = env::var("BAL_SERVER_INFO") { + debug!("BAL_SERVER_INFO: {}", value); + cfg_lock.info = value; } - cfg_lock = parse_env_netconfig(cfg_lock,"regtest"); - cfg_lock = parse_env_netconfig(cfg_lock,"signet"); - cfg_lock = parse_env_netconfig(cfg_lock,"testnet"); - drop(parse_env_netconfig(cfg_lock,"bitcoin")); - + cfg_lock = parse_env_netconfig(cfg_lock, "regtest"); + cfg_lock = parse_env_netconfig(cfg_lock, "signet"); + cfg_lock = parse_env_netconfig(cfg_lock, "testnet"); + drop(parse_env_netconfig(cfg_lock, "bitcoin")); } -fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{ - let cfg = match chain{ +fn parse_env_netconfig<'a>( + mut cfg_lock: MutexGuard<'a, MyConfig>, + chain: &'a str, +) -> MutexGuard<'a, MyConfig> { + let cfg = match chain { "regtest" => &mut cfg_lock.regtest, "signet" => &mut cfg_lock.signet, "testnet" => &mut cfg_lock.testnet, &_ => &mut cfg_lock.mainnet, }; - if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) { - debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value); - cfg.address = value; + if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS", chain.to_uppercase())) { + debug!("BAL_SERVER_{}_ADDRESS: {}", chain.to_uppercase(), value); + cfg.address = value; if cfg.address.len() > 5 { if cfg.address[1..4] == *"pub" { - cfg.xpub=true; + cfg.xpub = true; trace!("is_xpub"); } - cfg.enabled=true; + cfg.enabled = true; } } - if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) { - debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value); - if let Ok(v) = value.parse::(){ + if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE", chain.to_uppercase())) { + debug!("BAL_SERVER_{}_FIXED_FEE: {}", chain.to_uppercase(), value); + if let Ok(v) = value.parse::() { cfg.fixed_fee = v; } } cfg_lock } -fn init_network(db: &Connection, cfg: &MyConfig){ - for network in NETWORKS{ - let netconfig = MyConfig::get_net_config(cfg,network); - insert_xpub(db,&netconfig.name,&netconfig.address); +fn init_network(db: &Connection, cfg: &MyConfig) { + for network in NETWORKS { + let netconfig = MyConfig::get_net_config(cfg, network); + insert_xpub(db, &netconfig.name, &netconfig.address); } } #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); - let cfg: Arc> =Arc::>::default(); + let cfg: Arc> = Arc::>::default(); parse_env(&cfg); - let cfg_lock = cfg.lock().unwrap(); let db = sqlite::open(&cfg_lock.db_file).unwrap(); create_database(&db); - init_network(&db,&cfg_lock); + init_network(&db, &cfg_lock); let addr = cfg_lock.bind_address.to_string(); let addr: IpAddr = addr.parse()?; - let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?; - info!("Listening on http://{}:{}", addr,cfg_lock.bind_port); - + let listener = TcpListener::bind((addr, cfg_lock.bind_port)).await?; + info!("Listening on http://{}:{}", addr, cfg_lock.bind_port); loop { let (stream, _) = listener.accept().await?; - let ip = stream.peer_addr()?.to_string().split(":").next().unwrap().to_string(); + let ip = stream + .peer_addr()? + .to_string() + .split(":") + .next() + .unwrap() + .to_string(); let io = TokioIo::new(stream); - + tokio::task::spawn({ let cfg = cfg_lock.clone(); async move { if let Err(err) = http1::Builder::new() - .serve_connection(io, service_fn(|req: Request| async { - echo(req,&cfg,&ip).await - })) + .serve_connection( + io, + service_fn(|req: Request| async { + echo(req, &cfg, &ip).await + }), + ) .await { error!("Error serving connection: {:?}", err); } - } }); } diff --git a/src/db.rs b/src/db.rs index a18828e..ab6ce1a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,7 @@ -use sqlite::{ Connection, Value, State, Error }; -use log::{info, trace, error}; +use log::{error, info, trace}; +use sqlite::{Connection, Error, State, Value}; -pub fn create_database(db: &Connection){ +pub fn create_database(db: &Connection) { info!("database sanity check"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP, date_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);"); let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT"); @@ -9,17 +9,15 @@ pub fn create_database(db: &Connection){ let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);"); let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);"); - let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);"); + let _ = + db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);"); let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);"); - let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);"); let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)"); - let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); - - let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');"); - + let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); + let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');"); } /* pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option{ @@ -33,76 +31,98 @@ pub fn create_database(db: &Connection){ } } */ -pub fn insert_xpub(db: &Connection, network: &String, xpub: &String){ +pub fn insert_xpub(db: &Connection, network: &String, xpub: &String) { if xpub != "" { trace!("going to insert: {} xpub:{}", network, xpub); - let mut stmt = db.prepare ("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);").unwrap(); - let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap(); - let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap(); - let _ = stmt.next(); + let mut stmt = db + .prepare("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);") + .unwrap(); + let _ = stmt.bind((1, Value::String(network.to_string()))).unwrap(); + let _ = stmt.bind((2, Value::String(xpub.to_string()))).unwrap(); + let _ = stmt.next(); } } -pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &String, address: &String) -> Option{ +pub fn get_last_used_address_by_ip( + db: &Connection, + network: &String, + xpub: &String, + address: &String, +) -> Option { let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap(); - let _ = stmt.bind((1,Value::String(network.to_string()))); - let _ = stmt.bind((2,Value::String(address.to_string()))); - let _ = stmt.bind((3,Value::String(xpub.to_string()))); - if let Ok(State::Row) = stmt.next(){ - let address = stmt.read::("address").unwrap(); + let _ = stmt.bind((1, Value::String(network.to_string()))); + let _ = stmt.bind((2, Value::String(address.to_string()))); + let _ = stmt.bind((3, Value::String(xpub.to_string()))); + if let Ok(State::Row) = stmt.next() { + let address = stmt.read::("address").unwrap(); return Some(address); - }else{ + } else { return None; } - } -pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64,i64){ +pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64, i64) { let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap(); - stmt.bind((1,Value::String(network.to_string()))).unwrap(); - stmt.bind((2,Value::String(xpub.to_string()))).unwrap(); - match stmt.next(){ - Ok(State::Row) =>{ - let next = stmt.read::("path_idx").unwrap(); - let id = stmt.read::("id").unwrap(); - return (id,next); - },Err(_)=> { - return (0,0); - },Ok(State::Done) =>{ - return (0,0); + stmt.bind((1, Value::String(network.to_string()))).unwrap(); + stmt.bind((2, Value::String(xpub.to_string()))).unwrap(); + match stmt.next() { + Ok(State::Row) => { + let next = stmt.read::("path_idx").unwrap(); + let id = stmt.read::("id").unwrap(); + return (id, next); + } + Err(_) => { + return (0, 0); + } + Ok(State::Done) => { + return (0, 0); } }; } -pub fn save_new_address(db: &Connection,xpub: i64,address: &String, path: &String,remote_addr: &String){ - let mut stmt = db.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);").unwrap(); +pub fn save_new_address( + db: &Connection, + xpub: i64, + address: &String, + path: &String, + remote_addr: &String, +) { + let mut stmt = db + .prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);") + .unwrap(); - stmt.bind((1,Value::String(address.to_string()))).unwrap(); - stmt.bind((2,Value::String(path.to_string()))).unwrap(); - stmt.bind((3,Value::Integer(xpub))).unwrap(); - stmt.bind((4,Value::String(remote_addr.to_string()))).unwrap(); + stmt.bind((1, Value::String(address.to_string()))).unwrap(); + stmt.bind((2, Value::String(path.to_string()))).unwrap(); + stmt.bind((3, Value::Integer(xpub))).unwrap(); + stmt.bind((4, Value::String(remote_addr.to_string()))) + .unwrap(); let _ = stmt.next(); } -pub fn execute_insert(db: &Connection, - sqltxs: String, - ptx: Vec<(usize, Value)>, - sqlinp: String, - pinp: Vec<(usize, Value)>, - sqlout: String, - pout: Vec<(usize, Value)>) -> Result<(),Error>{ +pub fn execute_insert( + db: &Connection, + sqltxs: String, + ptx: Vec<(usize, Value)>, + sqlinp: String, + pinp: Vec<(usize, Value)>, + sqlout: String, + pout: Vec<(usize, Value)>, +) -> Result<(), Error> { let _ = db.execute("BEGIN TRANSACTION"); - let mut stmt = db.prepare(sqltxs.as_str()).expect("failed to prepare sqltxs"); - if let Err(err) = stmt.bind::<&[(_,Value)]>(&ptx[..]) { + let mut stmt = db + .prepare(sqltxs.as_str()) + .expect("failed to prepare sqltxs"); + if let Err(err) = stmt.bind::<&[(_, Value)]>(&ptx[..]) { error!("error binding transaction parameters: {}", err); let _ = db.execute("ROLLBACK"); return Err(err); - } if let Err(err) = stmt.next() { - error!("error inserting transactions {}",err); - let _ = db.execute("ROLLBACK"); - }else{ - let mut stmt = db.prepare(sqlinp.as_str()).expect("failed to prepare sqlinp"); - if let Err(err) = stmt.bind::<&[(_,Value)]>(&pinp[..]) { + error!("error inserting transactions {}", err); + let _ = db.execute("ROLLBACK"); + } else { + let mut stmt = db + .prepare(sqlinp.as_str()) + .expect("failed to prepare sqlinp"); + if let Err(err) = stmt.bind::<&[(_, Value)]>(&pinp[..]) { error!("error binding inputs parameters {}", err); let _ = db.execute("ROLLBACK"); return Err(err); @@ -111,10 +131,11 @@ pub fn execute_insert(db: &Connection, error!("error inserting inputs {}", err); let _ = db.execute("ROLLBACK"); return Err(err); - - }else{ - let mut stmt = db.prepare(sqlout.as_str()).expect("failed to prepare sqlout"); - if let Err(err) = stmt.bind::<&[(_,Value)]>(&pout[..]) { + } else { + let mut stmt = db + .prepare(sqlout.as_str()) + .expect("failed to prepare sqlout"); + if let Err(err) = stmt.bind::<&[(_, Value)]>(&pout[..]) { error!("error binding outs parameters {}", err); let _ = db.execute("ROLLBACK"); return Err(err); @@ -123,25 +144,20 @@ pub fn execute_insert(db: &Connection, error!("error inserting outs {}", err); let _ = db.execute("ROLLBACK"); return Err(err); - } } } let _ = db.execute("COMMIT"); Ok(()) - } -pub fn get_total_transaction_number(db: Connection, network: &String) -> Result { - let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap(); - stmt.bind((1,Value::String(network.to_string()))).unwrap(); - match stmt.next(){ - Ok(State::Row)=>{ - Ok(stmt.read::("total_number").unwrap()) - }, +pub fn get_total_transaction_number(db: Connection, network: &String) -> Result { + let mut stmt = db + .prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;") + .unwrap(); + stmt.bind((1, Value::String(network.to_string()))).unwrap(); + match stmt.next() { + Ok(State::Row) => Ok(stmt.read::("total_number").unwrap()), Ok(sqlite::State::Done) => todo!(), - Err(err)=>Err(err) - - + Err(err) => Err(err), } - } diff --git a/src/xpub.rs b/src/xpub.rs index b95a992..afa4591 100644 --- a/src/xpub.rs +++ b/src/xpub.rs @@ -1,17 +1,16 @@ -use sha2::{Digest, Sha256}; -use bitcoin::bip32::Xpub; -use std::str::FromStr; -use bitcoin::bip32::DerivationPath; -use bitcoin::key::Secp256k1; use bitcoin::Address; +use bitcoin::Network; use bitcoin::ScriptBuf; use bitcoin::WPubkeyHash; -use bitcoin::Network; +use bitcoin::bip32::DerivationPath; +use bitcoin::bip32::Xpub; use bitcoin::hashes::Hash; - +use bitcoin::key::Secp256k1; +use sha2::{Digest, Sha256}; +use std::str::FromStr; // Mainnet (BIP44/BIP49/BIP84) -enum BS58Prefix{ +enum BS58Prefix { Xpub, //Ypub, //Zpub, @@ -19,15 +18,13 @@ enum BS58Prefix{ //Vpub, //Upub } -const XPUB_PREFIX:[u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH) +const XPUB_PREFIX: [u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH) //const YPUB_PREFIX:[u8; 4] = [0x04, 0x9D, 0x7C, 0xB2]; // ypub (Nested SegWit P2SH-P2WPKH) //const ZPUB_PREFIX:[u8; 4] = [0x04, 0xB2, 0x47, 0x46]; // zpub (Native SegWit P2WPKH) //const TPUB_PREFIX:[u8; 4] = [0x04, 0x35, 0x87, 0xCF]; // tpub (Testnet Legacy P2PKH) //const VPUB_PREFIX:[u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit) //const UPUB_PREFIX:[u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit) - - fn base58check_decode(s: &str) -> Result, String> { let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?; if data.len() < 4 { @@ -47,27 +44,33 @@ fn base58check_encode(data: &[u8]) -> String { bs58::encode(full).into_string() } -fn convert_to(zpub: &str,prefix: BS58Prefix) -> Result { - +fn convert_to(zpub: &str, prefix: BS58Prefix) -> Result { let mut data = base58check_decode(zpub)?; if data.len() < 4 { return Err("Non รจ una zpub valida.".to_string()); } - data.splice(0..4, match prefix { - BS58Prefix::Xpub => XPUB_PREFIX, - //BS58Prefix::Ypub => YPUB_PREFIX, - //BS58Prefix::Zpub => ZPUB_PREFIX, - //BS58Prefix::Vpub => VPUB_PREFIX, - //BS58Prefix::Tpub => TPUB_PREFIX, - //BS58Prefix::Upub => UPUB_PREFIX, - }); + data.splice( + 0..4, + match prefix { + BS58Prefix::Xpub => XPUB_PREFIX, + //BS58Prefix::Ypub => YPUB_PREFIX, + //BS58Prefix::Zpub => ZPUB_PREFIX, + //BS58Prefix::Vpub => VPUB_PREFIX, + //BS58Prefix::Tpub => TPUB_PREFIX, + //BS58Prefix::Upub => UPUB_PREFIX, + }, + ); Ok(base58check_encode(&data)) } -pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(String,String), Box>{ - let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?; - let path = format!("m/0/{}",index); +pub fn new_address_from_xpub( + zpub: &str, + index: i64, + network: Network, +) -> Result<(String, String), Box> { + let xpub = Xpub::from_str(&convert_to(zpub, BS58Prefix::Xpub)?)?; + let path = format!("m/0/{}", index); let derivation_path = DerivationPath::from_str(path.as_str())?; let secp = Secp256k1::new(); let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?; @@ -78,8 +81,7 @@ pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<( //let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash()); let address = Address::from_script(&redeem_script, network)?; //let address = Address::from_script(&script_pubkey, network)?; - Ok((address.to_string(),path.to_string())) - + Ok((address.to_string(), path.to_string())) } /* fn main() -> Result<(), Box>{ @@ -101,7 +103,7 @@ fn main() -> Result<(), Box>{ let witness_program = WPubkeyHash::hash(&pubkey_bytes); let redeem_script = ScriptBuf::new_p2wpkh(&witness_program); let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash()); - + // Generate the Bitcoin SegWit (BIP49) address let network = Network::Bitcoin; let address = Address::from_script(&redeem_script, network)?;