9 Commits

Author SHA1 Message Date
117ee7b932 env log 2026-02-06 12:07:15 -04:00
7604406fdf formatting code 2026-02-06 12:03:10 -04:00
8e322ca441 RPC.md 2026-01-29 13:36:06 -04:00
dcaab2ed6d expose stats 2026-01-21 00:08:47 -04:00
fa98283df6 version 2026-01-19 20:03:17 -04:00
d5fe22cc14 bal server route / 2025-11-03 09:36:56 -04:00
83749afddd update version 2025-11-03 08:53:58 -04:00
dd075508b7 fix bal-pusher zmq connection stability 2025-11-03 08:46:13 -04:00
4ac492ba79 fix bal-pusher zmq connection stability 2025-11-03 07:42:09 -04:00
11 changed files with 1219 additions and 723 deletions

12
Cargo.lock generated
View File

@@ -124,8 +124,8 @@ dependencies = [
] ]
[[package]] [[package]]
name = "bal-server" name = "bal_server"
version = "0.1.0" version = "0.2.3"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bitcoin", "bitcoin",
@@ -1315,9 +1315,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.81" version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.60" version = "2.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@@ -1,33 +1,33 @@
[package] [package]
name = "bal-server" name = "bal_server"
version = "0.1.0" version = "0.2.3"
edition = "2021" edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
base64 = "0.22.1" base64 = { version = "0.22.1" }
bs58 = "0.4.0" bs58 = { version = "0.4.0" }
bytes = "1.2" bytes = { version = "1.2" }
bitcoin = { version = "0.32.5" } bitcoin = { version = "0.32.5" }
bitcoincore-rpc = "0.19.0" bitcoincore-rpc = { version = "0.19.0" }
bitcoincore-rpc-json = "0.19.0" bitcoincore-rpc-json = { version = "0.19.0" }
byteorder = "1.5.0" byteorder = { version = "1.5.0" }
confy = "0.6.1" confy = { version = "0.6.1" }
chrono = "0.4.40" chrono = { version = "0.4.40" }
env_logger = "0.11.5" env_logger = { version = "0.11.5" }
hex = "0.4.3" hex = { version = "0.4.3" }
hex-conservative = "0.1.1" hex-conservative = { version = "0.1.1" }
hyper = { version = "1.3.1", features = ["http1","server"] } hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] } hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1" http-body-util = { version = "0.1" }
log = "0.4.21" log = { version = "0.4.21" }
openssl = { version = "0.10.74", features = ["vendored"] } openssl = { version = "0.10.74", features = ["vendored"] }
sha2 = "0.10.8" sha2 = { version = "0.10.8" }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116" serde_json = { version = "1.0.116" }
sqlite = "0.34.0" sqlite = { version = "0.34.0" }
regex = "1.10.4" regex = { version = "1.10.4" }
reqwest = { version = "0.12.24", features = ["json","socks"] } reqwest = { version = "0.12.24", features = ["json","socks"] }
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
zmq = "0.10.0" zmq = { version = "0.10.0" }

View File

@@ -79,7 +79,7 @@ To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) s
Once the application is installed and configured, you can start `bal-pusher` by running the following command: Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash ```bash
$ bal-pusher $ bal-pusher [bitcoin|testnet|regtest|]
``` ```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past. This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

119
RPC.md Normal file
View File

@@ -0,0 +1,119 @@
# RPC Endpoint Documentation
## Introduction
This document outlines the various endpoints provided by the Will Executor Server, a specialized server designed for managing Bitcoin transactions. Each endpoint can be accessed via HTTP GET or POST
requests with specific parameters and return values.
## Endpoints
### 1. **Server Information**
- **Endpoint:** `GET /`
- **Description:** Returns general information about the server.
- **Example URL:** `https://we.bitcoin-after.life/`
- **Response:**
```plaintext
Will Executor Server
```
### 2. **Server Public Key**
- **Endpoint:** `GET /.pub_key.pem`
- **Description:** Returns the public key of the server.
- **Example URL:** `https://we.bitcoin-after.life/.pub_key.pem`
- **Response:**
```plaintext
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAy10MSrWabdfco1c5Jo1XuohSdXSk1S0YaoEYvqZR5VE=
-----END PUBLIC KEY-----
```
### 3. **Server Version**
- **Endpoint:** `GET /version`
- **Description:** Returns the version of the server.
- **Example URL:** `https://we.bitcoin-after.life/version`
- **Response:**
```plaintext
0.2.2
```
### 4. **Network Information**
- **Endpoint:** `GET /<network>/info`
- **Description:** Returns information about a specific network.
- **Example URL:** `https://we.bitcoin-after.life/bitcoin/info`
- **Response:**
```json
{
"chain": "bitcoin",
"address": "bc1q5z32sl8at9s3sxt7mfwe6th4jxua98a0mvg8yz",
"base_fee": 1000,
"info": "Will Executor Server",
"version": "0.2.2"
}
```
### 5. **Network Statistics**
- **Endpoint:** `GET /<network>/stats`
- **Description:** Returns statistics for a specific network.
- **Example URL:** `https://we.bitcoin-after.life/bitcoin/stats`
- **Response:**
```json
[
{
"report_date": "2025-10-21 03:10:09",
"chain": "bitcoin",
"totals": 63,
"waiting": 8,
"sent": 30,
"failed": 25,
"waiting_profit": 80000,
"sent_profit": 300000,
"missed_profit": 250000,
"unique_inputs": 38
}
]
```
### 6. **Search Transaction**
- **Endpoint:** `POST /searchtx`
- **Description:** Searches for a transaction by its ID.
- **Example URL:** `https://we.bitcoin-after.life/searchtx`
- **Request Data:**
```plaintext
241ac86bdbf1408198b8c6df77e88159b43a9bb3464e55197a9fed8fdd628895
```
- **Response:**
```json
{
"status": "1",
"our_address": "bcrt1q7ajty6q3g055vvy6ryql9y3jz76x5uv806skk6",
"time": "1733755921197410086",
"our_fees": "10000",
"tx": "0200000000010281c28321ff6bcbfcd894bf4536d9a9fb4f4b56470db487da36f8330d495528600100000000fdffffff81c28321ff6bcbfcd894bf4536d9a9fb4f4b56470db487da36f8330d495528600200000000fdffffff031027000000"
}
```
### 7. **Push Transactions**
- **Endpoint:** `POST /push`
- **Description:** Pushes one or more transactions to the network.
- **Example URL:** `https://we.bitcoin-after.life/push`
- **Request Data:**
```plaintext
0200000000010281c28321ff6bcbfcd894bf4536d9a9fb4f4b56470db487da36f8330d495528600100000000fdffffff81c28321ff6bcbfcd894bf4536d9a9fb4f4b56470db487da36f8330d495528600200000000fdffffff031027000000
```
- **Response:**
- If successful, it returns the hash of each transaction:
```plaintext
thx
```
- If a transaction is already present or bad data is received, it returns an error message:
```plaintext
{
already present // or Bad data received
}
```
## Error Handling
- **400 Bad Request:** Returned when the request contains invalid parameters.
- **500 Internal Server Error:** Returned when the server encounters an unexpected condition.
This documentation should help you effectively interact with the Will Executor Server using its provided endpoints.

View File

@@ -12,5 +12,5 @@ BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain. #your server domain. do not final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life" BAL_SERVER_URL="https://we.bitcoin-after.life"

View File

@@ -1,12 +1,12 @@
RUST_LOG=trace RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db" export BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie #export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie #export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie #export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie #export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332 export BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085 export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133" export BAL_SERVER_URL="http://127.0.0.1:9133"

View File

@@ -11,6 +11,7 @@ export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1" export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133 export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem" export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
export BAL_SERVER_EXPOSE_STATS=true;
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here" #export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000 #export BAL_SERVER_BITCOIN_FIXED_FEE=50000

View File

