expose stats

This commit is contained in:
2026-01-21 00:08:47 -04:00
parent fa98283df6
commit dcaab2ed6d
6 changed files with 814 additions and 667 deletions

View File

@@ -1,12 +1,12 @@
RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
export BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133"

View File

@@ -11,6 +11,7 @@ export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
export BAL_SERVER_EXPOSE_STATS=true;
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000

View File

@@ -2,45 +2,38 @@ extern crate bitcoincore_rpc;
extern crate zmq;
use bitcoin::Network;
use bitcoincore_rpc::{bitcoin, Auth, Client, Error, RpcApi};
use bitcoincore_rpc::{Auth, Client, Error, RpcApi, bitcoin};
use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value};
use serde::Serialize;
use byteorder::{LittleEndian, ReadBytesExt};
use hex;
use log::{debug, error, info, trace, warn};
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use sqlite::{Value, Connection};
use std::collections::HashMap;
use std::env;
use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket, DEALER, DONTWAIT};
use std::error::Error as StdError;
use std::io::Cursor;
use std::str;
use std::{thread, time::Duration};
use std::collections::HashMap;
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::Cursor;
use hex;
use std::error::Error as StdError;
use zmq::{Context, DEALER, DONTWAIT, Socket};
use reqwest::Client as rClient;
use base64::{Engine as _, engine::general_purpose};
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey};
use openssl::pkey::PKey;
use openssl::sign::Signer;
use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _};
use reqwest::Client as rClient;
use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD: i64 = 5000000;
const VERSION: &str = "0.0.2";
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MyConfig {
zmq_listener: String,
requests_file: String,
db_file: String,
bitcoin_dir: String,
regtest: NetworkParams,
@@ -49,25 +42,22 @@ struct MyConfig {
mainnet: NetworkParams,
send_stats: bool,
url: String,
secret_code: String,
ssl_key_path: String
ssl_key_path: String,
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
zmq_listener: "tcp://127.0.0.1:28332".to_string(),
requests_file: "rawrequests.log".to_string(),
db_file: "../bal.db".to_string(),
bitcoin_dir: "".to_string(),
zmq_listener: env::var("BAL_PUSHER_ZMQ_LISTENER").unwrap_or("tcp://127.0.0.1:28332".to_string()),
db_file: env::var("BAL_PUSHER_DB_FILE").unwrap_or("bal.db".to_string()),
bitcoin_dir: env::var("BAL_PUSHER_BITCOIN_DIR").unwrap_or("".to_string()),
regtest: get_network_params_default(Network::Regtest),
testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false,
url: "http://localhost/".to_string(),
secret_code: "xxx".to_string(),
ssl_key_path: "privkey.pem".to_string(),
send_stats: env::var("BAL_PUSHER_SEND_STATS").unwrap_or("false".to_string()).parse::<bool>().unwrap(),
url: env::var("BAL_SERVER_URL").unwrap_or("http://localhost/".to_string()),
ssl_key_path: env::var("SSL_KEY_PATH").unwrap_or("privkey.pem".to_string()),
}
}
}
@@ -87,7 +77,7 @@ fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
Network::Testnet => &cfg.testnet,
Network::Signet => &cfg.signet,
Network::Regtest => &cfg.regtest,
_ => &cfg.mainnet
_ => &cfg.mainnet,
}
}
fn get_network_params_default(network: Network) -> NetworkParams {
@@ -136,71 +126,66 @@ fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError
Ok(network.cookie_file.clone())
} else {
match env::var_os("HOME") {
Some(home) => {
match home.to_str(){
Some(home) => match home.to_str() {
Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
let cookie_file_path =
format!("{}/.bitcoin/{}.cookie", home_str, network.dir_path);
Ok(cookie_file_path)
}
None => Err("wrong HOME value".into()),
},
None => Err("wrong HOME value".into())
}
},
None => Err("Please Set HOME environment variable".into())
None => Err("Please Set HOME environment variable".into()),
}
}
}
fn get_client_from_username(url: &String, network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
fn get_client_from_username(
url: &String,
network: &NetworkParams,
) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
if network.rpc_user != "" {
match Client::new(&url[..],Auth::UserPass(network.rpc_user.to_string(),network.rpc_pass.to_string())){
match Client::new(
&url[..],
Auth::UserPass(network.rpc_user.to_string(), network.rpc_pass.to_string()),
) {
Ok(client) => match client.get_blockchain_info() {
Ok(bcinfo) => Ok((client, bcinfo)),
Err(err) => Err(err.into())
}
Err(err)=>Err(err.into())
Err(err) => Err(err.into()),
},
Err(err) => Err(err.into()),
}
} else {
Err("Failed".into())
}
}
fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
fn get_client_from_cookie(
url: &String,
network: &NetworkParams,
) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
match get_cookie_filename(network) {
Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => {
match client.get_blockchain_info(){
Ok(bcinfo) => {
Ok((client,bcinfo))
Ok(cookie) => match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => match client.get_blockchain_info() {
Ok(bcinfo) => Ok((client, bcinfo)),
Err(err) => Err(err.into()),
},
Err(err) => {
Err(err.into())
}
}
Err(err) => Err(err.into()),
},
Err(err)=>Err(err.into())
}
},
Err(err)=>Err(err.into())
Err(err) => Err(err.into()),
}
}
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
fn get_client(
network: &NetworkParams,
) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
let url = format!("{}:{}/", network.host, &network.port);
match get_client_from_username(&url, network) {
Ok(client) =>{Ok(client)},
Err(_) =>{
match get_client_from_cookie(&url,&network){
Ok(client)=>{
Ok(client)
Ok(client) => Ok(client),
Err(_) => match get_client_from_cookie(&url, &network) {
Ok(client) => Ok(client),
Err(err) => Err(err.into()),
},
Err(err)=> Err(err.into())
}
}
}
}
async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
let user = args.next().expect("no user given");
let pass = args.next().expect("no pass given");
@@ -231,7 +216,7 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap();
info!("db open {}",&cfg.db_file);
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter();
@@ -244,13 +229,16 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs: Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
for row in query_tx.bind::<&[(_, Value)]>(&[
for row in query_tx
.bind::<&[(_, Value)]>(
&[
(":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()),
(":bestblock_time", (average_time as i64).into()),
(":bestblock_height", (bcinfo.blocks as i64).into()),
(":network", network_params.db_field.clone().into()),
(":status", 0.into()),
][..])
][..],
)
.unwrap()
.map(|row| row.unwrap())
{
@@ -270,7 +258,7 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
*/
info!("tx: {} pusshata PUSHED\n{}", txid, o);
pushed_txs.push(txid.to_string());
},
}
Err(err) => {
/*let mut file = OpenOptions::new()
.append(true) // Set the append option
@@ -283,25 +271,30 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
warn!("Error: {}\n{}", err, txid);
//store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string());
},
}
};
}
if pushed_txs.len() > 0 {
let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
let sql = format!(
"UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",
pushed_txs.join("','")
);
trace!("sqlok: {}", &sql);
let _ = db.execute(&sql);
}
if invalid_txs.len() > 0 {
for (txid, txerr) in &invalid_txs {
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
let sql = format!(
"UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'"
);
trace!("sqlerror: {}", &sql);
let _ = db.execute(&sql);
}
}
let _ = send_stats_report(cfg, bcinfo).await;
let _ = calculate_stats(&db,network_params.db_field.clone()).await;
}
Err(erx) => {
panic!("impossible to get client {}", erx)
@@ -309,19 +302,97 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
}
Ok(())
}
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{
async fn calculate_stats(db: &Connection,chain: String) -> Result<(), reqwest::Error> {
//let sql = "drop table if exists tbl_stats;";
let sql = "DELETE FROM tbl_stats WHERE chain = '{chain}';";
if let Err(err) = db.execute(&sql){
error!("error deleting from tbl_stats where chain:{chain} error: {err}");
}
let sql = format!("INSERT INTO tbl_stats (
report_date, chain, totals, waiting, sent, failed,
waiting_profit, sent_profit, missed_profit, unique_inputs
)
VALUES (
CURRENT_TIMESTAMP,
'{chain}',
(SELECT COUNT(*) FROM tbl_tx WHERE network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 0 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 1 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 2 AND network = '{chain}'),
(SELECT COUNT(DISTINCT tbl_inp.in_txid) -- or appropriate input identifier
FROM tbl_inp
JOIN tbl_tx ON tbl_inp.txid = tbl_tx.txid
WHERE tbl_tx.status = 0 AND tbl_tx.network = '{chain}')
)
ON CONFLICT(chain) DO UPDATE SET
report_date = excluded.report_date,
totals = excluded.totals,
waiting = excluded.waiting,
sent = excluded.sent,
failed = excluded.failed,
waiting_profit = excluded.waiting_profit,
sent_profit = excluded.sent_profit,
missed_profit = excluded.missed_profit,
unique_inputs = excluded.unique_inputs;
");
/*
let sql = format!("CREATE TABLE tbl_stats AS
SELECT
CURRENT_TIMESTAMP AS report_date,
'{chain}' as chain,
(SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}') AS totals,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS failed,
(SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting_profit,
(SELECT SUM(our_fees) OR 0 FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent_profit,
(SELECT SUM(our_fees) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS missed_profit,
(SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}') AS unique_inputs;
");
let sql = "UPDATE tbl_stats set
totals = (SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}'),
waiting = (SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
sent = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'),
failed = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'),
waiting_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
sent_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
missed_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}')
unique_inputs = (SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}')
WHERE chain = '{chain}'
*/
if let Err(err) = db.execute(&sql){
error!("error inserting creating stats table {err}");
}
else{
info!("tbl_stats creation success");
}
Ok(())
}
async fn send_stats_report(
cfg: &MyConfig,
bcinfo: GetBlockchainInfoResult,
) -> Result<(), reqwest::Error> {
if cfg.send_stats {
debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string());
let welist_url = env::var("WELIST_SERVER_URL")
.unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new();
let url = format!("{}/ping", welist_url);
debug!("welist url: {}", url);
let chain = bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
let message = format!(
"{0}{1}{2}{3}{4}",
cfg.url, chain, bcinfo.blocks, bcinfo.median_time, bcinfo.best_block_hash
);
trace!("message to be sent: {}", message);
let sign = sign_message(cfg.ssl_key_path.as_str(), &message.as_str());
let response = client.post(url)
let response = client
.post(url)
.header("User-Agent", format!("bal-pusher/{}", VERSION))
.json(&json!(
{
@@ -332,9 +403,14 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
"last_block_hash": bcinfo.best_block_hash,
"signature": sign,
}))
.send().await?;
.send()
.await?;
if !response.status().is_success() {
warn!("Non-success response: {} {}", response.status(), response.status().canonical_reason().unwrap_or(""));
warn!(
"Non-success response: {} {}",
response.status(),
response.status().canonical_reason().unwrap_or("")
);
}
let body = &(response.text().await?);
@@ -343,8 +419,6 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
debug!("Not sending stats");
}
Ok(())
}
fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap();
@@ -354,59 +428,16 @@ fn sign_message(private_key_path: &str, message: &str) -> String {
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64
}
fn parse_env(cfg: &mut MyConfig) {
match env::var("BAL_PUSHER_ZMQ_LISTENER") {
Ok(value) => {
cfg.zmq_listener = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_REQUEST_FILE") {
Ok(value) => {
cfg.requests_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_DB_FILE") {
Ok(value) => {
cfg.db_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_BITCOIN_DIR") {
Ok(value) => {
cfg.bitcoin_dir = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg, "regtest");
cfg.signet = parse_env_netconfig(cfg, "signet");
cfg.testnet = parse_env_netconfig(cfg, "testnet");
drop(parse_env_netconfig(cfg, "bitcoin"));
}
fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams {
//fn parse_env_netconfig(cfg_lock: &MutexGuard<MyConfig>, chain: &str) -> &NetworkParams{
@@ -417,47 +448,53 @@ fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams{
&_ => &mut cfg_lock.mainnet,
};
match env::var(format!("BAL_PUSHER_{}_HOST", chain.to_uppercase())) {
Ok(value) => { cfg.host= value; },
Err(_) => {},
Ok(value) => {
cfg.host = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_PORT", chain.to_uppercase())) {
Ok(value) => match value.parse::<u64>() {
Ok(value) => {
match value.parse::<u64>(){
Ok(value) =>{ cfg.port = value.try_into().unwrap(); },
Err(_) => {},
cfg.port = value.try_into().unwrap();
}
}
Err(_) => {},
Err(_) => {}
},
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_DIR_PATH", chain.to_uppercase())) {
Ok(value) => { cfg.dir_path = value; },
Err(_) => {},
Ok(value) => {
cfg.dir_path = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_DB_FIELD", chain.to_uppercase())) {
Ok(value) => { cfg.db_field = value; },
Err(_) => {},
Ok(value) => {
cfg.db_field = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE", chain.to_uppercase())) {
Ok(value) => {
cfg.cookie_file = value; },
Err(_) => {},
cfg.cookie_file = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_RPC_USER", chain.to_uppercase())) {
Ok(value) => { cfg.rpc_user = value; },
Err(_) => {},
Ok(value) => {
cfg.rpc_user = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD", chain.to_uppercase())) {
Ok(value) => { cfg.rpc_pass = value; },
Err(_) => {},
Ok(value) => {
cfg.rpc_pass = value;
}
Err(_) => {}
}
cfg.clone()
}
fn get_default_config()-> MyConfig {
let file = confy::get_configuration_file_path("bal-pusher",None).expect("Error while getting path");
info!("Default configuration file path is: {:#?}", file);
confy::load("bal-pusher",None).expect("cant_load")
}
fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
@@ -529,40 +566,23 @@ enum ConnectionStatus {
#[tokio::main]
async fn main() -> std::io::Result<()> {
env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => {
match confy::load_path(&value){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
},
Err(_) => {
get_default_config()
},
};
let mut cfg = MyConfig::default();
let dbfile = env::var("BAL_PUSHER_DB_FILE").unwrap();
parse_env(&mut cfg);
let mut args = std::env::args();
let _exe_name = args.next().unwrap();
let arg_network = match args.next() {
Some(nargs) => nargs,
None => "bitcoin".to_string()
None => "bitcoin".to_string(),
};
let network = match arg_network.as_str() {
"testnet" => Network::Testnet,
"signet" => Network::Signet,
"regtest" => Network::Regtest,
_ => Network::Bitcoin,
};
info!("Network: {}", arg_network);
let network_params = get_network_params(&cfg, network);
@@ -586,7 +606,10 @@ async fn main()-> std::io::Result<()>{
let body = message[1].clone();
let seq = message[2].clone();
last_seq = seq;
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
debug!(
"ZMQ:GET TOPIC: {}",
String::from_utf8(topic.clone()).expect("invalid topic")
);
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body));
@@ -598,7 +621,9 @@ async fn main()-> std::io::Result<()>{
fn seq_to_str(seq: &Vec<u8>) -> String {
if seq.len() == 4 {
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
let sequence = rdr
.read_u32::<LittleEndian>()
.expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()

View File

@@ -1,32 +1,34 @@
use bytes::Bytes;
use http_body_util::{ combinators::BoxBody, BodyExt, Empty, Full };
use http_body_util::{BodyExt, Empty, Full, combinators::BoxBody};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use std::net::IpAddr;
use std::env;
use std::net::IpAddr;
//use std::time::{SystemTime,UNIX_EPOCH};
use std::fs;
use std::sync::{Arc, Mutex, MutexGuard};
//use std::net::SocketAddr;
use sqlite::{Connection, State, Value};
use std::collections::HashMap;
use sqlite::{ State, Value, Connection };
use bitcoin::{ consensus, Transaction, Network };
use bitcoin::{Network, Transaction, consensus};
use hex_conservative::FromHex;
use regex::Regex;
use serde::{ Serialize, Deserialize};
use log::{ info, error, trace, debug};
use serde_json;
use chrono::Utc;
use hex_conservative::FromHex;
use log::{debug, error, info, trace};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json;
use bal_server::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
use bal_server::db::{
create_database, execute_insert, get_last_used_address_by_ip, get_next_address_index,
insert_xpub, save_new_address,
};
use bal_server::xpub::new_address_from_xpub;
const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -65,6 +67,7 @@ struct MyConfig {
bind_port: u16, // Changed to u16 for port numbers
db_file: String,
pub_key_path: String,
expose_stats: bool,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -73,9 +76,21 @@ pub struct Info {
pub base_fee: u64,
pub chain: String,
pub info: String,
pub version: String
pub version: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Stats {
pub report_date: String,
pub chain: String,
pub totals:i64,
pub waiting:i64,
pub sent:i64,
pub failed:i64,
pub waiting_profit:i64,
pub sent_profit:i64,
pub missed_profit:i64,
pub unique_inputs:i64,
}
impl Default for MyConfig {
fn default() -> Self {
@@ -89,6 +104,7 @@ impl Default for MyConfig {
db_file: "bal.db".to_string(),
info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(),
expose_stats:env::var("BAL_SERVER_EXPOSE_STATS").unwrap_or("false".to_string()).parse::<bool>().unwrap(),
}
}
}
@@ -103,12 +119,10 @@ impl MyConfig {
}
}
async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn echo_version() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION)))
}
async fn echo_home(cfg: &MyConfig
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn echo_home(cfg: &MyConfig) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info);
Ok(Response::new(full(cfg.info.clone())))
}
@@ -119,10 +133,65 @@ async fn echo_pub_key(
.expect(format!("Failed to read public key file {}", cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key)))
}
async fn echo_stats(
param: &str,
cfg: &MyConfig,
remote_addr: &String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo stats!!! {} - {}", param,cfg.expose_stats);
let netconfig = MyConfig::get_net_config(cfg, param);
if !netconfig.enabled {
debug!("network disabled {}", param);
return Ok(Response::new(full("network disabled")));
}
let sql = format!("SELECT
report_date,
chain,
totals,
waiting,
sent,
failed,
waiting_profit,
sent_profit,
missed_profit,
unique_inputs FROM tbl_stats where chain = '{}'
", netconfig.name);
let mut stats:Vec<Stats>=vec![];
let db = sqlite::open(&cfg.db_file).unwrap();
db.iterate(&sql,|pairs|{
let row: HashMap<_, _> = pairs.into_iter().map(|(k,v)| (k.to_string(), v.map(|s| s))).collect();
//let row:HashMap<_,_>= pairs.into_iter().collect();
println!("row report date {}",row["report_date"].clone().unwrap());
dbg!(&row);
stats.push(Stats{
report_date: row["report_date"].clone().unwrap().to_string(),
chain: row["chain"].clone().unwrap().to_string(),
totals: row["totals"].clone().unwrap().parse::<i64>().unwrap(),
waiting: row["waiting"].clone().unwrap().parse::<i64>().unwrap(),
sent: row["sent"].clone().unwrap().parse::<i64>().unwrap(),
failed: row["failed"].clone().unwrap().parse::<i64>().unwrap(),
waiting_profit: row["waiting_profit"].clone().unwrap().parse::<i64>().unwrap(),
sent_profit: row["sent_profit"].clone().unwrap().parse::<i64>().unwrap(),
missed_profit:row["missed_profit"].clone().unwrap().parse::<i64>().unwrap(),
unique_inputs: row["unique_inputs"].clone().unwrap().parse::<i64>().unwrap(),
});
true
});
match serde_json::to_string(&stats) {
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
}
Err(err) => Ok(Response::new(full(format!("error:{}", err)))),
}
}
async fn echo_info(
param: &str,
cfg: &MyConfig,
remote_addr: String,
remote_addr: &String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}", param);
let netconfig = MyConfig::get_net_config(cfg, param);
@@ -135,14 +204,21 @@ async fn echo_info(
let address = netconfig.address.to_string();
trace!("is address: {}", &address);
address
},
}
true => {
let db = sqlite::open(&cfg.db_file).unwrap();
match get_last_used_address_by_ip(&db,&netconfig.name,&netconfig.address,&remote_addr){
match get_last_used_address_by_ip(
&db,
&netconfig.name,
&netconfig.address,
&remote_addr,
) {
Some(address) => address,
None => {
let next = get_next_address_index(&db, &netconfig.name, &netconfig.address);
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap();
let address =
new_address_from_xpub(&netconfig.address, next.1, netconfig.network)
.unwrap();
save_new_address(&db, next.0, &address.0, &address.1, &remote_addr);
debug!("save new address {} {}", address.0, address.1);
trace!("next {} {}", next.0, next.1);
@@ -156,21 +232,19 @@ async fn echo_info(
base_fee: netconfig.fixed_fee,
chain: netconfig.network.to_string(),
info: cfg.info.to_string(),
version: VERSION.to_string()
version: VERSION.to_string(),
};
trace!("address: {:#?}", info);
match serde_json::to_string(&info) {
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err))))
}
Err(err) => Ok(Response::new(full(format!("error:{}", err)))),
}
async fn echo_search(whole_body: &Bytes,
}
async fn echo_search(
whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!");
@@ -181,7 +255,9 @@ async fn echo_search(whole_body: &Bytes,
*response.status_mut() = StatusCode::BAD_REQUEST;
if !strbody.is_empty() && strbody.len() <= 70 {
let db = sqlite::open(&cfg.db_file).unwrap();
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1").unwrap();
let mut statement = db
.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1")
.unwrap();
statement.bind((1, strbody)).unwrap();
if let Ok(State::Row) = statement.next() {
@@ -234,17 +310,16 @@ async fn echo_search(whole_body: &Bytes,
};
response = match serde_json::to_string(&response_data) {
Ok(json_data) => Response::new(full(json_data)),
Err(_) => { response }
Err(_) => response,
};
return Ok(response);
}
}
Ok(response)
}
async fn echo_push(whole_body: &Bytes,
async fn echo_push(
whole_body: &Bytes,
cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
@@ -282,7 +357,7 @@ async fn echo_push(whole_body: &Bytes,
for line in lines {
if line.is_empty() {
trace!("line len is: {}", line.len());
continue
continue;
}
let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea);
@@ -290,14 +365,17 @@ async fn echo_push(whole_body: &Bytes,
Ok(raw_tx) => raw_tx,
Err(err) => {
error!("rawtx error: {}", err);
continue
continue;
}
};
if !raw_tx.is_empty() {
trace!("len: {}", raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx) {
Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue}
Err(err) => {
error!("error: unable to parse tx: {}\n{}", line, err);
continue;
}
};
let txid = tx.compute_txid().to_string();
trace!("txid: {}", txid);
@@ -322,17 +400,25 @@ async fn echo_push(whole_body: &Bytes,
}
sqlinps = format!("{sqlinps} SELECT ?, ?, ?");
pinps.push((lineinp, Value::String(txid.to_string())));
pinps.push((lineinp+1,Value::String(input.previous_output.txid.to_string())));
pinps.push((lineinp+2,Value::String(input.previous_output.vout.to_string())));
pinps.push((
lineinp + 1,
Value::String(input.previous_output.txid.to_string()),
));
pinps.push((
lineinp + 2,
Value::String(input.previous_output.vout.to_string()),
));
lineinp += 3;
}
if netconfig.fixed_fee == 0 {
found = true;
}
for (idx, output) in tx.output.into_iter().enumerate() {
let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), netconfig.network){
let address = match bitcoin::Address::from_script(
script_pubkey.as_script(),
netconfig.network,
) {
Ok(address) => address.to_string(),
Err(_) => String::new(),
};
@@ -363,12 +449,15 @@ async fn echo_push(whole_body: &Bytes,
pouts.push((lineout, Value::String(txid.to_string())));
pouts.push((lineout + 1, Value::Integer(idx.try_into().unwrap())));
pouts.push((lineout + 2, Value::String(script_pubkey.to_string())));
pouts.push((lineout+3,Value::Integer(amount.to_sat().try_into().unwrap())));
pouts.push((
lineout + 3,
Value::Integer(amount.to_sat().try_into().unwrap()),
));
lineout += 4;
}
if !found {
error!("willexecutor output not found ");
return Ok(response)
return Ok(response);
} else {
if !union_tx {
sqltxs = format!("{sqltxs} UNION ALL");
@@ -393,7 +482,7 @@ async fn echo_push(whole_body: &Bytes,
}
}
if sqltxs.is_empty() && already_present {
return Ok(Response::new(full("already present")))
return Ok(Response::new(full("already present")));
}
let sqltxs = format!("{}{};", sqltxshead, sqltxs);
let sqlinps = format!("{}{};", sqlinpshead, sqlinps);
@@ -415,20 +504,24 @@ fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
None
}
async fn echo(
req: Request<hyper::body::Incoming>,
cfg: &MyConfig,
ip: &String
ip: &String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string();
let remote_addr = req.headers().get("X-Real-IP").and_then(|value| value.to_str().ok()).and_then(|xff| xff.split(',').next()).map(|ip| ip.trim().to_string()).unwrap_or_else(|| ip.to_string());
let remote_addr = req
.headers()
.get("X-Real-IP")
.and_then(|value| value.to_str().ok())
.and_then(|xff| xff.split(',').next())
.map(|ip| ip.trim().to_string())
.unwrap_or_else(|| ip.to_string());
trace!("{}: {}", remote_addr, uri);
match *req.method() {
// Serve some instructions at /
@@ -445,8 +538,11 @@ async fn echo(
ret
}
Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/stats$", uri.as_str()) {
ret = echo_stats(param, cfg, &remote_addr).await;
}
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$", uri.as_str()) {
ret = echo_info(param,cfg,remote_addr).await;
ret = echo_info(param, cfg, &remote_addr).await;
}
if uri == "/version" {
ret = echo_version().await;
@@ -461,7 +557,7 @@ async fn echo(
}
// Return the 404 Not Found for other routes.
_ => ret
_ => ret,
}
}
@@ -501,7 +597,6 @@ fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
cfg_lock.pub_key_path = value;
}
if let Ok(value) = env::var("BAL_SERVER_INFO") {
debug!("BAL_SERVER_INFO: {}", value);
cfg_lock.info = value;
@@ -510,9 +605,11 @@ fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
cfg_lock = parse_env_netconfig(cfg_lock, "signet");
cfg_lock = parse_env_netconfig(cfg_lock, "testnet");
drop(parse_env_netconfig(cfg_lock, "bitcoin"));
}
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{
fn parse_env_netconfig<'a>(
mut cfg_lock: MutexGuard<'a, MyConfig>,
chain: &'a str,
) -> MutexGuard<'a, MyConfig> {
let cfg = match chain {
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
@@ -552,7 +649,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cfg: Arc<Mutex<MyConfig>> = Arc::<Mutex<MyConfig>>::default();
parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap();
@@ -565,24 +661,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind((addr, cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr, cfg_lock.bind_port);
loop {
let (stream, _) = listener.accept().await?;
let ip = stream.peer_addr()?.to_string().split(":").next().unwrap().to_string();
let ip = stream
.peer_addr()?
.to_string()
.split(":")
.next()
.unwrap()
.to_string();
let io = TokioIo::new(stream);
tokio::task::spawn({
let cfg = cfg_lock.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async {
.serve_connection(
io,
service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req, &cfg, &ip).await
}))
}),
)
.await
{
error!("Error serving connection: {:?}", err);
}
}
});
}

View File

@@ -1,5 +1,5 @@
use sqlite::{ Connection, Value, State, Error };
use log::{info, trace, error};
use log::{error, info, trace};
use sqlite::{Connection, Error, State, Value};
pub fn create_database(db: &Connection) {
info!("database sanity check");
@@ -9,17 +9,15 @@ pub fn create_database(db: &Connection){
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);");
let _ =
db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);");
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
}
/*
pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option<i64>{
@@ -36,14 +34,21 @@ pub fn create_database(db: &Connection){
pub fn insert_xpub(db: &Connection, network: &String, xpub: &String) {
if xpub != "" {
trace!("going to insert: {} xpub:{}", network, xpub);
let mut stmt = db.prepare ("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);").unwrap();
let mut stmt = db
.prepare("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);")
.unwrap();
let _ = stmt.bind((1, Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2, Value::String(xpub.to_string()))).unwrap();
let _ = stmt.next();
}
}
pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &String, address: &String) -> Option<String>{
pub fn get_last_used_address_by_ip(
db: &Connection,
network: &String,
xpub: &String,
address: &String,
) -> Option<String> {
let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap();
let _ = stmt.bind((1, Value::String(network.to_string())));
let _ = stmt.bind((2, Value::String(address.to_string())));
@@ -54,7 +59,6 @@ pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &Str
} else {
return None;
}
}
pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64, i64) {
let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap();
@@ -65,43 +69,59 @@ pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String)
let next = stmt.read::<i64, _>("path_idx").unwrap();
let id = stmt.read::<i64, _>("id").unwrap();
return (id, next);
},Err(_)=> {
}
Err(_) => {
return (0, 0);
},Ok(State::Done) =>{
}
Ok(State::Done) => {
return (0, 0);
}
};
}
pub fn save_new_address(db: &Connection,xpub: i64,address: &String, path: &String,remote_addr: &String){
let mut stmt = db.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);").unwrap();
pub fn save_new_address(
db: &Connection,
xpub: i64,
address: &String,
path: &String,
remote_addr: &String,
) {
let mut stmt = db
.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);")
.unwrap();
stmt.bind((1, Value::String(address.to_string()))).unwrap();
stmt.bind((2, Value::String(path.to_string()))).unwrap();
stmt.bind((3, Value::Integer(xpub))).unwrap();
stmt.bind((4,Value::String(remote_addr.to_string()))).unwrap();
stmt.bind((4, Value::String(remote_addr.to_string())))
.unwrap();
let _ = stmt.next();
}
pub fn execute_insert(db: &Connection,
pub fn execute_insert(
db: &Connection,
sqltxs: String,
ptx: Vec<(usize, Value)>,
sqlinp: String,
pinp: Vec<(usize, Value)>,
sqlout: String,
pout: Vec<(usize, Value)>) -> Result<(),Error>{
pout: Vec<(usize, Value)>,
) -> Result<(), Error> {
let _ = db.execute("BEGIN TRANSACTION");
let mut stmt = db.prepare(sqltxs.as_str()).expect("failed to prepare sqltxs");
let mut stmt = db
.prepare(sqltxs.as_str())
.expect("failed to prepare sqltxs");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&ptx[..]) {
error!("error binding transaction parameters: {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting transactions {}", err);
let _ = db.execute("ROLLBACK");
} else {
let mut stmt = db.prepare(sqlinp.as_str()).expect("failed to prepare sqlinp");
let mut stmt = db
.prepare(sqlinp.as_str())
.expect("failed to prepare sqlinp");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&pinp[..]) {
error!("error binding inputs parameters {}", err);
let _ = db.execute("ROLLBACK");
@@ -111,9 +131,10 @@ pub fn execute_insert(db: &Connection,
error!("error inserting inputs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
} else {
let mut stmt = db.prepare(sqlout.as_str()).expect("failed to prepare sqlout");
let mut stmt = db
.prepare(sqlout.as_str())
.expect("failed to prepare sqlout");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&pout[..]) {
error!("error binding outs parameters {}", err);
let _ = db.execute("ROLLBACK");
@@ -123,25 +144,20 @@ pub fn execute_insert(db: &Connection,
error!("error inserting outs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
}
}
let _ = db.execute("COMMIT");
Ok(())
}
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64, Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap();
let mut stmt = db
.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;")
.unwrap();
stmt.bind((1, Value::String(network.to_string()))).unwrap();
match stmt.next() {
Ok(State::Row)=>{
Ok(stmt.read::<i64,_>("total_number").unwrap())
},
Ok(State::Row) => Ok(stmt.read::<i64, _>("total_number").unwrap()),
Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err)
Err(err) => Err(err),
}
}

View File

@@ -1,14 +1,13 @@
use sha2::{Digest, Sha256};
use bitcoin::bip32::Xpub;
use std::str::FromStr;
use bitcoin::bip32::DerivationPath;
use bitcoin::key::Secp256k1;
use bitcoin::Address;
use bitcoin::Network;
use bitcoin::ScriptBuf;
use bitcoin::WPubkeyHash;
use bitcoin::Network;
use bitcoin::bip32::DerivationPath;
use bitcoin::bip32::Xpub;
use bitcoin::hashes::Hash;
use bitcoin::key::Secp256k1;
use sha2::{Digest, Sha256};
use std::str::FromStr;
// Mainnet (BIP44/BIP49/BIP84)
enum BS58Prefix {
@@ -26,8 +25,6 @@ const XPUB_PREFIX:[u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH)
//const VPUB_PREFIX:[u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit)
//const UPUB_PREFIX:[u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit)
fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?;
if data.len() < 4 {
@@ -48,24 +45,30 @@ fn base58check_encode(data: &[u8]) -> String {
}
fn convert_to(zpub: &str, prefix: BS58Prefix) -> Result<String, String> {
let mut data = base58check_decode(zpub)?;
if data.len() < 4 {
return Err("Non è una zpub valida.".to_string());
}
data.splice(0..4, match prefix {
data.splice(
0..4,
match prefix {
BS58Prefix::Xpub => XPUB_PREFIX,
//BS58Prefix::Ypub => YPUB_PREFIX,
//BS58Prefix::Zpub => ZPUB_PREFIX,
//BS58Prefix::Vpub => VPUB_PREFIX,
//BS58Prefix::Tpub => TPUB_PREFIX,
//BS58Prefix::Upub => UPUB_PREFIX,
});
},
);
Ok(base58check_encode(&data))
}
pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(String,String), Box<dyn std::error::Error>>{
pub fn new_address_from_xpub(
zpub: &str,
index: i64,
network: Network,
) -> Result<(String, String), Box<dyn std::error::Error>> {
let xpub = Xpub::from_str(&convert_to(zpub, BS58Prefix::Xpub)?)?;
let path = format!("m/0/{}", index);
let derivation_path = DerivationPath::from_str(path.as_str())?;
@@ -79,7 +82,6 @@ pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(
let address = Address::from_script(&redeem_script, network)?;
//let address = Address::from_script(&script_pubkey, network)?;
Ok((address.to_string(), path.to_string()))
}
/*
fn main() -> Result<(), Box<dyn std::error::Error>>{