@@ -2,208 +2,201 @@ extern crate bitcoincore_rpc;
extern crate zmq; extern crate zmq;
use bitcoin::Network; use bitcoin::Network;
use bitcoincore_rpc::{bitcoin, Auth, Client, Error, RpcApi}; use bitcoincore_rpc::{Auth, Client, Error, RpcApi, bitcoin};
use bitcoincore_rpc_json::GetBlockchainInfoResult; use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value}; use byteorder::{LittleEndian, ReadBytesExt};
use serde::Serialize; use hex;
use log::{debug, error, info, trace, warn};
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize;
use serde_json::json; use serde_json::json;
use sqlite::{Connection, Value};
use std::collections::HashMap;
use std::env; use std::env;
use log::{info,warn,error,trace,debug}; use std::error::Error as StdError;
use zmq::{Context, Socket}; use std::io::Cursor;
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use std::collections::HashMap; use zmq::{Context, DEALER, DONTWAIT, Socket};
//use byteorder::{LittleEndian, ReadBytesExt};
//use std::io::Cursor;
use hex;
use std::error::Error as StdError;
use reqwest::Client as rClient; use base64::{Engine as _, engine::general_purpose};
use openssl::hash::MessageDigest; use openssl::hash::MessageDigest;
use openssl::pkey::{PKey}; use openssl::pkey::PKey;
use openssl::sign::Signer; use openssl::sign::Signer;
use openssl::sign::Verifier; use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _}; use reqwest::Client as rClient;
use std::fs; use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD: i64 = 5000000;
const VERSION: &str = "0.0.2";
#[derive(Debug, Clone, Serialize, Deserialize)]
const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.1";
#[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig { struct MyConfig {
zmq_listener: String, zmq_listener: String,
requests_file: String, db_file: String,
db_file: String, bitcoin_dir: String,
bitcoin_dir: String, regtest: NetworkParams,
regtest: NetworkParams, testnet: NetworkParams,
testnet: NetworkParams, signet: NetworkParams,
signet: NetworkParams, mainnet: NetworkParams,
mainnet: NetworkParams, send_stats: bool,
send_stats: bool, url: String,
url: String, ssl_key_path: String,
secret_code: String,
ssl_key_path: String
} }
impl Default for MyConfig { impl Default for MyConfig {
fn default() -> Self { fn default() -> Self {
MyConfig { MyConfig {
zmq_listener: "tcp://127.0.0.1:28332".to_string(), zmq_listener: env::var("BAL_PUSHER_ZMQ_LISTENER")
requests_file: "rawrequests.log".to_string(), .unwrap_or("tcp://127.0.0.1:28332".to_string()),
db_file: "../bal.db".to_string(), db_file: env::var("BAL_PUSHER_DB_FILE").unwrap_or("bal.db".to_string()),
bitcoin_dir: "".to_string(), bitcoin_dir: env::var("BAL_PUSHER_BITCOIN_DIR").unwrap_or("".to_string()),
regtest: get_network_params_default(Network::Regtest), regtest: get_network_params_default(Network::Regtest),
testnet: get_network_params_default(Network::Testnet), testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet), signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin), mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false, send_stats: env::var("BAL_PUSHER_SEND_STATS")
url: "http://localhost/".to_string(), .unwrap_or("false".to_string())
secret_code: "xxx".to_string(), .parse::<bool>()
ssl_key_path: "privkey.pem".to_string(), .unwrap(),
url: env::var("BAL_SERVER_URL").unwrap_or("http://localhost/".to_string()),
ssl_key_path: env::var("SSL_KEY_PATH").unwrap_or("privkey.pem".to_string()),
} }
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct NetworkParams { struct NetworkParams {
host: String, host: String,
port: u16, port: u16,
dir_path: String, dir_path: String,
db_field: String, db_field: String,
cookie_file: String, cookie_file: String,
rpc_user: String, rpc_user: String,
rpc_pass: String, rpc_pass: String,
} }
fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{ fn get_network_params(cfg: &MyConfig, network: Network) -> &NetworkParams {
match network{ match network {
Network::Testnet => &cfg.testnet, Network::Testnet => &cfg.testnet,
Network::Signet => &cfg.signet, Network::Signet => &cfg.signet,
Network::Regtest => &cfg.regtest, Network::Regtest => &cfg.regtest,
_ => &cfg.mainnet _ => &cfg.mainnet,
} }
} }
fn get_network_params_default(network:Network) -> NetworkParams{ fn get_network_params_default(network: Network) -> NetworkParams {
match network { match network {
Network::Testnet => NetworkParams{ Network::Testnet => NetworkParams {
host: "http://i27.0.0.1".to_string(), host: "http://i27.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "testnet3/".to_string(), dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(), db_field: "testnet".to_string(),
cookie_file: "".to_string(), cookie_file: "".to_string(),
rpc_user: "".to_string(), rpc_user: "".to_string(),
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Signet => NetworkParams{ Network::Signet => NetworkParams {
host: "http://127.0.0.1".to_string(), host: "http://127.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "signet/".to_string(), dir_path: "signet/".to_string(),
db_field: "signet".to_string(), db_field: "signet".to_string(),
cookie_file: "".to_string(), cookie_file: "".to_string(),
rpc_user: "".to_string(), rpc_user: "".to_string(),
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Regtest => NetworkParams{ Network::Regtest => NetworkParams {
host: "http://127.0.0.1".to_string(), host: "http://127.0.0.1".to_string(),
port: 18443, port: 18443,
dir_path: "regtest/".to_string(), dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(), db_field: "regtest".to_string(),
cookie_file: "".to_string(), cookie_file: "".to_string(),
rpc_user: "".to_string(), rpc_user: "".to_string(),
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
_ => NetworkParams{ _ => NetworkParams {
host: "http://127.0.0.1".to_string(), host: "http://127.0.0.1".to_string(),
port: 8332, port: 8332,
dir_path: "".to_string(), dir_path: "".to_string(),
db_field: "bitcoin".to_string(), db_field: "bitcoin".to_string(),
cookie_file: "".to_string(), cookie_file: "".to_string(),
rpc_user: "".to_string(), rpc_user: "".to_string(),
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
} }
} }
fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError>>{ fn get_cookie_filename(network: &NetworkParams) -> Result<String, Box<dyn StdError>> {
if network.cookie_file !=""{ if network.cookie_file != "" {
Ok(network.cookie_file.clone()) Ok(network.cookie_file.clone())
}else{ } else {
match env::var_os("HOME") { match env::var_os("HOME") {
Some(home) => { Some(home) => match home.to_str() {
match home.to_str(){ Some(home_str) => {
Some(home_str) => { let cookie_file_path =
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path); format!("{}/.bitcoin/{}.cookie", home_str, network.dir_path);
Ok(cookie_file_path) Ok(cookie_file_path)
},
None => Err("wrong HOME value".into())
} }
None => Err("wrong HOME value".into()),
}, },
None => Err("Please Set HOME environment variable".into()) None => Err("Please Set HOME environment variable".into()),
} }
} }
} }
fn get_client_from_username(url: &String, network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client_from_username(
url: &String,
network: &NetworkParams,
) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
if network.rpc_user != "" { if network.rpc_user != "" {
match Client::new(&url[..],Auth::UserPass(network.rpc_user.to_string(),network.rpc_pass.to_string())){ match Client::new(
Ok(client) => match client.get_blockchain_info(){ &url[..],
Ok(bcinfo) => Ok((client,bcinfo)), Auth::UserPass(network.rpc_user.to_string(), network.rpc_pass.to_string()),
Err(err) => Err(err.into()) ) {
} Ok(client) => match client.get_blockchain_info() {
Err(err)=>Err(err.into()) Ok(bcinfo) => Ok((client, bcinfo)),
Err(err) => Err(err.into()),
},
Err(err) => Err(err.into()),
} }
}else{ } else {
Err("Failed".into()) Err("Failed".into())
} }
} }
fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client_from_cookie(
match get_cookie_filename(network){ url: &String,
Ok(cookie) => { network: &NetworkParams,
match Client::new(&url[..], Auth::CookieFile(cookie.into())) { ) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
Ok(client) => { match get_cookie_filename(network) {
match client.get_blockchain_info(){ Ok(cookie) => match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(bcinfo) => { Ok(client) => match client.get_blockchain_info() {
Ok((client,bcinfo)) Ok(bcinfo) => Ok((client, bcinfo)),
}, Err(err) => Err(err.into()),
Err(err) => { },
Err(err.into()) Err(err) => Err(err.into()),
}
}
},
Err(err)=>Err(err.into())
}
}, },
Err(err)=>Err(err.into()) Err(err) => Err(err.into()),
} }
} }
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client(
let url = format!("{}:{}/",network.host,&network.port); network: &NetworkParams,
match get_client_from_username(&url,network){ ) -> Result<(Client, GetBlockchainInfoResult), Box<dyn StdError>> {
Ok(client) =>{Ok(client)}, let url = format!("{}:{}/", network.host, &network.port);
Err(_) =>{ match get_client_from_username(&url, network) {
match get_client_from_cookie(&url,&network){ Ok(client) => Ok(client),
Ok(client)=>{ Err(_) => match get_client_from_cookie(&url, &network) {
Ok(client) Ok(client) => Ok(client),
}, Err(err) => Err(err.into()),
Err(err)=> Err(err.into()) },
}
}
} }
} }
async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> { async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>"); /*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
let user = args.next().expect("no user given"); let user = args.next().expect("no user given");
let pass = args.next().expect("no pass given"); let pass = args.next().expect("no pass given");
*/ */
//let network = Network::Regtest //let network = Network::Regtest
match get_client(network_params){ match get_client(network_params) {
Ok((rpc,bcinfo)) => { Ok((rpc, bcinfo)) => {
info!("connected"); info!("connected");
//let best_block_hash = rpc.get_best_block_hash()?; //let best_block_hash = rpc.get_best_block_hash()?;
//info!("best block hash: {}", best_block_hash); //info!("best block hash: {}", best_block_hash);
@@ -220,41 +213,44 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
// time_sum += <u32 as Into<u64>>::into(block.header.time); // time_sum += <u32 as Into<u64>>::into(block.header.time);
//} //}
//let average_time = time_sum/11; //let average_time = time_sum/11;
info!("median time: {}",bcinfo.median_time); info!("median time: {}", bcinfo.median_time);
//info!("height time: {}",bcinfo.median_time); //info!("height time: {}",bcinfo.median_time);
info!("blocks: {}",bcinfo.blocks); info!("blocks: {}", bcinfo.blocks);
debug!("best block hash: {}",bcinfo.best_block_hash); debug!("best block hash: {}", bcinfo.best_block_hash);
let average_time = bcinfo.median_time; let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap(); let db = sqlite::open(&cfg.db_file).unwrap();
info!("db open {}", &cfg.db_file);
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);"; let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter(); let query_tx = db.prepare(sqlquery).unwrap().into_iter();
trace!("query_tx: {}",sqlquery); trace!("query_tx: {}", sqlquery);
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD ); trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD);
trace!(":bestblock_time: {}", average_time); trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks); trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone()); trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0); trace!(":status: {}", 0);
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter(); //let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs:Vec<String> = Vec::new(); let mut pushed_txs: Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new(); let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
for row in query_tx.bind::<&[(_, Value)]>(&[ for row in query_tx
(":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()), .bind::<&[(_, Value)]>(
(":bestblock_time", (average_time as i64).into()), &[
(":bestblock_height", (bcinfo.blocks as i64).into()), (":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()),
(":network", network_params.db_field.clone().into()), (":bestblock_time", (average_time as i64).into()),
(":status", 0.into()), (":bestblock_height", (bcinfo.blocks as i64).into()),
][..]) (":network", network_params.db_field.clone().into()),
.unwrap() (":status", 0.into()),
.map(|row| row.unwrap()) ][..],
)
.unwrap()
.map(|row| row.unwrap())
{ {
let tx = row.read::<&str, _>("tx"); let tx = row.read::<&str, _>("tx");
let txid = row.read::<&str, _>("txid"); let txid = row.read::<&str, _>("txid");
let locktime = row.read::<i64,_>("locktime"); let locktime = row.read::<i64, _>("locktime");
info!("to be pushed: {}: {}",txid, locktime); info!("to be pushed: {}: {}", txid, locktime);
match rpc.send_raw_transaction(tx){ match rpc.send_raw_transaction(tx) {
Ok(o) => { Ok(o) => {
/*let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
@@ -264,9 +260,9 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/ */
info!("tx: {} pusshata PUSHED\n{}",txid,o); info!("tx: {} pusshata PUSHED\n{}", txid, o);
pushed_txs.push(txid.to_string()); pushed_txs.push(txid.to_string());
}, }
Err(err) => { Err(err) => {
/*let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
@@ -276,253 +272,363 @@ async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/ */
warn!("Error: {}\n{}",err,txid); warn!("Error: {}\n{}", err, txid);
//store err in invalid_txs //store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string()); invalid_txs.insert(txid.to_string(), err.to_string());
}
},
}; };
} }
if pushed_txs.len() > 0 { if pushed_txs.len() > 0 {
let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','")); let sql = format!(
trace!("sqlok: {}",&sql); "UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",
pushed_txs.join("','")
);
trace!("sqlok: {}", &sql);
let _ = db.execute(&sql); let _ = db.execute(&sql);
} }
if invalid_txs.len() > 0 { if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{ for (txid, txerr) in &invalid_txs {
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','"))); //let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'"); let sql = format!(
trace!("sqlerror: {}",&sql); "UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'"
);
trace!("sqlerror: {}", &sql);
let _ = db.execute(&sql); let _ = db.execute(&sql);
} }
} }
let _ = send_stats_report(cfg, bcinfo).await; let _ = send_stats_report(cfg, bcinfo).await;
let _ = calculate_stats(&db, network_params.db_field.clone()).await;
} }
Err(erx)=>{ Err(erx) => {
panic!("impossible to get client {}",erx) panic!("impossible to get client {}", erx)
} }
} }
Ok(()) Ok(())
} }
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{ async fn calculate_stats(db: &Connection, chain: String) -> Result<(), reqwest::Error> {
//let sql = "drop table if exists tbl_stats;";
let sql = "DELETE FROM tbl_stats WHERE chain = '{chain}';";
if let Err(err) = db.execute(&sql) {
error!("error deleting from tbl_stats where chain:{chain} error: {err}");
}
let sql = format!(
"INSERT INTO tbl_stats (
report_date, chain, totals, waiting, sent, failed,
waiting_profit, sent_profit, missed_profit, unique_inputs
)
VALUES (
CURRENT_TIMESTAMP,
'{chain}',
(SELECT COUNT(*) FROM tbl_tx WHERE network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network = '{chain}'),
(SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 0 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 1 AND network = '{chain}'),
(SELECT IFNULL(SUM(our_fees),0) FROM tbl_tx WHERE status = 2 AND network = '{chain}'),
(SELECT COUNT(DISTINCT tbl_inp.in_txid)
FROM tbl_inp
JOIN tbl_tx ON tbl_inp.txid = tbl_tx.txid
WHERE tbl_tx.status = 0 AND tbl_tx.network = '{chain}')
)
ON CONFLICT(chain) DO UPDATE SET
report_date = excluded.report_date,
totals = excluded.totals,
waiting = excluded.waiting,
sent = excluded.sent,
failed = excluded.failed,
waiting_profit = excluded.waiting_profit,
sent_profit = excluded.sent_profit,
missed_profit = excluded.missed_profit,
unique_inputs = excluded.unique_inputs;
"
);
/*
let sql = format!("CREATE TABLE tbl_stats AS
SELECT
CURRENT_TIMESTAMP AS report_date,
'{chain}' as chain,
(SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}') AS totals,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent,
(SELECT COUNT(*) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS failed,
(SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}') AS waiting_profit,
(SELECT SUM(our_fees) OR 0 FROM tbl_tx WHERE status = 1 AND network ='{chain}') AS sent_profit,
(SELECT SUM(our_fees) FROM tbl_tx WHERE status = 2 AND network ='{chain}') AS missed_profit,
(SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}') AS unique_inputs;
");
let sql = "UPDATE tbl_stats set
totals = (SELECT COUNT(*) FROM tbl_tx WHERE network ='{chain}'),
waiting = (SELECT COUNT(*) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
sent = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'),
failed = (SELECT COUNT(*) FROM tbl_tx WHERE status = 1 AND network ='{chain}'),
waiting_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
sent_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}'),
missed_profit = (SELECT SUM(our_fees) FROM tbl_tx WHERE status = 0 AND network ='{chain}')
unique_inputs = (SELECT COUNT(*) FROM tbl_inp JOIN tbl_tx ON(tbl_inp.txid = tbl_tx.txid) WHERE tbl_tx.status=0 AND tbl_tx.network ='{chain}')
WHERE chain = '{chain}'
*/
if let Err(err) = db.execute(&sql) {
error!("error inserting creating stats table {err}");
} else {
info!("tbl_stats creation success");
}
Ok(())
}
async fn send_stats_report(
cfg: &MyConfig,
bcinfo: GetBlockchainInfoResult,
) -> Result<(), reqwest::Error> {
if cfg.send_stats { if cfg.send_stats {
debug!("sending report to welist"); debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string()); let welist_url = env::var("WELIST_SERVER_URL")
.unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new(); let client = rClient::new();
let url = format!("{}/ping",welist_url); let url = format!("{}/ping", welist_url);
let chain=bcinfo.chain.to_string().to_lowercase(); debug!("welist url: {}", url);
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash); let chain = bcinfo.chain.to_string().to_lowercase();
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str()); let message = format!(
let response = client.post(url) "{0}{1}{2}{3}{4}",
.header("User-Agent", format!("bal-pusher/{}",VERSION)) cfg.url, chain, bcinfo.blocks, bcinfo.median_time, bcinfo.best_block_hash
);
trace!("message to be sent: {}", message);
let sign = sign_message(cfg.ssl_key_path.as_str(), &message.as_str());
let response = client
.post(url)
.header("User-Agent", format!("bal-pusher/{}", VERSION))
.json(&json!( .json(&json!(
{ {
"url": cfg.url, "url": cfg.url,
"chain": chain, "chain": chain,
"height": bcinfo.blocks, "height": bcinfo.blocks,
"median_time": bcinfo.median_time, "median_time": bcinfo.median_time,
"last_block_hash": bcinfo.best_block_hash, "last_block_hash": bcinfo.best_block_hash,
"signature": sign, "signature": sign,
})) }))
.send().await?; .send()
.await?;
if !response.status().is_success() {
warn!(
"Non-success response: {} {}",
response.status(),
response.status().canonical_reason().unwrap_or("")
);
}
let body = &(response.text().await?); let body = &(response.text().await?);
trace!("Body: {}", body); info!("Report to welist({})\tSent: {}", welist_url, body);
}else { } else {
debug!("Not sending stats"); debug!("Not sending stats");
} }
Ok(()) Ok(())
} }
fn sign_message(private_key_path: &str, message: &str) -> String { fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap(); let key_data = fs::read(private_key_path).unwrap();
let private_key = PKey::private_key_from_pem(&key_data).unwrap(); let private_key = PKey::private_key_from_pem(&key_data).unwrap();
let mut signer = Signer::new_without_digest(&private_key).unwrap(); let mut signer = Signer::new_without_digest(&private_key).unwrap();
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap(); let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature); let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64 signature_b64
} }
fn parse_env(cfg: &mut MyConfig){ fn parse_env(cfg: &mut MyConfig) {
match env::var("BAL_PUSHER_ZMQ_LISTENER") { cfg.regtest = parse_env_netconfig(cfg, "regtest");
Ok(value) => { cfg.signet = parse_env_netconfig(cfg, "signet");
cfg.zmq_listener = value;}, cfg.testnet = parse_env_netconfig(cfg, "testnet");
Err(_) => {}, drop(parse_env_netconfig(cfg, "bitcoin"));
}
match env::var("BAL_PUSHER_REQUEST_FILE") {
Ok(value) => {
cfg.requests_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_DB_FILE") {
Ok(value) => {
cfg.db_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_BITCOIN_DIR") {
Ok(value) => {
cfg.bitcoin_dir = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg,"regtest");
cfg.signet = parse_env_netconfig(cfg,"signet");
cfg.testnet = parse_env_netconfig(cfg,"testnet");
drop(parse_env_netconfig(cfg,"bitcoin"));
} }
fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams{ fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams {
//fn parse_env_netconfig(cfg_lock: &MutexGuard<MyConfig>, chain: &str) -> &NetworkParams{ //fn parse_env_netconfig(cfg_lock: &MutexGuard<MyConfig>, chain: &str) -> &NetworkParams{
let cfg = match chain{ let cfg = match chain {
"regtest" => &mut cfg_lock.regtest, "regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet, "signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet, "testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet, &_ => &mut cfg_lock.mainnet,
}; };
match env::var(format!("BAL_PUSHER_{}_HOST",chain.to_uppercase())) { match env::var(format!("BAL_PUSHER_{}_HOST", chain.to_uppercase())) {
Ok(value) => { cfg.host= value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_PORT",chain.to_uppercase())) {
Ok(value) => { Ok(value) => {
match value.parse::<u64>(){ cfg.host = value;
Ok(value) =>{ cfg.port = value.try_into().unwrap(); },
Err(_) => {},
}
} }
Err(_) => {}, Err(_) => {}
} }
match env::var(format!("BAL_PUSHER_{}_DIR_PATH",chain.to_uppercase())) { match env::var(format!("BAL_PUSHER_{}_PORT", chain.to_uppercase())) {
Ok(value) => { cfg.dir_path = value; }, Ok(value) => match value.parse::<u64>() {
Err(_) => {}, Ok(value) => {
cfg.port = value.try_into().unwrap();
}
Err(_) => {}
},
Err(_) => {}
} }
match env::var(format!("BAL_PUSHER_{}_DB_FIELD",chain.to_uppercase())) { match env::var(format!("BAL_PUSHER_{}_DIR_PATH", chain.to_uppercase())) {
Ok(value) => { cfg.db_field = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE",chain.to_uppercase())) {
Ok(value) => { Ok(value) => {
cfg.cookie_file = value; }, cfg.dir_path = value;
Err(_) => {}, }
Err(_) => {}
} }
match env::var(format!("BAL_PUSHER_{}_RPC_USER",chain.to_uppercase())) { match env::var(format!("BAL_PUSHER_{}_DB_FIELD", chain.to_uppercase())) {
Ok(value) => { cfg.rpc_user = value; }, Ok(value) => {
Err(_) => {}, cfg.db_field = value;
}
Err(_) => {}
} }
match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD",chain.to_uppercase())) { match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE", chain.to_uppercase())) {
Ok(value) => { cfg.rpc_pass = value; }, Ok(value) => {
Err(_) => {}, cfg.cookie_file = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_RPC_USER", chain.to_uppercase())) {
Ok(value) => {
cfg.rpc_user = value;
}
Err(_) => {}
}
match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD", chain.to_uppercase())) {
Ok(value) => {
cfg.rpc_pass = value;
}
Err(_) => {}
} }
cfg.clone() cfg.clone()
} }
fn get_default_config()-> MyConfig { fn check_zmq_connection(endpoint: &str) -> bool {
let file = confy::get_configuration_file_path("bal-pusher",None).expect("Error while getting path"); trace!("check zmq connection");
info!("Default configuration file path is: {:#?}", file); let context = Context::new();
confy::load("bal-pusher",None).expect("cant_load") let socket = match context.socket(DEALER) {
} Ok(sock) => sock,
#[tokio::main] Err(_) => return false,
async fn main()-> std::io::Result<()>{
env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => {
match confy::load_path(&value){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
},
Err(_) => {
get_default_config()
},
}; };
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
env_logger::init();
let mut cfg = MyConfig::default();
let dbfile = env::var("BAL_PUSHER_DB_FILE").unwrap();
parse_env(&mut cfg); parse_env(&mut cfg);
let mut args = std::env::args(); let mut args = std::env::args();
let _exe_name = args.next().unwrap(); let _exe_name = args.next().unwrap();
let arg_network = match args.next(){ let arg_network = match args.next() {
Some(nargs) => nargs, Some(nargs) => nargs,
None => "bitcoin".to_string() None => "bitcoin".to_string(),
}; };
let network = match arg_network.as_str(){ let network = match arg_network.as_str() {
"testnet" => Network::Testnet, "testnet" => Network::Testnet,
"signet" => Network::Signet, "signet" => Network::Signet,
"regtest" => Network::Regtest, "regtest" => Network::Regtest,
_ => Network::Bitcoin, _ => Network::Bitcoin,
}; };
info!("Network: {}", arg_network);
info!("Network: {}",arg_network); let network_params = get_network_params(&cfg, network);
let network_params = get_network_params(&cfg,network);
let context = Context::new(); let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone(); let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address); info!("zmq listening on: {}", zmq_address);
socket.connect(&zmq_address).unwrap(); socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap(); socket.set_subscribe(b"").unwrap();
let _ = main_result(&cfg,network_params).await; let _ = main_result(&cfg, network_params).await;
info!("waiting new blocks.."); info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec(); let mut last_seq: Vec<u8> = [0; 4].to_vec();
let mut counter = 0;
let max = 100;
loop { loop {
let message = socket.recv_multipart(0).unwrap(); let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone(); let topic = message[0].clone();
let body = message[1].clone(); let body = message[1].clone();
let seq = message[2].clone(); let seq = message[2].clone();
if last_seq >= seq {
continue
}
last_seq = seq; last_seq = seq;
//let mut sequence_str = "Unknown".to_string(); debug!(
/*if seq.len()==4{ "ZMQ:GET TOPIC: {}",
let mut rdr = Cursor::new(seq); String::from_utf8(topic.clone()).expect("invalid topic")
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer"); );
sequence_str = sequence.to_string();
}*/
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body)); trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body)); info!("NEW BLOCK: {}", hex::encode(&body));
//let cfg = cfg.clone(); let _ = main_result(&cfg, network_params).await;
let _ = main_result(&cfg,network_params).await;
} }
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
} }
} }
fn seq_to_str(seq: &Vec<u8>) -> String {
if seq.len() == 4 {
let mut rdr = Cursor::new(seq);
let sequence = rdr
.read_u32::<LittleEndian>()
.expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

View File

@@ -1,42 +1,39 @@
use bytes::Bytes; use bytes::Bytes;
use http_body_util::{ combinators::BoxBody, BodyExt, Empty, Full }; use http_body_util::{BodyExt, Empty, Full, combinators::BoxBody};
use hyper::server::conn::http1; use hyper::server::conn::http1;
use hyper::service::service_fn; use hyper::service::service_fn;
use hyper::{ Method, Request, Response, StatusCode }; use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use std::net::IpAddr;
use std::env; use std::env;
use std::net::IpAddr;
//use std::time::{SystemTime,UNIX_EPOCH}; //use std::time::{SystemTime,UNIX_EPOCH};
use std::fs; use std::fs;
use std::sync::{ Arc, Mutex, MutexGuard }; use std::sync::{Arc, Mutex, MutexGuard};
//use std::net::SocketAddr; //use std::net::SocketAddr;
use sqlite::{Connection, State, Value};
use std::collections::HashMap; use std::collections::HashMap;
use sqlite::{ State, Value, Connection };
use bitcoin::{ consensus, Transaction, Network }; use bitcoin::{Network, Transaction, consensus};
use hex_conservative::FromHex;
use regex::Regex;
use serde::{ Serialize, Deserialize};
use log::{ info, error, trace, debug};
use serde_json;
use chrono::Utc; use chrono::Utc;
use hex_conservative::FromHex;
use log::{debug, error, info, trace};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json;
#[path = "../db.rs"] use bal_server::db::{
mod db; create_database, execute_insert, get_last_used_address_by_ip, get_next_address_index,
use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert }; insert_xpub, save_new_address,
};
use bal_server::xpub::new_address_from_xpub;
const VERSION: &str = env!("CARGO_PKG_VERSION");
#[path = "../xpub.rs"] const NETWORKS: [&str; 4] = ["bitcoin", "testnet", "signet", "regtest"];
mod xpub; #[derive(Debug, Clone, Serialize, Deserialize)]
use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.1";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig { struct NetConfig {
address: String, address: String,
fixed_fee: u64, fixed_fee: u64,
@@ -47,58 +44,75 @@ struct NetConfig {
} }
impl NetConfig { impl NetConfig {
fn default_network(name:String, network: Network) -> Self { fn default_network(name: String, network: Network) -> Self {
NetConfig { NetConfig {
address: "".to_string(), address: "".to_string(),
fixed_fee: 50000, fixed_fee: 50000,
xpub: false, xpub: false,
name, name,
network, network,
enabled: false, enabled: false,
} }
} }
} }
#[derive(Debug, Serialize, Deserialize,Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
struct MyConfig { struct MyConfig {
regtest: NetConfig, regtest: NetConfig,
signet: NetConfig, signet: NetConfig,
testnet: NetConfig, testnet: NetConfig,
mainnet: NetConfig, mainnet: NetConfig,
info: String, info: String,
bind_address: String, bind_address: String,
bind_port: u16, // Changed to u16 for port numbers bind_port: u16, // Changed to u16 for port numbers
db_file: String, db_file: String,
pub_key_path: String, pub_key_path: String,
expose_stats: bool,
} }
#[derive(Debug,Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Info { pub struct InfoResponse {
pub address: String, pub address: String,
pub base_fee: u64, pub base_fee: u64,
pub chain: String, pub chain: String,
pub info: String, pub info: String,
pub version: String pub version: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StatsResponse {
pub report_date: String,
pub chain: String,
pub totals: i64,
pub waiting: i64,
pub sent: i64,
pub failed: i64,
pub waiting_profit: i64,
pub sent_profit: i64,
pub missed_profit: i64,
pub unique_inputs: i64,
} }
impl Default for MyConfig { impl Default for MyConfig {
fn default() -> Self { fn default() -> Self {
MyConfig { MyConfig {
regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest), regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest),
signet: NetConfig::default_network("signet".to_string(), Network::Signet), signet: NetConfig::default_network("signet".to_string(), Network::Signet),
testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet), testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet),
mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin), mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin),
bind_address: "127.0.0.1".to_string(), bind_address: "127.0.0.1".to_string(),
bind_port: 9137, bind_port: 9137,
db_file: "bal.db".to_string(), db_file: "bal.db".to_string(),
info: "Will Executor Server".to_string(), info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(), pub_key_path: "public_key.pem".to_string(),
expose_stats: env::var("BAL_SERVER_EXPOSE_STATS")
.unwrap_or("false".to_string())
.parse::<bool>()
.unwrap(),
} }
} }
} }
impl MyConfig { impl MyConfig {
fn get_net_config(&self, param: &str) -> &NetConfig{ fn get_net_config(&self, param: &str) -> &NetConfig {
match param { match param {
"regtest" => &self.regtest, "regtest" => &self.regtest,
"testnet" => &self.testnet, "testnet" => &self.testnet,
@@ -108,80 +122,161 @@ impl MyConfig {
} }
} }
async fn echo_version( async fn echo_version() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION))) Ok(Response::new(full(VERSION)))
} }
async fn echo_home(cfg: &MyConfig) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info);
Ok(Response::new(full(cfg.info.clone())))
}
async fn echo_pub_key( async fn echo_pub_key(
cfg: &MyConfig, cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let pub_key = fs::read_to_string(&cfg.pub_key_path) let pub_key = fs::read_to_string(&cfg.pub_key_path)
.expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str()); .expect(format!("Failed to read public key file {}", cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key))) Ok(Response::new(full(pub_key)))
} }
async fn echo_info( async fn echo_stats(
param: &str, param: &str,
cfg: &MyConfig, cfg: &MyConfig,
remote_addr: String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}",param); info!("echo stats!!! {} - {}", param, cfg.expose_stats);
let netconfig=MyConfig::get_net_config(cfg,param); let netconfig = MyConfig::get_net_config(cfg, param);
if !netconfig.enabled { if !netconfig.enabled {
debug!("network disabled {}",param); debug!("network disabled {}", param);
return Ok(Response::new(full("network disabled"))); return Ok(Response::new(full("network disabled")));
}
let sql = format!(
"SELECT
report_date,
chain,
totals,
waiting,
sent,
failed,
waiting_profit,
sent_profit,
missed_profit,
unique_inputs FROM tbl_stats where chain = '{}'
",
netconfig.name
);
let mut stats: Vec<StatsResponse> = vec![];
let db = sqlite::open(&cfg.db_file).unwrap();
let _ = db.iterate(&sql, |pairs| {
let row: HashMap<_, _> = pairs
.into_iter()
.map(|(k, v)| (k.to_string(), v.map(|s| s)))
.collect();
//let row:HashMap<_,_>= pairs.into_iter().collect();
println!("row report date {}", row["report_date"].clone().unwrap());
dbg!(&row);
stats.push(StatsResponse {
report_date: row["report_date"].clone().unwrap().to_string(),
chain: row["chain"].clone().unwrap().to_string(),
totals: row["totals"].clone().unwrap().parse::<i64>().unwrap(),
waiting: row["waiting"].clone().unwrap().parse::<i64>().unwrap(),
sent: row["sent"].clone().unwrap().parse::<i64>().unwrap(),
failed: row["failed"].clone().unwrap().parse::<i64>().unwrap(),
waiting_profit: row["waiting_profit"]
.clone()
.unwrap()
.parse::<i64>()
.unwrap(),
sent_profit: row["sent_profit"].clone().unwrap().parse::<i64>().unwrap(),
missed_profit: row["missed_profit"]
.clone()
.unwrap()
.parse::<i64>()
.unwrap(),
unique_inputs: row["unique_inputs"]
.clone()
.unwrap()
.parse::<i64>()
.unwrap(),
});
true
});
match serde_json::to_string(&stats) {
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
} }
let address = match netconfig.xpub{ Err(err) => Ok(Response::new(full(format!("error:{}", err)))),
false => { }
let address = netconfig.address.to_string(); }
trace!("is address: {}",&address);
address async fn echo_info(
}, param: &str,
true => { cfg: &MyConfig,
let db = sqlite::open(&cfg.db_file).unwrap(); remote_addr: &String,
match get_last_used_address_by_ip(&db,&netconfig.name,&netconfig.address,&remote_addr){ ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Some(address)=>address, info!("echo info!!!{}", param);
None => { let netconfig = MyConfig::get_net_config(cfg, param);
let next = get_next_address_index(&db,&netconfig.name,&netconfig.address); if !netconfig.enabled {
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap(); debug!("network disabled {}", param);
save_new_address(&db,next.0,&address.0,&address.1,&remote_addr); return Ok(Response::new(full("network disabled")));
debug!("save new address {} {}",address.0,address.1); }
trace!("next {} {}",next.0,next.1); let address = match netconfig.xpub {
address.0 false => {
} let address = netconfig.address.to_string();
trace!("is address: {}", &address);
address
}
true => {
let db = sqlite::open(&cfg.db_file).unwrap();
match get_last_used_address_by_ip(
&db,
&netconfig.name,
&netconfig.address,
&remote_addr,
) {
Some(address) => address,
None => {
let next = get_next_address_index(&db, &netconfig.name, &netconfig.address);
let address =
new_address_from_xpub(&netconfig.address, next.1, netconfig.network)
.unwrap();
save_new_address(&db, next.0, &address.0, &address.1, &remote_addr);
debug!("save new address {} {}", address.0, address.1);
trace!("next {} {}", next.0, next.1);
address.0
} }
} }
};
let info = Info{
address,
base_fee: netconfig.fixed_fee,
chain: netconfig.network.to_string(),
info: cfg.info.to_string(),
version: VERSION.to_string()
};
trace!("address: {:#?}",info);
match serde_json::to_string(&info){
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err))))
} }
};
let info = InfoResponse {
address,
base_fee: netconfig.fixed_fee,
chain: netconfig.network.to_string(),
info: cfg.info.to_string(),
version: VERSION.to_string(),
};
trace!("address: {:#?}", info);
match serde_json::to_string(&info) {
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
}
Err(err) => Ok(Response::new(full(format!("error:{}", err)))),
}
} }
async fn echo_search(whole_body: &Bytes, async fn echo_search(
cfg: &MyConfig, whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!"); info!("echo search!!!");
let strbody = std::str::from_utf8(whole_body).unwrap(); let strbody = std::str::from_utf8(whole_body).unwrap();
info!("{}",strbody); info!("{}", strbody);
let mut response = Response::new(full("Bad data received".to_owned())); let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST; *response.status_mut() = StatusCode::BAD_REQUEST;
if !strbody.is_empty() && strbody.len()<=70 { if !strbody.is_empty() && strbody.len() <= 70 {
let db = sqlite::open(&cfg.db_file).unwrap(); let db = sqlite::open(&cfg.db_file).unwrap();
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1").unwrap(); let mut statement = db
.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1")
.unwrap();
statement.bind((1, strbody)).unwrap(); statement.bind((1, strbody)).unwrap();
if let Ok(State::Row) = statement.next() { if let Ok(State::Row) = statement.next() {
@@ -232,30 +327,31 @@ async fn echo_search(whole_body: &Bytes,
None None
} }
}; };
response = match serde_json::to_string(&response_data){ response = match serde_json::to_string(&response_data) {
Ok(json_data) => Response::new(full(json_data)), Ok(json_data) => Response::new(full(json_data)),
Err(_) => { response } Err(_) => response,
}; };
return Ok(response); return Ok(response);
} }
} }
Ok(response) Ok(response)
} }
async fn echo_push(whole_body: &Bytes, async fn echo_push(
cfg: &MyConfig, whole_body: &Bytes,
param: &str, cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
//let whole_body = req.collect().await?.to_bytes(); //let whole_body = req.collect().await?.to_bytes();
trace!("echo_push");
let strbody = std::str::from_utf8(whole_body).unwrap(); let strbody = std::str::from_utf8(whole_body).unwrap();
let mut response = Response::new(full("Bad data received".to_owned())); let mut response = Response::new(full("Bad data received".to_owned()));
let mut response_not_enable = Response::new(full("Network not enabled".to_owned())); let mut response_not_enable = Response::new(full("Network not enabled".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST; *response.status_mut() = StatusCode::BAD_REQUEST;
*response_not_enable.status_mut()=StatusCode::BAD_REQUEST; *response_not_enable.status_mut() = StatusCode::BAD_REQUEST;
let netconfig = MyConfig::get_net_config(cfg,param); let netconfig = MyConfig::get_net_config(cfg, param);
if !netconfig.enabled{ if !netconfig.enabled {
trace!("network not enabled {}", &netconfig.name);
return Ok(response_not_enable); return Ok(response_not_enable);
} }
let req_time = Utc::now().timestamp_nanos_opt().unwrap(); // Returns i64 let req_time = Utc::now().timestamp_nanos_opt().unwrap(); // Returns i64
@@ -273,75 +369,86 @@ async fn echo_push(whole_body: &Bytes,
let mut union_inps = true; let mut union_inps = true;
let mut union_outs = true; let mut union_outs = true;
let mut already_present = false; let mut already_present = false;
let mut ptx:Vec<(usize, Value)> = vec![]; let mut ptx: Vec<(usize, Value)> = vec![];
let mut pinps:Vec<(usize, Value)> = vec![]; let mut pinps: Vec<(usize, Value)> = vec![];
let mut pouts:Vec<(usize, Value)> = vec![]; let mut pouts: Vec<(usize, Value)> = vec![];
let mut linenum = 1; let mut linenum = 1;
let mut lineinp = 1; let mut lineinp = 1;
let mut lineout = 1; let mut lineout = 1;
for line in lines { for line in lines {
if line.is_empty(){ if line.is_empty() {
trace!("line len is: {}",line.len()); trace!("line len is: {}", line.len());
continue continue;
} }
let linea = format!("{req_time}:{line}"); let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea); info!("New Tx: {}", linea);
let raw_tx = match Vec::<u8>::from_hex(line) { let raw_tx = match Vec::<u8>::from_hex(line) {
Ok(raw_tx) => raw_tx, Ok(raw_tx) => raw_tx,
Err(err) => { Err(err) => {
error!("rawtx error: {}",err); error!("rawtx error: {}", err);
continue continue;
} }
}; };
if !raw_tx.is_empty(){ if !raw_tx.is_empty() {
trace!("len: {}",raw_tx.len()); trace!("len: {}", raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx){ let tx: Transaction = match consensus::deserialize(&raw_tx) {
Ok(tx) => tx, Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue} Err(err) => {
error!("error: unable to parse tx: {}\n{}", line, err);
continue;
}
}; };
let txid = tx.compute_txid().to_string(); let txid = tx.compute_txid().to_string();
trace!("txid: {}",txid); trace!("txid: {}", txid);
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap(); let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1,&txid[..])).unwrap(); statement.bind((1, &txid[..])).unwrap();
if let Ok(State::Row) = statement.next() { if let Ok(State::Row) = statement.next() {
trace!("already present"); trace!("already present");
already_present=true; already_present = true;
continue; continue;
} }
let ntxid = tx.compute_ntxid(); let ntxid = tx.compute_ntxid();
let wtxid = tx.compute_wtxid(); let wtxid = tx.compute_wtxid();
let mut found = false; let mut found = false;
let locktime = tx.lock_time; let locktime = tx.lock_time;
let mut our_address:String = "".to_string(); let mut our_address: String = "".to_string();
let mut our_fees:u64 = 0; let mut our_fees: u64 = 0;
for input in tx.input{ for input in tx.input {
if !union_inps { if !union_inps {
sqlinps = format!("{sqlinps} UNION ALL"); sqlinps = format!("{sqlinps} UNION ALL");
}else{ } else {
union_inps = false; union_inps = false;
} }
sqlinps = format!("{sqlinps} SELECT ?, ?, ?"); sqlinps = format!("{sqlinps} SELECT ?, ?, ?");
pinps.push((lineinp,Value::String(txid.to_string()))); pinps.push((lineinp, Value::String(txid.to_string())));
pinps.push((lineinp+1,Value::String(input.previous_output.txid.to_string()))); pinps.push((
pinps.push((lineinp+2,Value::String(input.previous_output.vout.to_string()))); lineinp + 1,
Value::String(input.previous_output.txid.to_string()),
));
pinps.push((
lineinp + 2,
Value::String(input.previous_output.vout.to_string()),
));
lineinp += 3; lineinp += 3;
} }
if netconfig.fixed_fee ==0 { if netconfig.fixed_fee == 0 {
found = true; found = true;
} }
for (idx,output) in tx.output.into_iter().enumerate(){ for (idx, output) in tx.output.into_iter().enumerate() {
let script_pubkey = output.script_pubkey; let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), netconfig.network){ let address = match bitcoin::Address::from_script(
script_pubkey.as_script(),
netconfig.network,
) {
Ok(address) => address.to_string(), Ok(address) => address.to_string(),
Err(_) => String::new(), Err(_) => String::new(),
}; };
let amount = output.value; let amount = output.value;
our_fees = netconfig.fixed_fee;//search wllexecutor output our_fees = netconfig.fixed_fee; //search wllexecutor output
if netconfig.xpub{ if netconfig.xpub {
let sql="select * from tbl_address where address=?"; let sql = "select * from tbl_address where address=?";
let mut stmt = db.prepare(sql).expect("failed to fetch addresses"); let mut stmt = db.prepare(sql).expect("failed to fetch addresses");
stmt.bind((1,Value::String(address.to_string()))).unwrap(); stmt.bind((1, Value::String(address.to_string()))).unwrap();
if let Ok(State::Row) = stmt.next() { if let Ok(State::Row) = stmt.next() {
our_address = address.to_string(); our_address = address.to_string();
} }
@@ -350,56 +457,59 @@ async fn echo_push(whole_body: &Bytes,
} }
if address == our_address && amount.to_sat() >= netconfig.fixed_fee { if address == our_address && amount.to_sat() >= netconfig.fixed_fee {
our_fees = amount.to_sat(); our_fees = amount.to_sat();
our_address = netconfig.address.to_string(); //our_address = netconfig.address.to_string();
found = true; found = true;
trace!("address and fees are correct {}: {}",our_address,our_fees); trace!("address and fees are correct {}: {}", our_address, our_fees);
} }
if !union_outs { if !union_outs {
sqlouts = format!("{sqlouts} UNION ALL"); sqlouts = format!("{sqlouts} UNION ALL");
}else{ } else {
union_outs = false; union_outs = false;
} }
sqlouts = format!("{sqlouts} SELECT ?, ?, ?, ?"); sqlouts = format!("{sqlouts} SELECT ?, ?, ?, ?");
pouts.push((lineout,Value::String(txid.to_string()))); pouts.push((lineout, Value::String(txid.to_string())));
pouts.push((lineout+1,Value::Integer(idx.try_into().unwrap()))); pouts.push((lineout + 1, Value::Integer(idx.try_into().unwrap())));
pouts.push((lineout+2,Value::String(script_pubkey.to_string()))); pouts.push((lineout + 2, Value::String(script_pubkey.to_string())));
pouts.push((lineout+3,Value::Integer(amount.to_sat().try_into().unwrap()))); pouts.push((
lineout + 3,
Value::Integer(amount.to_sat().try_into().unwrap()),
));
lineout += 4; lineout += 4;
} }
if !found { if !found {
error!("willexecutor output not found "); error!("willexecutor output not found ");
return Ok(response) return Ok(response);
} else { } else {
if !union_tx { if !union_tx {
sqltxs = format!("{sqltxs} UNION ALL"); sqltxs = format!("{sqltxs} UNION ALL");
}else{ } else {
union_tx = false; union_tx = false;
} }
sqltxs = format!("{sqltxs} SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?"); sqltxs = format!("{sqltxs} SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?");
ptx.push((linenum,Value::String(txid))); ptx.push((linenum, Value::String(txid)));
ptx.push((linenum+1,Value::String(wtxid.to_string()))); ptx.push((linenum + 1, Value::String(wtxid.to_string())));
ptx.push((linenum+2,Value::String(ntxid.to_string()))); ptx.push((linenum + 2, Value::String(ntxid.to_string())));
ptx.push((linenum+3,Value::String(line.to_string()))); ptx.push((linenum + 3, Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string()))); ptx.push((linenum + 4, Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string()))); ptx.push((linenum + 5, Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(netconfig.name.to_string()))); ptx.push((linenum + 6, Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string()))); ptx.push((linenum + 7, Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string()))); ptx.push((linenum + 8, Value::String(our_fees.to_string())));
linenum += 9; linenum += 9;
} }
}else{ } else {
trace!("rawTx len is: {}",raw_tx.len()); trace!("rawTx len is: {}", raw_tx.len());
debug!("{}",&sqltxs); debug!("{}", &sqltxs);
} }
} }
if sqltxs.is_empty() && already_present { if sqltxs.is_empty() && already_present {
return Ok(Response::new(full("already present"))) return Ok(Response::new(full("already present")));
} }
let sqltxs = format!("{}{};", sqltxshead, sqltxs); let sqltxs = format!("{}{};", sqltxshead, sqltxs);
let sqlinps = format!("{}{};", sqlinpshead, sqlinps); let sqlinps = format!("{}{};", sqlinpshead, sqlinps);
let sqlouts = format!("{}{};", sqloutshead, sqlouts); let sqlouts = format!("{}{};", sqloutshead, sqlouts);
if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts){ if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts) {
debug!("{}",err); debug!("{}", err);
return Ok(response); return Ok(response);
} }
Ok(Response::new(full("thx"))) Ok(Response::new(full("thx")))
@@ -415,50 +525,60 @@ fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
None None
} }
async fn echo( async fn echo(
req: Request<hyper::body::Incoming>, req: Request<hyper::body::Incoming>,
cfg: &MyConfig, cfg: &MyConfig,
ip: &String ip: &String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty()); let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND; *not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found); let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string(); let uri = req.uri().path().to_string();
let remote_addr = req.headers().get("X-Real-IP").and_then(|value| value.to_str().ok()).and_then(|xff| xff.split(',').next()).map(|ip| ip.trim().to_string()).unwrap_or_else(|| ip.to_string()); let remote_addr = req
trace!("{}: {}",remote_addr,uri); .headers()
.get("X-Real-IP")
.and_then(|value| value.to_str().ok())
.and_then(|xff| xff.split(',').next())
.map(|ip| ip.trim().to_string())
.unwrap_or_else(|| ip.to_string());
trace!("{}: {}", remote_addr, uri);
match *req.method() { match *req.method() {
// Serve some instructions at / // Serve some instructions at /
Method::POST => { Method::POST => {
let whole_body = req.collect().await?.to_bytes(); let whole_body = req.collect().await?.to_bytes();
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$",uri.as_str()) { if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$", uri.as_str()) {
//let whole_body = collect_body(req,512_000).await?; //let whole_body = collect_body(req,512_000).await?;
ret = echo_push(&whole_body,cfg,param).await; ret = echo_push(&whole_body, cfg, param).await;
} }
if uri=="/searchtx"{ if uri == "/searchtx" {
//let whole_body = collect_body(req,64).await?; //let whole_body = collect_body(req,64).await?;
ret = echo_search(&whole_body,cfg).await; ret = echo_search(&whole_body, cfg).await;
} }
ret ret
} }
Method::GET => { Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$",uri.as_str()) { if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/stats$", uri.as_str()) {
ret = echo_info(param,cfg,remote_addr).await; ret = echo_stats(param, cfg).await;
} }
if uri=="/version"{ if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$", uri.as_str()) {
ret= echo_version().await; ret = echo_info(param, cfg, &remote_addr).await;
} }
if uri=="/.pub_key.pem" { if uri == "/version" {
ret = echo_version().await;
}
if uri == "/.pub_key.pem" {
ret = echo_pub_key(cfg).await; ret = echo_pub_key(cfg).await;
} }
if uri == "/" {
ret = echo_home(cfg).await;
}
ret ret
} }
// Return the 404 Not Found for other routes. // Return the 404 Not Found for other routes.
_ => ret _ => ret,
} }
} }
@@ -473,113 +593,120 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.map_err(|never| match never {}) .map_err(|never| match never {})
.boxed() .boxed()
} }
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){ fn parse_env(cfg: &Arc<Mutex<MyConfig>>) {
for (key, value) in std::env::vars() { //for (key, value) in std::env::vars() {
debug!("ENVIRONMENT {key}: {value}"); // debug!("ENVIRONMENT {key}: {value}");
} //}
let mut cfg_lock = cfg.lock().unwrap(); let mut cfg_lock = cfg.lock().unwrap();
if let Ok(value) = env::var("BAL_SERVER_DB_FILE") { if let Ok(value) = env::var("BAL_SERVER_DB_FILE") {
debug!("BAL_SERVER_DB_FILE: {}",value); debug!("BAL_SERVER_DB_FILE: {}", value);
cfg_lock.db_file = value; cfg_lock.db_file = value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") { if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") {
debug!("BAL_SERVER_BIND_ADDRESS: {}",value); debug!("BAL_SERVER_BIND_ADDRESS: {}", value);
cfg_lock.bind_address= value; cfg_lock.bind_address = value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") { if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") {
debug!("BAL_SERVER_BIND_PORT: {}",value); debug!("BAL_SERVER_BIND_PORT: {}", value);
if let Ok(v) = value.parse::<u16>(){ if let Ok(v) = value.parse::<u16>() {
cfg_lock.bind_port = v; cfg_lock.bind_port = v;
} }
} }
if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") { if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") {
debug!("BAL_SERVER_PUB_KEY_PATH: {}",value); debug!("BAL_SERVER_PUB_KEY_PATH: {}", value);
cfg_lock.pub_key_path = value; cfg_lock.pub_key_path = value;
} }
if let Ok(value) = env::var("BAL_SERVER_INFO") {
if let Ok(value) = env::var("BAL_SERVER_INFO"){ debug!("BAL_SERVER_INFO: {}", value);
debug!("BAL_SERVER_INFO: {}",value);
cfg_lock.info = value; cfg_lock.info = value;
} }
cfg_lock = parse_env_netconfig(cfg_lock,"regtest"); cfg_lock = parse_env_netconfig(cfg_lock, "regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet"); cfg_lock = parse_env_netconfig(cfg_lock, "signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet"); cfg_lock = parse_env_netconfig(cfg_lock, "testnet");
drop(parse_env_netconfig(cfg_lock,"bitcoin")); drop(parse_env_netconfig(cfg_lock, "bitcoin"));
} }
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{ fn parse_env_netconfig<'a>(
let cfg = match chain{ mut cfg_lock: MutexGuard<'a, MyConfig>,
chain: &'a str,
) -> MutexGuard<'a, MyConfig> {
let cfg = match chain {
"regtest" => &mut cfg_lock.regtest, "regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet, "signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet, "testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet, &_ => &mut cfg_lock.mainnet,
}; };
if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS", chain.to_uppercase())) {
debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value); debug!("BAL_SERVER_{}_ADDRESS: {}", chain.to_uppercase(), value);
cfg.address = value; cfg.address = value;
if cfg.address.len() > 5 { if cfg.address.len() > 5 {
if cfg.address[1..4] == *"pub" { if cfg.address[1..4] == *"pub" {
cfg.xpub=true; cfg.xpub = true;
trace!("is_xpub"); trace!("is_xpub");
} }
cfg.enabled=true; cfg.enabled = true;
} }
} }
if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE", chain.to_uppercase())) {
debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value); debug!("BAL_SERVER_{}_FIXED_FEE: {}", chain.to_uppercase(), value);
if let Ok(v) = value.parse::<u64>(){ if let Ok(v) = value.parse::<u64>() {
cfg.fixed_fee = v; cfg.fixed_fee = v;
} }
} }
cfg_lock cfg_lock
} }
fn init_network(db: &Connection, cfg: &MyConfig){ fn init_network(db: &Connection, cfg: &MyConfig) {
for network in NETWORKS{ for network in NETWORKS {
let netconfig = MyConfig::get_net_config(cfg,network); let netconfig = MyConfig::get_net_config(cfg, network);
insert_xpub(db,&netconfig.name,&netconfig.address); insert_xpub(db, &netconfig.name, &netconfig.address);
} }
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init(); env_logger::init();
let cfg: Arc<Mutex<MyConfig>> =Arc::<Mutex<MyConfig>>::default(); let cfg: Arc<Mutex<MyConfig>> = Arc::<Mutex<MyConfig>>::default();
parse_env(&cfg); parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap(); let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap(); let db = sqlite::open(&cfg_lock.db_file).unwrap();
create_database(&db); create_database(&db);
init_network(&db,&cfg_lock); init_network(&db, &cfg_lock);
let addr = cfg_lock.bind_address.to_string(); let addr = cfg_lock.bind_address.to_string();
let addr: IpAddr = addr.parse()?; let addr: IpAddr = addr.parse()?;
let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?; let listener = TcpListener::bind((addr, cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr,cfg_lock.bind_port); info!("Listening on http://{}:{}", addr, cfg_lock.bind_port);
loop { loop {
let (stream, _) = listener.accept().await?; let (stream, _) = listener.accept().await?;
let ip = stream.peer_addr()?.to_string().split(":").next().unwrap().to_string(); let ip = stream
.peer_addr()?
.to_string()
.split(":")
.next()
.unwrap()
.to_string();
let io = TokioIo::new(stream); let io = TokioIo::new(stream);
tokio::task::spawn({ tokio::task::spawn({
let cfg = cfg_lock.clone(); let cfg = cfg_lock.clone();
async move { async move {
if let Err(err) = http1::Builder::new() if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async { .serve_connection(
echo(req,&cfg,&ip).await io,
})) service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req, &cfg, &ip).await
}),
)
.await .await
{ {
error!("Error serving connection: {:?}", err); error!("Error serving connection: {:?}", err);
} }
} }
}); });
} }

152
src/db.rs
View File

@@ -1,7 +1,7 @@
use sqlite::{ Connection, Value, State, Error }; use log::{error, info, trace};
use log::{info, trace, error}; use sqlite::{Connection, Error, State, Value};
pub fn create_database(db: &Connection){ pub fn create_database(db: &Connection) {
info!("database sanity check"); info!("database sanity check");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP, date_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP, date_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);");
let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT"); let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT");
@@ -9,17 +9,15 @@ pub fn create_database(db: &Connection){
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);"); let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);"); let _ =
db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);"); let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);");
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)"); let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');"); let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
} }
/* /*
pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option<i64>{ pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option<i64>{
@@ -33,76 +31,98 @@ pub fn create_database(db: &Connection){
} }
} }
*/ */
pub fn insert_xpub(db: &Connection, network: &String, xpub: &String){ pub fn insert_xpub(db: &Connection, network: &String, xpub: &String) {
if xpub != "" { if xpub != "" {
trace!("going to insert: {} xpub:{}", network, xpub); trace!("going to insert: {} xpub:{}", network, xpub);
let mut stmt = db.prepare ("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);").unwrap(); let mut stmt = db
let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap(); .prepare("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);")
let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap(); .unwrap();
let _ = stmt.bind((1, Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2, Value::String(xpub.to_string()))).unwrap();
let _ = stmt.next(); let _ = stmt.next();
} }
} }
pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &String, address: &String) -> Option<String>{ pub fn get_last_used_address_by_ip(
db: &Connection,
network: &String,
xpub: &String,
address: &String,
) -> Option<String> {
let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap(); let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string()))); let _ = stmt.bind((1, Value::String(network.to_string())));
let _ = stmt.bind((2,Value::String(address.to_string()))); let _ = stmt.bind((2, Value::String(address.to_string())));
let _ = stmt.bind((3,Value::String(xpub.to_string()))); let _ = stmt.bind((3, Value::String(xpub.to_string())));
if let Ok(State::Row) = stmt.next(){ if let Ok(State::Row) = stmt.next() {
let address = stmt.read::<String,_>("address").unwrap(); let address = stmt.read::<String, _>("address").unwrap();
return Some(address); return Some(address);
}else{ } else {
return None; return None;
} }
} }
pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64,i64){ pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64, i64) {
let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap(); let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap(); stmt.bind((1, Value::String(network.to_string()))).unwrap();
stmt.bind((2,Value::String(xpub.to_string()))).unwrap(); stmt.bind((2, Value::String(xpub.to_string()))).unwrap();
match stmt.next(){ match stmt.next() {
Ok(State::Row) =>{ Ok(State::Row) => {
let next = stmt.read::<i64,_>("path_idx").unwrap(); let next = stmt.read::<i64, _>("path_idx").unwrap();
let id = stmt.read::<i64,_>("id").unwrap(); let id = stmt.read::<i64, _>("id").unwrap();
return (id,next); return (id, next);
},Err(_)=> { }
return (0,0); Err(_) => {
},Ok(State::Done) =>{ return (0, 0);
return (0,0); }
Ok(State::Done) => {
return (0, 0);
} }
}; };
} }
pub fn save_new_address(db: &Connection,xpub: i64,address: &String, path: &String,remote_addr: &String){ pub fn save_new_address(
let mut stmt = db.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);").unwrap(); db: &Connection,
xpub: i64,
address: &String,
path: &String,
remote_addr: &String,
) {
let mut stmt = db
.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);")
.unwrap();
stmt.bind((1,Value::String(address.to_string()))).unwrap(); stmt.bind((1, Value::String(address.to_string()))).unwrap();
stmt.bind((2,Value::String(path.to_string()))).unwrap(); stmt.bind((2, Value::String(path.to_string()))).unwrap();
stmt.bind((3,Value::Integer(xpub))).unwrap(); stmt.bind((3, Value::Integer(xpub))).unwrap();
stmt.bind((4,Value::String(remote_addr.to_string()))).unwrap(); stmt.bind((4, Value::String(remote_addr.to_string())))
.unwrap();
let _ = stmt.next(); let _ = stmt.next();
} }
pub fn execute_insert(db: &Connection, pub fn execute_insert(
sqltxs: String, db: &Connection,
ptx: Vec<(usize, Value)>, sqltxs: String,
sqlinp: String, ptx: Vec<(usize, Value)>,
pinp: Vec<(usize, Value)>, sqlinp: String,
sqlout: String, pinp: Vec<(usize, Value)>,
pout: Vec<(usize, Value)>) -> Result<(),Error>{ sqlout: String,
pout: Vec<(usize, Value)>,
) -> Result<(), Error> {
let _ = db.execute("BEGIN TRANSACTION"); let _ = db.execute("BEGIN TRANSACTION");
let mut stmt = db.prepare(sqltxs.as_str()).expect("failed to prepare sqltxs"); let mut stmt = db
if let Err(err) = stmt.bind::<&[(_,Value)]>(&ptx[..]) { .prepare(sqltxs.as_str())
.expect("failed to prepare sqltxs");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&ptx[..]) {
error!("error binding transaction parameters: {}", err); error!("error binding transaction parameters: {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
return Err(err); return Err(err);
} }
if let Err(err) = stmt.next() { if let Err(err) = stmt.next() {
error!("error inserting transactions {}",err); error!("error inserting transactions {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
}else{ } else {
let mut stmt = db.prepare(sqlinp.as_str()).expect("failed to prepare sqlinp"); let mut stmt = db
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pinp[..]) { .prepare(sqlinp.as_str())
.expect("failed to prepare sqlinp");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&pinp[..]) {
error!("error binding inputs parameters {}", err); error!("error binding inputs parameters {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
return Err(err); return Err(err);
@@ -111,10 +131,11 @@ pub fn execute_insert(db: &Connection,
error!("error inserting inputs {}", err); error!("error inserting inputs {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
return Err(err); return Err(err);
} else {
}else{ let mut stmt = db
let mut stmt = db.prepare(sqlout.as_str()).expect("failed to prepare sqlout"); .prepare(sqlout.as_str())
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pout[..]) { .expect("failed to prepare sqlout");
if let Err(err) = stmt.bind::<&[(_, Value)]>(&pout[..]) {
error!("error binding outs parameters {}", err); error!("error binding outs parameters {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
return Err(err); return Err(err);
@@ -123,25 +144,20 @@ pub fn execute_insert(db: &Connection,
error!("error inserting outs {}", err); error!("error inserting outs {}", err);
let _ = db.execute("ROLLBACK"); let _ = db.execute("ROLLBACK");
return Err(err); return Err(err);
} }
} }
} }
let _ = db.execute("COMMIT"); let _ = db.execute("COMMIT");
Ok(()) Ok(())
} }
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64,Error> { pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64, Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap(); let mut stmt = db
stmt.bind((1,Value::String(network.to_string()))).unwrap(); .prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;")
match stmt.next(){ .unwrap();
Ok(State::Row)=>{ stmt.bind((1, Value::String(network.to_string()))).unwrap();
Ok(stmt.read::<i64,_>("total_number").unwrap()) match stmt.next() {
}, Ok(State::Row) => Ok(stmt.read::<i64, _>("total_number").unwrap()),
Ok(sqlite::State::Done) => todo!(), Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err) Err(err) => Err(err),
} }
} }

View File

@@ -1,32 +1,153 @@
use sha2::{Digest, Sha256}; //use bs58;
use bitcoin::bip32::Xpub;
use std::str::FromStr;
use bitcoin::bip32::DerivationPath;
use bitcoin::key::Secp256k1;
use bitcoin::Address; use bitcoin::Address;
use bitcoin::Network;
use bitcoin::ScriptBuf; use bitcoin::ScriptBuf;
use bitcoin::WPubkeyHash; use bitcoin::WPubkeyHash;
use bitcoin::Network; use bitcoin::bip32::DerivationPath;
use bitcoin::bip32::Xpub;
use bitcoin::hashes::Hash; use bitcoin::hashes::Hash;
use bitcoin::key::Secp256k1;
use sha2::{Digest, Sha256};
use std::str::FromStr;
// Mainnet (BIP44/BIP49/BIP84) // Mainnet (BIP44/BIP49/BIP84)
enum BS58Prefix{ enum BS58Prefix {
Xpub, Xpub,
//Ypub, Ypub,
//Zpub, Zpub,
//Tpub, Tpub,
//Vpub, Vpub,
//Upub Upub,
} }
const XPUB_PREFIX:[u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH) const XPUB_PREFIX: [u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH)
//const YPUB_PREFIX:[u8; 4] = [0x04, 0x9D, 0x7C, 0xB2]; // ypub (Nested SegWit P2SH-P2WPKH) const YPUB_PREFIX: [u8; 4] = [0x04, 0x9D, 0x7C, 0xB2]; // ypub (Nested SegWit P2SH-P2WPKH)
//const ZPUB_PREFIX:[u8; 4] = [0x04, 0xB2, 0x47, 0x46]; // zpub (Native SegWit P2WPKH) const ZPUB_PREFIX: [u8; 4] = [0x04, 0xB2, 0x47, 0x46]; // zpub (Native SegWit P2WPKH)
//const TPUB_PREFIX:[u8; 4] = [0x04, 0x35, 0x87, 0xCF]; // tpub (Testnet Legacy P2PKH) const TPUB_PREFIX: [u8; 4] = [0x04, 0x35, 0x87, 0xCF]; // tpub (Testnet Legacy P2PKH)
//const VPUB_PREFIX:[u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit) const VPUB_PREFIX: [u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit)
//const UPUB_PREFIX:[u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit) const UPUB_PREFIX: [u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit)
// Constants from Bitcoin Core's checksum algorithm
const INPUT_CHARSET: &[u8] = b"0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ ";
const CHECKSUM_CHARSET: &[u8] = b"qpzry9x8gf2tvdw0s3jn54khce6mua7l";
// Polynomial modulo function used in checksum calculation (same as in Bitcoin Core)
fn poly_mod(mut c: u64, val: u64) -> u64 {
let c0 = c >> 35;
c = ((c & 0x7ffffffff) << 5) ^ val;
if c0 & 1 > 0 {
c ^= 0xf5dee51989
};
if c0 & 2 > 0 {
c ^= 0xa9fdca3312
};
if c0 & 4 > 0 {
c ^= 0x1bab10e32d
};
if c0 & 8 > 0 {
c ^= 0x3706b1677a
};
if c0 & 16 > 0 {
c ^= 0x644d626ffd
};
c
}
// Calculate checksum for a descriptor string
fn calc_checksum(desc: &str) -> Result<String, String> {
// Separate descriptor from any existing checksum
let desc = match desc.split_once('#') {
Some((d, _)) => d,
None => desc,
};
let mut c: u64 = 1;
let mut cls: u64 = 0;
let mut clscount: u64 = 0;
// Process each character in the descriptor
for ch in desc.as_bytes() {
let pos = match INPUT_CHARSET.iter().position(|b| b == ch) {
Some(p) => p as u64,
None => return Err(format!("Invalid character in descriptor: {}", *ch as char)),
};
c = poly_mod(c, pos & 31);
cls = cls * 3 + (pos >> 5);
clscount += 1;
if clscount == 3 {
c = poly_mod(c, cls);
cls = 0;
clscount = 0;
}
}
if clscount > 0 {
c = poly_mod(c, cls);
}
// Final steps in checksum calculation
for _ in 0..8 {
c = poly_mod(c, 0);
}
c ^= 1;
// Convert checksum to characters
let mut checksum = String::with_capacity(8);
for j in 0..8 {
let idx = ((c >> (5 * (7 - j))) & 31) as usize;
checksum.push(CHECKSUM_CHARSET[idx] as char);
}
Ok(checksum)
}
pub fn get_bitcoincore_descriptor(xpub: &String) -> String {
let fingerprint = calculate_fingerprint(xpub);
let mut bip = 84;
let cpub = xpub.to_string();
match &xpub[0..4] {
"vpub" => {
bip = 84;
}
"zpub" => {
bip = 84;
}
&_ => {
bip = 84;
}
};
let descriptor = format!(
"wpkh([{}/84h/0h/0h]{}/0/*)",
fingerprint,
convert_xpub(xpub)
);
let descriptor = match calc_checksum(&descriptor) {
Ok(checksum) => {
let clean_descriptor = descriptor.split('#').next().unwrap_or(&descriptor);
format!("{}#{}", clean_descriptor, checksum)
}
Err(err) => {
eprintln!("Error: {}", err);
"".to_string()
}
};
descriptor
//format!("{}#{}",descriptor,checksum)
}
fn convert_xpub(xpub: &String) -> String {
if xpub[0..4] == *"xpub" || xpub[0..4] == *"ypub" || xpub[0..4] == *"zpub" {
return convert_to(xpub, BS58Prefix::Xpub).unwrap();
} else {
return convert_to(xpub, BS58Prefix::Tpub).unwrap();
}
}
pub fn calculate_fingerprint(tpub: &str) -> String {
let xpub = Xpub::from_str(&convert_to(tpub, BS58Prefix::Xpub).unwrap()).unwrap();
let fp = xpub.fingerprint();
let pp = xpub.parent_fingerprint;
format!("{}", fp)
}
fn base58check_decode(s: &str) -> Result<Vec<u8>, String> { fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?; let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?;
@@ -34,7 +155,7 @@ fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
return Err("Data troppo corta".to_string()); return Err("Data troppo corta".to_string());
} }
let (payload, checksum) = data.split_at(data.len() - 4); let (payload, checksum) = data.split_at(data.len() - 4);
let hash = Sha256::digest(Sha256::digest(payload)); let hash = Sha256::digest(&Sha256::digest(payload));
if hash[0..4] != checksum[..] { if hash[0..4] != checksum[..] {
return Err("Checksum invalido".to_string()); return Err("Checksum invalido".to_string());
} }
@@ -42,33 +163,39 @@ fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
} }
fn base58check_encode(data: &[u8]) -> String { fn base58check_encode(data: &[u8]) -> String {
let checksum = &Sha256::digest(Sha256::digest(data))[0..4]; let checksum = &Sha256::digest(&Sha256::digest(data))[0..4];
let full = [data, checksum].concat(); let full = [data, checksum].concat();
bs58::encode(full).into_string() bs58::encode(full).into_string()
} }
fn convert_to(zpub: &str,prefix: BS58Prefix) -> Result<String, String> { fn convert_to(zpub: &str, prefix: BS58Prefix) -> Result<String, String> {
let mut data = base58check_decode(zpub)?; let mut data = base58check_decode(zpub)?;
if data.len() < 4 { if data.len() < 4 {
return Err("Non è una zpub valida.".to_string()); return Err("Non è una zpub valida.".to_string());
} }
data.splice(0..4, match prefix { data.splice(
BS58Prefix::Xpub => XPUB_PREFIX, 0..4,
//BS58Prefix::Ypub => YPUB_PREFIX, match prefix {
//BS58Prefix::Zpub => ZPUB_PREFIX, BS58Prefix::Xpub => XPUB_PREFIX,
//BS58Prefix::Vpub => VPUB_PREFIX, BS58Prefix::Ypub => YPUB_PREFIX,
//BS58Prefix::Tpub => TPUB_PREFIX, BS58Prefix::Zpub => ZPUB_PREFIX,
//BS58Prefix::Upub => UPUB_PREFIX, BS58Prefix::Vpub => VPUB_PREFIX,
}); BS58Prefix::Tpub => TPUB_PREFIX,
BS58Prefix::Upub => UPUB_PREFIX,
},
);
Ok(base58check_encode(&data)) Ok(base58check_encode(&data))
} }
pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(String,String), Box<dyn std::error::Error>>{ pub fn new_address_from_xpub(
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?; zpub: &str,
let path = format!("m/0/{}",index); index: i64,
let derivation_path = DerivationPath::from_str(path.as_str())?; network: Network,
) -> Result<(String, String), Box<dyn std::error::Error>> {
let xpub = Xpub::from_str(&convert_to(zpub, BS58Prefix::Xpub)?)?;
let path = format!("m/0/{}", index);
let derivation_path = DerivationPath::from_str(&path.as_str())?;
let secp = Secp256k1::new(); let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?; let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
let public_key = derived_xpub.public_key; let public_key = derived_xpub.public_key;
@@ -78,20 +205,21 @@ pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(
//let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash()); //let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash());
let address = Address::from_script(&redeem_script, network)?; let address = Address::from_script(&redeem_script, network)?;
//let address = Address::from_script(&script_pubkey, network)?; //let address = Address::from_script(&script_pubkey, network)?;
Ok((address.to_string(),path.to_string())) Ok((address.to_string(), path.to_string()))
} }
/* /*
fn main() -> Result<(), Box<dyn std::error::Error>>{ fn main() -> Result<(), Box<dyn std::error::Error>>{
//let zpub = "xpub6C29v8gxCXREHUzoGNfqqFqZWxTVEmYtmZshuzfSwBKNmfYQxoizRziCkkUUA4WwJZkJs2i7nttRiC6MQG7mxZpouXeYkTZe3U52RyPAeo2"; match convert_to(zpub,BS58Prefix::Tpub) {
//let zpub = "vpub5Ut36m34VebUUjdhYaxJCjSPqk3ZR8bA2MXLmbHRQCycAxy5Q1GFPJspLkJywJjBgQnvU3rmwPKTPp1ELLWeXrve3zBufpZR4MRCCTNHzsn"; Ok(tpub) => println!("XPUB: {}", tpub),
let zpub = "zpub6qdfveGrxBQN3z8paZ88EHpCn5MGXpUoHwQmHhPbj4rPQtUjbWyCHrJFYZGVY7MsmVbDaeu4JYqRqcdLzMx78wZFEWbLrF9FG3gr2MPQC5H";
match convert_to(zpub,BS58Prefix::Xpub) {
Ok(xpub) => println!("XPUB: {}", xpub),
Err(e) => eprintln!("Errore: {}", e), Err(e) => eprintln!("Errore: {}", e),
} }
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?; let fingerprint = base58check_encode(&calculate_fingerprint(zpub));
println!("ZPUB: {}, FINGERPRINT: {}",zpub,fingerprint);
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?;
let tpub = convert_to(zpub,BS58Prefix::Tpub)?;
let fingerprint = base58check_encode(&calculate_fingerprint(&tpub));
println!("TPUB: {}, FINGERPRINT: {}",tpub,fingerprint);
let derivation_path = DerivationPath::from_str("m/0/0")?; let derivation_path = DerivationPath::from_str("m/0/0")?;
let secp = Secp256k1::new(); let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?; let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
@@ -108,5 +236,4 @@ fn main() -> Result<(), Box<dyn std::error::Error>>{
let address = Address::from_script(&script_pubkey, network)?; let address = Address::from_script(&script_pubkey, network)?;
Ok(()) Ok(())
} }*/
*/