Compare commits

...

22 Commits
v0.1.0 ... main

Author SHA1 Message Date
d5fe22cc14
bal server route / 2025-11-03 09:36:56 -04:00
83749afddd
update version 2025-11-03 08:53:58 -04:00
dd075508b7
fix bal-pusher zmq connection stability 2025-11-03 08:46:13 -04:00
4ac492ba79
fix bal-pusher zmq connection stability 2025-11-03 07:42:09 -04:00
ae52b2b4e5
. 2025-11-01 19:02:15 -04:00
7c8ed123aa
send times to welist
ED25519 times signing
2025-11-01 18:34:35 -04:00
d937ee9364
send time to welist server 2025-10-31 16:01:15 -04:00
3018e64fb7
release: bal-server-0.2.2 2025-10-31 15:13:09 -04:00
2234bb9147
bugdfix 2025-09-23 13:55:00 -04:00
fe1c4ee2c8
bal-pusher.env 2025-07-19 07:15:53 -04:00
ca10284479
logs about current block 2025-07-19 07:12:18 -04:00
afd21a5f2a
bal-pusher readme 2025-07-16 04:50:40 -04:00
d154567aeb
release: bal-server-0.2.1 2025-07-16 04:26:02 -04:00
8965a06dbe
release: bal-server-0.2.1 2025-07-16 04:25:17 -04:00
134504e870
release: bal-server-0.2.1 2025-07-16 03:58:42 -04:00
66f34cb29f
pusher fix 2025-07-16 03:47:28 -04:00
36edfcd073
fixed network db_field 2025-07-15 15:18:37 -04:00
9e89ae884e
fixed db field network 2025-07-15 15:12:21 -04:00
bitcoinafterlife
713eb8fbaa xpub related update 2025-05-22 19:41:05 -04:00
bitcoinafterlife
793b6dccf0 xpub & Some improvement 2025-05-06 21:21:13 -04:00
bitcoinafterlife
19c077256a release: bal-server-0.1.0 2025-04-21 21:30:15 -04:00
bitcoinafterlife
71e3805b8f release: bal-server-0.1.0 2025-04-21 13:35:26 -04:00
13 changed files with 3254 additions and 667 deletions

1781
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,18 +5,29 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hyper = { version = "1.2", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
base64 = "0.22.1"
bs58 = "0.4.0"
bytes = "1.2"
serde = { version = "1.0.152", features = ["derive"] } # <- Only one serde version needed (serde or serde_derive)
serde_json = "1.0.116"
bitcoin = { version = "0.32.5" }
bitcoincore-rpc = "0.19.0"
bitcoincore-rpc-json = "0.19.0"
byteorder = "1.5.0"
confy = "0.6.1"
bitcoin = "0.31.0"
sqlite = "0.34.0"
hex-conservative = "0.1.1"
regex = "1.10.4"
log = "0.4.21"
chrono = "0.4.40"
env_logger = "0.11.5"
hex = "0.4.3"
hex-conservative = "0.1.1"
hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1"
log = "0.4.21"
openssl = { version = "0.10.74", features = ["vendored"] }
sha2 = "0.10.8"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116"
sqlite = "0.34.0"
regex = "1.10.4"
reqwest = { version = "0.12.24", features = ["json","socks"] }
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
zmq = "0.10.0"

View File

@ -1,10 +1,11 @@
# bal-server
## Installation
```bash
$ git clone ....
$ cd bal-server
$ openssl genpkey -algorithm ED25519 -out private_key.pem
$ openssl pkey -in private_key.pem -pubout -out public_key.pem
$ cargo build --release
$ sudo cp target/release/bal-server /usr/local/bin
$ bal-server
@ -20,6 +21,7 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` |
| `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` |
| `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` |
| `BAL_SERVER_PUB_KEY_PATH` | WillExecutor Ed25519 public key | `public_key.pem` |
| `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - |
| `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 |
| `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - |
@ -28,3 +30,56 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 |
| `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - |
| `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 |
# bal-pusher
`bal-pusher` is a tool that retrieves Bitcoin transactions from a database and pushes them to the Bitcoin network when their **locktime** exceeds the **median time past** (MTP). It listens for Bitcoin block updates via ZMQ.
## Installation
To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) support enabled. Then, configure your Bitcoin node and `bal-pusher` to push the transactions.
### Prerequisites
1. **Bitcoin with ZMQ Support**:
Ensure that Bitcoin is compiled with ZMQ support. Add the following line to your `bitcoin.conf` file:
```
zmqpubhashblock=tcp://127.0.0.1:28332
```
2. **Install Rust and Cargo**:
If you haven't already installed Rust and Cargo, you can follow the official instructions to do so: [Rust Installation](https://www.rust-lang.org/tools/install).
## Configuration
`bal-pusher` can be configured using environment variables. If no configuration file is provided, a default configuration file will be created.
### Available Configuration Variables
| Variable | Description | Default |
|---------------------------------------|------------------------------------------|----------------------------------------------|
| `BAL_PUSHER_CONFIG_FILE` | Path to the configuration file. If the file does not exist, it will be created. | `$HOME/.config/bal-pusher/default-config.toml` |
| `BAL_PUSHER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, it will be created. | `bal.db` |
| `BAL_PUSHER_ZMQ_LISTENER` | ZMQ listener for Bitcoin updates. | `tcp://127.0.0.1:28332` |
| `BAL_PUSHER_BITCOIN_HOST` | Bitcoin server host for RPC connections. | `http://127.0.0.1` |
| `BAL_PUSHER_BITCOIN_PORT` | Bitcoin RPC server port. | `8332` |
| `BAL_PUSHER_BITCOIN_COOKIE_FILE` | Path to Bitcoin RPC cookie file. | `$HOME/.bitcoin/.cookie` |
| `BAL_PUSHER_BITCOIN_RPC_USER` | Bitcoin RPC username. | - |
| `BAL_PUSHER_BITCOIN_RPC_PASSWORD` | Bitcoin RPC password. | - |
| `BAL_PUSHER_SEND_STATS` | Contact welist to provide times | false |
| `WELIST_SERVER_URL` | welist server url to provide times | https://welist.bitcoin-afer.life |
| `BAL_SERVER_URL` | WillExecutor server url | - |
| `SSL_KEY_PATH` | Ed25519 private key pem file | `private_key.pem` |
## Running `bal-pusher`
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash
$ bal-pusher
```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

16
bal-pusher.env Normal file
View File

@ -0,0 +1,16 @@
RUST_LOG=info
BAL_PUSHER_DB_FILE=/home/bal/bal.db
BAL_PUSHER_BITCOIN_COOKIE_FILE=/home/bitcoin/.bitcoin/.cookie
BAL_PUSHER_REGTEST_COOKIE_FILE=/home/bitcoin/.bitcoin/regtest/.cookie
BAL_PUSHER_TESTNET_COOKIE_FILE=/home/bitcoin/.bitcoin/testnet3/.cookie
BAL_PUSHER_SIGNET_COOKIE_FILE=/home/bitcoin/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life"

14
bal-pusher.sh Normal file
View File

@ -0,0 +1,14 @@
RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133"
export SSL_KEY_PATH="$(pwd)/private_key.pem"
cargo run --bin=bal-pusher regtest

View File

@ -1,11 +1,15 @@
RUST_LOG=info
BAL_SERVER_DB_FILE=/path/to/bal.db
BAL_SERVER_DB_FILE="/home/bal/bal.db"
BAL_SERVER_INFO="BAL server test willexecutor"
BAL_SERVER_BIND_ADDRESS=127.0.0.1
BAL_SERVER_BITCOIN_ADDRESS=your mainnet address
BAL_SERVER_BITCOIN_FEE=100000
BAL_SERVER_REGTEST_ADDRESS=
BAL_SERVER_REGTEST_FEE=100000
BAL_SERVER_TESTNET_ADDRESS=
BAL_SERVER_TESTNET_FEE=100000
BAL_SERVER_SIGNET_ADDRESS=
BAL_SERVER_SIGNET_FEE=100000
BAL_SERVER_BIND_PORT=9133
BAL_SERVER_BITCOIN_ADDRESS="your bitcoin or xpub to recive payments here"
BAL_SERVER_BITCOIN_FIXED_FEE=50000
BAL_SERVER_PUB_KEY_PATH="/home/bal/public_key.pem"
BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
BAL_SERVER_REGTEST_FEE=5000
#BAL_SERVER_TESTNET_ADDRESS=
#BAL_SERVER_TESTNET_FEE=100000
#BAL_SERVER_SIGNET_ADDRESS=
#BAL_SERVER_SIGNET_FEE=100000

View File

@ -4,34 +4,46 @@ After=network.target
[Service]
EnvironmentFile=/etc/bal/bal-server.env
# Service execution
###################
EnvironmentFile=/home/bal/bal-server.env
ExecStart=/usr/local/bin/bal-server
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=bal-server
# Process management
####################
Type=simple
PIDFile=/run/bal-server/bal-server.pid
Restart=always
TimeoutSec=300
RestartSec=30
# Directory creation and permissions
####################################
User=bal
UMask=0027
# /run/bal-server
RuntimeDirectory=bal-server
RuntimeDirectoryMode=0710
PrivateTmp=true
# Hardening measures
####################
# Mount /usr, /boot/ and /etc read-only for the process.
ProtectSystem=full
# Disallow the process and all of its children to gain
# new privileges through execve().
NoNewPrivileges=true
# Use a new /dev namespace only populated with API pseudo devices
# such as /dev/null, /dev/zero and /dev/random.
PrivateDevices=true
# Deny the creation of writable and executable memory mappings.
MemoryDenyWriteExecute=true
[Install]

24
bal-server.sh Normal file
View File

@ -0,0 +1,24 @@
WORKING_DIR=$(pwd)
if [ ! -f "$WORKING_DIR/public_key.pem" ]; then
echo "creating keypairs"
openssl genpkey -algorithm ED25519 -out private_key.pem
openssl pkey -in private_key.pem -pubout -out public_key.pem
fi
export RUST_LOG="trace"
export BAL_SERVER_DB_FILE="$WORKING_DIR/bal.db"
export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000
export BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
export BAL_SERVER_REGTEST_FEE=5000
#export BAL_SERVER_TESTNET_ADDRESS=
#export BAL_SERVER_TESTNET_FEE=100000
#export BAL_SERVER_SIGNET_ADDRESS=
#export BAL_SERVER_SIGNET_FEE=100000
cargo run --bin=bal-server

599
src/bin/bal-pusher.rs Normal file
View File

@ -0,0 +1,599 @@
extern crate bitcoincore_rpc;
extern crate zmq;
use bitcoin::Network;
use bitcoincore_rpc::{bitcoin, Auth, Client, Error, RpcApi};
use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value};
use serde::Serialize;
use serde::Deserialize;
use serde_json::json;
use std::env;
use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket, DEALER, DONTWAIT};
use std::str;
use std::{thread, time::Duration};
use std::collections::HashMap;
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::Cursor;
use hex;
use std::error::Error as StdError;
use reqwest::Client as rClient;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey};
use openssl::sign::Signer;
use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _};
use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.2";
#[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig {
zmq_listener: String,
requests_file: String,
db_file: String,
bitcoin_dir: String,
regtest: NetworkParams,
testnet: NetworkParams,
signet: NetworkParams,
mainnet: NetworkParams,
send_stats: bool,
url: String,
secret_code: String,
ssl_key_path: String
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
zmq_listener: "tcp://127.0.0.1:28332".to_string(),
requests_file: "rawrequests.log".to_string(),
db_file: "../bal.db".to_string(),
bitcoin_dir: "".to_string(),
regtest: get_network_params_default(Network::Regtest),
testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false,
url: "http://localhost/".to_string(),
secret_code: "xxx".to_string(),
ssl_key_path: "privkey.pem".to_string(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NetworkParams {
host: String,
port: u16,
dir_path: String,
db_field: String,
cookie_file: String,
rpc_user: String,
rpc_pass: String,
}
fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
match network{
Network::Testnet => &cfg.testnet,
Network::Signet => &cfg.signet,
Network::Regtest => &cfg.regtest,
_ => &cfg.mainnet
}
}
fn get_network_params_default(network:Network) -> NetworkParams{
match network {
Network::Testnet => NetworkParams{
host: "http://i27.0.0.1".to_string(),
port: 18332,
dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
Network::Signet => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 18332,
dir_path: "signet/".to_string(),
db_field: "signet".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
Network::Regtest => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 18443,
dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
_ => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 8332,
dir_path: "".to_string(),
db_field: "bitcoin".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
}
}
fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError>>{
if network.cookie_file !=""{
Ok(network.cookie_file.clone())
}else{
match env::var_os("HOME") {
Some(home) => {
match home.to_str(){
Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
Ok(cookie_file_path)
},
None => Err("wrong HOME value".into())
}
},
None => Err("Please Set HOME environment variable".into())
}
}
}
fn get_client_from_username(url: &String, network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
if network.rpc_user != "" {
match Client::new(&url[..],Auth::UserPass(network.rpc_user.to_string(),network.rpc_pass.to_string())){
Ok(client) => match client.get_blockchain_info(){
Ok(bcinfo) => Ok((client,bcinfo)),
Err(err) => Err(err.into())
}
Err(err)=>Err(err.into())
}
}else{
Err("Failed".into())
}
}
fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
match get_cookie_filename(network){
Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => {
match client.get_blockchain_info(){
Ok(bcinfo) => {
Ok((client,bcinfo))
},
Err(err) => {
Err(err.into())
}
}
},
Err(err)=>Err(err.into())
}
},
Err(err)=>Err(err.into())
}
}
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
let url = format!("{}:{}/",network.host,&network.port);
match get_client_from_username(&url,network){
Ok(client) =>{Ok(client)},
Err(_) =>{
match get_client_from_cookie(&url,&network){
Ok(client)=>{
Ok(client)
},
Err(err)=> Err(err.into())
}
}
}
}
async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
let user = args.next().expect("no user given");
let pass = args.next().expect("no pass given");
*/
//let network = Network::Regtest
match get_client(network_params){
Ok((rpc,bcinfo)) => {
info!("connected");
//let best_block_hash = rpc.get_best_block_hash()?;
//info!("best block hash: {}", best_block_hash);
//let bestblockcount = rpc.get_block_count()?;
//info!("best block height: {}", bestblockcount);
//let best_block_hash_by_height = rpc.get_block_hash(bestblockcount)?;
//info!("best block hash by height: {}", best_block_hash_by_height);
//assert_eq!(best_block_hash_by_height, best_block_hash);
//let from_block= std::cmp::max(0, bestblockcount - 11);
//let mut time_sum:u64=0;
//for i in from_block..bestblockcount{
// let hash = rpc.get_block_hash(i).unwrap();
// let block: bitcoin::Block = rpc.get_by_id(&hash).unwrap();
// time_sum += <u32 as Into<u64>>::into(block.header.time);
//}
//let average_time = time_sum/11;
info!("median time: {}",bcinfo.median_time);
//info!("height time: {}",bcinfo.median_time);
info!("blocks: {}",bcinfo.blocks);
debug!("best block hash: {}",bcinfo.best_block_hash);
let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap();
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter();
trace!("query_tx: {}",sqlquery);
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD );
trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0);
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs:Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
for row in query_tx.bind::<&[(_, Value)]>(&[
(":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()),
(":bestblock_time", (average_time as i64).into()),
(":bestblock_height", (bcinfo.blocks as i64).into()),
(":network", network_params.db_field.clone().into()),
(":status", 0.into()),
][..])
.unwrap()
.map(|row| row.unwrap())
{
let tx = row.read::<&str, _>("tx");
let txid = row.read::<&str, _>("txid");
let locktime = row.read::<i64,_>("locktime");
info!("to be pushed: {}: {}",txid, locktime);
match rpc.send_raw_transaction(tx){
Ok(o) => {
/*let mut file = OpenOptions::new()
.append(true) // Set the append option
.create(true) // Create the file if it doesn't exist
.open("valid_txs")?;
let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime);
file.write_all(data.as_bytes())?;
drop(file);
*/
info!("tx: {} pusshata PUSHED\n{}",txid,o);
pushed_txs.push(txid.to_string());
},
Err(err) => {
/*let mut file = OpenOptions::new()
.append(true) // Set the append option
.create(true) // Create the file if it doesn't exist
.open("/home/bal/invalid_txs")?;
let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime);
file.write_all(data.as_bytes())?;
drop(file);
*/
warn!("Error: {}\n{}",err,txid);
//store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string());
},
};
}
if pushed_txs.len() > 0 {
let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
trace!("sqlok: {}",&sql);
let _ = db.execute(&sql);
}
if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
trace!("sqlerror: {}",&sql);
let _ = db.execute(&sql);
}
}
let _ = send_stats_report(cfg, bcinfo).await;
}
Err(erx)=>{
panic!("impossible to get client {}",erx)
}
}
Ok(())
}
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{
if cfg.send_stats {
debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new();
let url = format!("{}/ping",welist_url);
let chain=bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
let response = client.post(url)
.header("User-Agent", format!("bal-pusher/{}",VERSION))
.json(&json!(
{
"url": cfg.url,
"chain": chain,
"height": bcinfo.blocks,
"median_time": bcinfo.median_time,
"last_block_hash": bcinfo.best_block_hash,
"signature": sign,
}))
.send().await?;
let body = &(response.text().await?);
info!("Report to welist({})\tSent: {}", welist_url,body);
}else {
debug!("Not sending stats");
}
Ok(())
}
fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap();
let private_key = PKey::private_key_from_pem(&key_data).unwrap();
let mut signer = Signer::new_without_digest(&private_key).unwrap();
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64
}
fn parse_env(cfg: &mut MyConfig){
match env::var("BAL_PUSHER_ZMQ_LISTENER") {
Ok(value) => {
cfg.zmq_listener = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_REQUEST_FILE") {
Ok(value) => {
cfg.requests_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_DB_FILE") {
Ok(value) => {
cfg.db_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_BITCOIN_DIR") {
Ok(value) => {
cfg.bitcoin_dir = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg,"regtest");
cfg.signet = parse_env_netconfig(cfg,"signet");
cfg.testnet = parse_env_netconfig(cfg,"testnet");
drop(parse_env_netconfig(cfg,"bitcoin"));
}
fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams{
//fn parse_env_netconfig(cfg_lock: &MutexGuard<MyConfig>, chain: &str) -> &NetworkParams{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet,
};
match env::var(format!("BAL_PUSHER_{}_HOST",chain.to_uppercase())) {
Ok(value) => { cfg.host= value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_PORT",chain.to_uppercase())) {
Ok(value) => {
match value.parse::<u64>(){
Ok(value) =>{ cfg.port = value.try_into().unwrap(); },
Err(_) => {},
}
}
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_DIR_PATH",chain.to_uppercase())) {
Ok(value) => { cfg.dir_path = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_DB_FIELD",chain.to_uppercase())) {
Ok(value) => { cfg.db_field = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE",chain.to_uppercase())) {
Ok(value) => {
cfg.cookie_file = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_RPC_USER",chain.to_uppercase())) {
Ok(value) => { cfg.rpc_user = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD",chain.to_uppercase())) {
Ok(value) => { cfg.rpc_pass = value; },
Err(_) => {},
}
cfg.clone()
}
fn get_default_config()-> MyConfig {
let file = confy::get_configuration_file_path("bal-pusher",None).expect("Error while getting path");
info!("Default configuration file path is: {:#?}", file);
confy::load("bal-pusher",None).expect("cant_load")
}
fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main]
async fn main()-> std::io::Result<()>{
env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => {
match confy::load_path(&value){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
},
Err(_) => {
get_default_config()
},
};
parse_env(&mut cfg);
let mut args = std::env::args();
let _exe_name = args.next().unwrap();
let arg_network = match args.next(){
Some(nargs) => nargs,
None => "bitcoin".to_string()
};
let network = match arg_network.as_str(){
"testnet" => Network::Testnet,
"signet" => Network::Signet,
"regtest" => Network::Regtest,
_ => Network::Bitcoin,
};
info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network);
let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address);
socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap();
let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop {
let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone();
let body = message[1].clone();
let seq = message[2].clone();
last_seq = seq;
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body));
let _ = main_result(&cfg,network_params).await;
}
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
}
}
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

594
src/bin/bal-server.rs Normal file
View File

@ -0,0 +1,594 @@
use bytes::Bytes;
use http_body_util::{ combinators::BoxBody, BodyExt, Empty, Full };
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{ Method, Request, Response, StatusCode };
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo;
use std::net::IpAddr;
use std::env;
//use std::time::{SystemTime,UNIX_EPOCH};
use std::fs;
use std::sync::{ Arc, Mutex, MutexGuard };
//use std::net::SocketAddr;
use std::collections::HashMap;
use sqlite::{ State, Value, Connection };
use bitcoin::{ consensus, Transaction, Network };
use hex_conservative::FromHex;
use regex::Regex;
use serde::{ Serialize, Deserialize};
use log::{ info, error, trace, debug};
use serde_json;
use chrono::Utc;
#[path = "../db.rs"]
mod db;
use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
#[path = "../xpub.rs"]
mod xpub;
use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.2";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig {
address: String,
fixed_fee: u64,
xpub: bool,
network: Network,
name: String,
enabled: bool,
}
impl NetConfig {
fn default_network(name:String, network: Network) -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
xpub: false,
name,
network,
enabled: false,
}
}
}
#[derive(Debug, Serialize, Deserialize,Clone)]
struct MyConfig {
regtest: NetConfig,
signet: NetConfig,
testnet: NetConfig,
mainnet: NetConfig,
info: String,
bind_address: String,
bind_port: u16, // Changed to u16 for port numbers
db_file: String,
pub_key_path: String,
}
#[derive(Debug,Serialize, Deserialize)]
pub struct Info {
pub address: String,
pub base_fee: u64,
pub chain: String,
pub info: String,
pub version: String
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest),
signet: NetConfig::default_network("signet".to_string(), Network::Signet),
testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet),
mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin),
bind_address: "127.0.0.1".to_string(),
bind_port: 9137,
db_file: "bal.db".to_string(),
info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(),
}
}
}
impl MyConfig {
fn get_net_config(&self, param: &str) -> &NetConfig{
match param {
"regtest" => &self.regtest,
"testnet" => &self.testnet,
"signet" => &self.signet,
_ => &self.mainnet,
}
}
}
async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION)))
}
async fn echo_home(cfg: &MyConfig
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info );
Ok(Response::new(full(cfg.info.clone())))
}
async fn echo_pub_key(
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let pub_key = fs::read_to_string(&cfg.pub_key_path)
.expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key)))
}
async fn echo_info(
param: &str,
cfg: &MyConfig,
remote_addr: String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param);
if !netconfig.enabled {
debug!("network disabled {}",param);
return Ok(Response::new(full("network disabled")));
}
let address = match netconfig.xpub{
false => {
let address = netconfig.address.to_string();
trace!("is address: {}",&address);
address
},
true => {
let db = sqlite::open(&cfg.db_file).unwrap();
match get_last_used_address_by_ip(&db,&netconfig.name,&netconfig.address,&remote_addr){
Some(address)=>address,
None => {
let next = get_next_address_index(&db,&netconfig.name,&netconfig.address);
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap();
save_new_address(&db,next.0,&address.0,&address.1,&remote_addr);
debug!("save new address {} {}",address.0,address.1);
trace!("next {} {}",next.0,next.1);
address.0
}
}
}
};
let info = Info{
address,
base_fee: netconfig.fixed_fee,
chain: netconfig.network.to_string(),
info: cfg.info.to_string(),
version: VERSION.to_string()
};
trace!("address: {:#?}",info);
match serde_json::to_string(&info){
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err))))
}
}
async fn echo_search(whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!");
let strbody = std::str::from_utf8(whole_body).unwrap();
info!("{}",strbody);
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
if !strbody.is_empty() && strbody.len()<=70 {
let db = sqlite::open(&cfg.db_file).unwrap();
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1").unwrap();
statement.bind((1, strbody)).unwrap();
if let Ok(State::Row) = statement.next() {
let mut response_data = HashMap::new();
match statement.read::<String, _>("status") {
Ok(value) => response_data.insert("status", value),
Err(e) => {
error!("Error reading status: {}", e);
//response_data.insert("status", "Error".to_string())
None
}
};
// Read the transaction (tx)
match statement.read::<String, _>("tx") {
Ok(value) => response_data.insert("tx", value),
Err(e) => {
error!("Error reading tx: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
match statement.read::<String, _>("our_address") {
Ok(value) => response_data.insert("our_address", value),
Err(e) => {
error!("Error reading address: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
match statement.read::<String, _>("our_fees") {
Ok(value) => response_data.insert("our_fees", value),
Err(e) => {
error!("Error reading fees: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
// Read the request id (reqid)
match statement.read::<String, _>("reqid") {
Ok(value) => response_data.insert("time", value),
Err(e) => {
error!("Error reading reqid: {}", e);
//response_data.insert("time", "Error".to_string())
None
}
};
response = match serde_json::to_string(&response_data){
Ok(json_data) => Response::new(full(json_data)),
Err(_) => { response }
};
return Ok(response);
}
}
Ok(response)
}
async fn echo_push(whole_body: &Bytes,
cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
//let whole_body = req.collect().await?.to_bytes();
let strbody = std::str::from_utf8(whole_body).unwrap();
let mut response = Response::new(full("Bad data received".to_owned()));
let mut response_not_enable = Response::new(full("Network not enabled".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
*response_not_enable.status_mut()=StatusCode::BAD_REQUEST;
let netconfig = MyConfig::get_net_config(cfg,param);
if !netconfig.enabled{
return Ok(response_not_enable);
}
let req_time = Utc::now().timestamp_nanos_opt().unwrap(); // Returns i64
let db = sqlite::open(&cfg.db_file).unwrap();
let lines = strbody.split("\n");
let sqltxshead = "INSERT INTO tbl_tx (txid, wtxid, ntxid, tx, locktime, reqid, network, our_address, our_fees)".to_string();
let mut sqltxs = "".to_string();
let sqlinpshead = "INSERT INTO tbl_inp (txid, in_txid, in_vout )".to_string();
let mut sqlinps = "".to_string();
let sqloutshead = "INSERT INTO tbl_out (txid, vout, script_pubkey, amount )".to_string();
let mut sqlouts = "".to_string();
let mut union_tx = true;
let mut union_inps = true;
let mut union_outs = true;
let mut already_present = false;
let mut ptx:Vec<(usize, Value)> = vec![];
let mut pinps:Vec<(usize, Value)> = vec![];
let mut pouts:Vec<(usize, Value)> = vec![];
let mut linenum = 1;
let mut lineinp = 1;
let mut lineout = 1;
for line in lines {
if line.is_empty(){
trace!("line len is: {}",line.len());
continue
}
let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea);
let raw_tx = match Vec::<u8>::from_hex(line) {
Ok(raw_tx) => raw_tx,
Err(err) => {
error!("rawtx error: {}",err);
continue
}
};
if !raw_tx.is_empty(){
trace!("len: {}",raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx){
Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue}
};
let txid = tx.compute_txid().to_string();
trace!("txid: {}",txid);
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1,&txid[..])).unwrap();
if let Ok(State::Row) = statement.next() {
trace!("already present");
already_present=true;
continue;
}
let ntxid = tx.compute_ntxid();
let wtxid = tx.compute_wtxid();
let mut found = false;
let locktime = tx.lock_time;
let mut our_address:String = "".to_string();
let mut our_fees:u64 = 0;
for input in tx.input{
if !union_inps {
sqlinps = format!("{sqlinps} UNION ALL");
}else{
union_inps = false;
}
sqlinps = format!("{sqlinps} SELECT ?, ?, ?");
pinps.push((lineinp,Value::String(txid.to_string())));
pinps.push((lineinp+1,Value::String(input.previous_output.txid.to_string())));
pinps.push((lineinp+2,Value::String(input.previous_output.vout.to_string())));
lineinp += 3;
}
if netconfig.fixed_fee ==0 {
found = true;
}
for (idx,output) in tx.output.into_iter().enumerate(){
let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), netconfig.network){
Ok(address) => address.to_string(),
Err(_) => String::new(),
};
let amount = output.value;
our_fees = netconfig.fixed_fee;//search wllexecutor output
if netconfig.xpub{
let sql="select * from tbl_address where address=?";
let mut stmt = db.prepare(sql).expect("failed to fetch addresses");
stmt.bind((1,Value::String(address.to_string()))).unwrap();
if let Ok(State::Row) = stmt.next() {
our_address = address.to_string();
}
} else {
our_address = netconfig.address.to_string();
}
if address == our_address && amount.to_sat() >= netconfig.fixed_fee {
our_fees = amount.to_sat();
our_address = netconfig.address.to_string();
found = true;
trace!("address and fees are correct {}: {}",our_address,our_fees);
}
if !union_outs {
sqlouts = format!("{sqlouts} UNION ALL");
}else{
union_outs = false;
}
sqlouts = format!("{sqlouts} SELECT ?, ?, ?, ?");
pouts.push((lineout,Value::String(txid.to_string())));
pouts.push((lineout+1,Value::Integer(idx.try_into().unwrap())));
pouts.push((lineout+2,Value::String(script_pubkey.to_string())));
pouts.push((lineout+3,Value::Integer(amount.to_sat().try_into().unwrap())));
lineout += 4;
}
if !found {
error!("willexecutor output not found ");
return Ok(response)
} else {
if !union_tx {
sqltxs = format!("{sqltxs} UNION ALL");
}else{
union_tx = false;
}
sqltxs = format!("{sqltxs} SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?");
ptx.push((linenum,Value::String(txid)));
ptx.push((linenum+1,Value::String(wtxid.to_string())));
ptx.push((linenum+2,Value::String(ntxid.to_string())));
ptx.push((linenum+3,Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string())));
linenum += 9;
}
}else{
trace!("rawTx len is: {}",raw_tx.len());
debug!("{}",&sqltxs);
}
}
if sqltxs.is_empty() && already_present {
return Ok(Response::new(full("already present")))
}
let sqltxs = format!("{}{};", sqltxshead, sqltxs);
let sqlinps = format!("{}{};", sqlinpshead, sqlinps);
let sqlouts = format!("{}{};", sqloutshead, sqlouts);
if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts){
debug!("{}",err);
return Ok(response);
}
Ok(Response::new(full("thx")))
}
fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
let re = Regex::new(path).unwrap();
if let Some(captures) = re.captures(uri) {
if let Some(param) = captures.name("param") {
return Some(param.as_str());
}
}
None
}
async fn echo(
req: Request<hyper::body::Incoming>,
cfg: &MyConfig,
ip: &String
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string();
let remote_addr = req.headers().get("X-Real-IP").and_then(|value| value.to_str().ok()).and_then(|xff| xff.split(',').next()).map(|ip| ip.trim().to_string()).unwrap_or_else(|| ip.to_string());
trace!("{}: {}",remote_addr,uri);
match *req.method() {
// Serve some instructions at /
Method::POST => {
let whole_body = req.collect().await?.to_bytes();
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$",uri.as_str()) {
//let whole_body = collect_body(req,512_000).await?;
ret = echo_push(&whole_body,cfg,param).await;
}
if uri=="/searchtx"{
//let whole_body = collect_body(req,64).await?;
ret = echo_search(&whole_body,cfg).await;
}
ret
}
Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$",uri.as_str()) {
ret = echo_info(param,cfg,remote_addr).await;
}
if uri=="/version"{
ret= echo_version().await;
}
if uri=="/.pub_key.pem" {
ret = echo_pub_key(cfg).await;
}
if uri=="/"{
ret = echo_home(cfg).await;
}
ret
}
// Return the 404 Not Found for other routes.
_ => ret
}
}
fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
for (key, value) in std::env::vars() {
debug!("ENVIRONMENT {key}: {value}");
}
let mut cfg_lock = cfg.lock().unwrap();
if let Ok(value) = env::var("BAL_SERVER_DB_FILE") {
debug!("BAL_SERVER_DB_FILE: {}",value);
cfg_lock.db_file = value;
}
if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") {
debug!("BAL_SERVER_BIND_ADDRESS: {}",value);
cfg_lock.bind_address= value;
}
if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") {
debug!("BAL_SERVER_BIND_PORT: {}",value);
if let Ok(v) = value.parse::<u16>(){
cfg_lock.bind_port = v;
}
}
if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") {
debug!("BAL_SERVER_PUB_KEY_PATH: {}",value);
cfg_lock.pub_key_path = value;
}
if let Ok(value) = env::var("BAL_SERVER_INFO"){
debug!("BAL_SERVER_INFO: {}",value);
cfg_lock.info = value;
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
drop(parse_env_netconfig(cfg_lock,"bitcoin"));
}
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet,
};
if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value);
cfg.address = value;
if cfg.address.len() > 5 {
if cfg.address[1..4] == *"pub" {
cfg.xpub=true;
trace!("is_xpub");
}
cfg.enabled=true;
}
}
if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value);
if let Ok(v) = value.parse::<u64>(){
cfg.fixed_fee = v;
}
}
cfg_lock
}
fn init_network(db: &Connection, cfg: &MyConfig){
for network in NETWORKS{
let netconfig = MyConfig::get_net_config(cfg,network);
insert_xpub(db,&netconfig.name,&netconfig.address);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let cfg: Arc<Mutex<MyConfig>> =Arc::<Mutex<MyConfig>>::default();
parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap();
create_database(&db);
init_network(&db,&cfg_lock);
let addr = cfg_lock.bind_address.to_string();
let addr: IpAddr = addr.parse()?;
let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr,cfg_lock.bind_port);
loop {
let (stream, _) = listener.accept().await?;
let ip = stream.peer_addr()?.to_string().split(":").next().unwrap().to_string();
let io = TokioIo::new(stream);
tokio::task::spawn({
let cfg = cfg_lock.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req,&cfg,&ip).await
}))
.await
{
error!("Error serving connection: {:?}", err);
}
}
});
}
}

147
src/db.rs Normal file
View File

@ -0,0 +1,147 @@
use sqlite::{ Connection, Value, State, Error };
use log::{info, trace, error};
pub fn create_database(db: &Connection){
info!("database sanity check");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP, date_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);");
let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);");
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
}
/*
pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option<i64>{
let mut stmt = db.prepare("SELECT * FROM tbl_xpub where network = ? and xpub = ?;").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
if let Ok(State::Row) = stmt.next(){
return Some(stmt.read::<i64, _>("id").unwrap());
} else {
return None;
}
}
*/
pub fn insert_xpub(db: &Connection, network: &String, xpub: &String){
if xpub != "" {
trace!("going to insert: {} xpub:{}", network, xpub);
let mut stmt = db.prepare ("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
let _ = stmt.next();
}
}
pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &String, address: &String) -> Option<String>{
let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string())));
let _ = stmt.bind((2,Value::String(address.to_string())));
let _ = stmt.bind((3,Value::String(xpub.to_string())));
if let Ok(State::Row) = stmt.next(){
let address = stmt.read::<String,_>("address").unwrap();
return Some(address);
}else{
return None;
}
}
pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64,i64){
let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row) =>{
let next = stmt.read::<i64,_>("path_idx").unwrap();
let id = stmt.read::<i64,_>("id").unwrap();
return (id,next);
},Err(_)=> {
return (0,0);
},Ok(State::Done) =>{
return (0,0);
}
};
}
pub fn save_new_address(db: &Connection,xpub: i64,address: &String, path: &String,remote_addr: &String){
let mut stmt = db.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);").unwrap();
stmt.bind((1,Value::String(address.to_string()))).unwrap();
stmt.bind((2,Value::String(path.to_string()))).unwrap();
stmt.bind((3,Value::Integer(xpub))).unwrap();
stmt.bind((4,Value::String(remote_addr.to_string()))).unwrap();
let _ = stmt.next();
}
pub fn execute_insert(db: &Connection,
sqltxs: String,
ptx: Vec<(usize, Value)>,
sqlinp: String,
pinp: Vec<(usize, Value)>,
sqlout: String,
pout: Vec<(usize, Value)>) -> Result<(),Error>{
let _ = db.execute("BEGIN TRANSACTION");
let mut stmt = db.prepare(sqltxs.as_str()).expect("failed to prepare sqltxs");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&ptx[..]) {
error!("error binding transaction parameters: {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting transactions {}",err);
let _ = db.execute("ROLLBACK");
}else{
let mut stmt = db.prepare(sqlinp.as_str()).expect("failed to prepare sqlinp");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pinp[..]) {
error!("error binding inputs parameters {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting inputs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}else{
let mut stmt = db.prepare(sqlout.as_str()).expect("failed to prepare sqlout");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pout[..]) {
error!("error binding outs parameters {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting outs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
}
}
let _ = db.execute("COMMIT");
Ok(())
}
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64,Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row)=>{
Ok(stmt.read::<i64,_>("total_number").unwrap())
},
Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err)
}
}

View File

@ -1,502 +0,0 @@
use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo;
use std::net::IpAddr;
use std::env;
use std::time::{SystemTime,UNIX_EPOCH};
use std::sync::{Arc, Mutex, MutexGuard};
use std::collections::HashMap;
use sqlite::{State,Connection};
use bitcoin::{consensus, Transaction, Network};
use hex_conservative::FromHex;
use regex::Regex;
use serde::{Serialize, Deserialize};
use log::{info,error,trace,debug};
use serde_json;
#[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig {
address: String,
fixed_fee: u64,
}
impl Default for NetConfig {
fn default() -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
}
}
}
impl NetConfig {
fn default_regtest() -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
}
}
}
#[derive(Debug, Serialize, Deserialize,Clone)]
struct MyConfig {
regtest: NetConfig,
signet: NetConfig,
testnet4: NetConfig,
testnet3: NetConfig,
mainnet: NetConfig,
bind_address: String,
bind_port: u16, // Changed to u16 for port numbers
db_file: String,
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
regtest: NetConfig::default_regtest(),
signet: NetConfig::default(), // Use default for other networks
testnet4: NetConfig::default(),
testnet3: NetConfig::default(),
mainnet: NetConfig::default(),
bind_address: "127.0.0.1".to_string(),
bind_port: 9137, // Ensure this is a u16
db_file: "../bal.db".to_string(),
}
}
}
impl MyConfig {
fn get_net_config(&self, param: &str) -> &NetConfig{
match param {
"regtest" => &self.regtest,
"testnet" => &self.testnet3,
"signet" => &self.signet,
_ => &self.mainnet,
}
}
}
async fn echo_info(
param: &str,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param);
return Ok(Response::new(full("{\"address\":\"".to_owned()+&netconfig.address+"\",\"base_fee\":\""+&netconfig.fixed_fee.to_string()+"\"}")));
}
async fn echo_search(whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!");
let strbody = std::str::from_utf8(&whole_body).unwrap();
info!("{}",strbody);
trace!("{}",strbody.len());
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
if strbody.len() >0 && strbody.len()<=70 {
let db = sqlite::open(&cfg.db_file).unwrap();
trace!("qua ci arrivo");
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1, strbody)).unwrap();
while let Ok(State::Row) = statement.next() {
trace!("qua tutto ok");
let mut response_data = HashMap::new();
match statement.read::<String, _>("status") {
Ok(value) => response_data.insert("status", value),
Err(e) => {
error!("Error reading status: {}", e);
break;
//response_data.insert("status", "Error".to_string())
}
};
// Read the transaction (tx)
match statement.read::<String, _>("tx") {
Ok(value) => response_data.insert("tx", value),
Err(e) => {
error!("Error reading tx: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
match statement.read::<String, _>("our_address") {
Ok(value) => response_data.insert("our_address", value),
Err(e) => {
error!("Error reading address: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
match statement.read::<String, _>("our_fees") {
Ok(value) => response_data.insert("our_fees", value),
Err(e) => {
error!("Error reading fees: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
// Read the request id (reqid)
match statement.read::<String, _>("reqid") {
Ok(value) => response_data.insert("time", value),
Err(e) => {
error!("Error reading reqid: {}", e);
break;
//response_data.insert("time", "Error".to_string())
}
};
response = match serde_json::to_string(&response_data){
Ok(json_data) => Response::new(full(json_data)),
Err(_) => {break;}
};
return Ok(response);
}
}
Ok(response)
}
async fn echo_push(whole_body: &Bytes,
cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
//let whole_body = req.collect().await?.to_bytes();
let strbody = std::str::from_utf8(&whole_body).unwrap();
let lines = strbody.split("\n");
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
debug!("network: {}", param);
let network = match param{
"testnet" => Network::Testnet,
"signet" => Network::Signet,
"regtest" => Network::Regtest,
&_ => Network::Bitcoin,
};
let req_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_nanos();
let sqltxshead = "INSERT INTO tbl_tx (txid, wtxid, ntxid, tx, locktime, reqid, network, our_address, our_fees)".to_string();
let mut sqltxs = "".to_string();
//let mut sqlinps = "INSERT INTO tbl_input (txid, in_txid,in_vout)".to_string();
//let mut sqlouts = "INSERT INTO tbl_output (txid, script_pubkey, address, amount)\n".to_string();
let mut union_tx = true;
let mut already_present = false;
//let mut union_inps = true;
//let mut union_outs = true;
let db = sqlite::open(&cfg.db_file).unwrap();
let netconfig = MyConfig::get_net_config(cfg,param);
for line in lines {
if line.len() == 0{
trace!("line len is: {}",line.len());
continue
}
let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea);
let raw_tx = match Vec::<u8>::from_hex(line) {
Ok(raw_tx) => raw_tx,
Err(err) => {
error!("rawtx error: {}",err);
continue
}
};
if raw_tx.len() > 0 {
trace!("len: {}",raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx){
Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue}
};
let txid = tx.txid().to_string();
trace!("txid: {}",txid);
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1,&txid[..])).unwrap();
//statement.bind((1,"Bob")).unwrap();
if let Ok(State::Row) = statement.next() {
trace!("already present");
already_present=true;
continue;
}
let ntxid = tx.ntxid();
let wtxid = tx.wtxid();
let mut found = false;
let locktime = tx.lock_time;
let mut our_fees = 0;
let mut our_address:String = "".to_string();
//dbg!(netconfig.fixed_fee);
if netconfig.fixed_fee >0 {
for output in tx.output{
let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), network){
Ok(address) => address.to_string(),
Err(_) => String::new(),
};
//dbg!(&address);
let amount = output.value;
//dbg!(&amount);
//search wllexecutor output
if address == netconfig.address.to_string() && amount.to_sat() >= netconfig.fixed_fee{
our_fees = amount.to_sat();
our_address = netconfig.address.to_string();
found = true;
trace!("address and fees are correct {}: {}",our_address,our_fees);
break;
}else {
trace!("address and fees not found {}: {}",address,amount.to_sat());
trace!("address and fees not found {}: {}",netconfig.address.to_string(),netconfig.fixed_fee);
}
}
} else { found = true; }
if found == false{
error!("willexecutor output not found ");
//return Ok(response)
} else {
if union_tx == false {
sqltxs = format!("{sqltxs} UNION ALL");
}else{
union_tx = false;
}
sqltxs = format!("{sqltxs} SELECT '{txid}', '{wtxid}', '{ntxid}', '{line}', '{locktime}', '{req_time}', '{network}','{our_address}',{our_fees}");
}
}
else{
trace!("rawTx len is: {}",raw_tx.len());
debug!("{}",&sqltxs);
}
}
debug!("SQL: {}",sqltxs);
if sqltxs.len()== 0{
if already_present == true{
//dbg!(already_present);
return Ok(Response::new(full("already present")))
}
}
let _ = db.execute("BEGIN TRANSACTION");
let sql = format!("{}{}",sqltxshead,sqltxs);
if let Err(err) = db.execute(&sql){
error!("error executing sql:{} - {}",&sql,err);
let _ = db.execute("ROLLBACK");
//dbg!(&already_present);
if already_present == true{
trace!("already_present = True");
return Ok(Response::new(full("already present")))
}
return Ok(response)
}
//if !error {
// if let Err(_) = db.execute(sqlinps){
// let _ = db.execute("ROLLBACK");
// error = true
// }
//}
//if !error {
// if let Err(_) = db.execute(sqlouts){
// let _ = db.execute("ROLLBACK");
// error = true;
// }
//}
let _ = db.execute("COMMIT");
Ok(Response::new(full("thx")))
}
fn create_database(db: Connection){
info!("database sanity check");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);");
let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_input (txid, in_txid,in_vout, spend_txidi);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_output (txid, script_pubkey, address, amount);");
let _ = db.execute("CREATE UNIQUE INDEX idx_tbl_input ON(txid, txid,in_txid,in_vout)");
}
fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
let re = Regex::new(path).unwrap();
if let Some(captures) = re.captures(uri) {
if let Some(param) = captures.name("param") {
return Some(param.as_str());
}
}
None
}
/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(
req: Request<hyper::body::Incoming>,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string();
//dbg!(&req);
//dbg!(&uri);
match req.method() {
// Serve some instructions at /
&Method::POST => {
let whole_body = req.collect().await?.to_bytes();
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$",uri.as_str()) {
//let whole_body = collect_body(req,512_000).await?;
ret = echo_push(&whole_body,cfg,param).await;
}
if uri=="/searchtx"{
//let whole_body = collect_body(req,64).await?;
ret = echo_search(&whole_body,cfg).await;
}
ret
}
&Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$",uri.as_str()) {
ret = echo_info(param,cfg).await;
}
ret
}
// Return the 404 Not Found for other routes.
_ => ret
}
}
fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
let mut cfg_lock = cfg.lock().unwrap();
match env::var("BAL_SERVER_DB_FILE") {
Ok(value) => {
cfg_lock.db_file = value;},
Err(_) => {},
}
match env::var("BAL_SERVER_BIND_ADDRESS") {
Ok(value) => {
cfg_lock.bind_address = value;},
Err(_) => {},
}
match env::var("BAL_SERVER_BIND_PORT") {
Ok(value) => {
cfg_lock.bind_address = value;},
Err(_) => {},
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet3");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
drop(parse_env_netconfig(cfg_lock,"bitcoin"));
}
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet3" => &mut cfg_lock.testnet3,
"testnet" => &mut cfg_lock.testnet4,
&_ => &mut cfg_lock.mainnet,
};
match env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
Ok(value) => { cfg.address = value; },
Err(_) => {},
}
match env::var(format!("BAL_SERVER_{}_FIXE_FEE",chain.to_uppercase())) {
Ok(value) => {
match value.parse::<u64>(){
Ok(value) =>{ cfg.fixed_fee = value; },
Err(_) => {},
}
}
Err(_) => {},
}
cfg_lock
}
fn get_default_config()-> MyConfig {
let file = confy::get_configuration_file_path("bal-server",None).expect("Error while getting path");
info!("Default configuration file path is: {:#?}", file);
confy::load("bal-server",None).expect("cant_load")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let cfg: Arc<Mutex<MyConfig>> = match env::var("BAL_SERVER_CONFIG_FILE") {
Ok(value) => {
Arc::new(Mutex::new(
match confy::load_path(value.to_string()){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
))
},
Err(_) => {
Arc::new(Mutex::new(get_default_config()))
},
};
parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap();
create_database(db);
let addr = cfg_lock.bind_address.to_string();
info!("bind address:{}",addr);
let addr: IpAddr = addr.parse()?;
let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr,cfg_lock.bind_port);
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
tokio::task::spawn({
let cfg = cfg_lock.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req,&cfg).await
}))
.await
{
error!("Error serving connection: {:?}", err);
}
}
});
}
}

112
src/xpub.rs Normal file
View File

@ -0,0 +1,112 @@
use sha2::{Digest, Sha256};
use bitcoin::bip32::Xpub;
use std::str::FromStr;
use bitcoin::bip32::DerivationPath;
use bitcoin::key::Secp256k1;
use bitcoin::Address;
use bitcoin::ScriptBuf;
use bitcoin::WPubkeyHash;
use bitcoin::Network;
use bitcoin::hashes::Hash;
// Mainnet (BIP44/BIP49/BIP84)
enum BS58Prefix{
Xpub,
//Ypub,
//Zpub,
//Tpub,
//Vpub,
//Upub
}
const XPUB_PREFIX:[u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH)
//const YPUB_PREFIX:[u8; 4] = [0x04, 0x9D, 0x7C, 0xB2]; // ypub (Nested SegWit P2SH-P2WPKH)
//const ZPUB_PREFIX:[u8; 4] = [0x04, 0xB2, 0x47, 0x46]; // zpub (Native SegWit P2WPKH)
//const TPUB_PREFIX:[u8; 4] = [0x04, 0x35, 0x87, 0xCF]; // tpub (Testnet Legacy P2PKH)
//const VPUB_PREFIX:[u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit)
//const UPUB_PREFIX:[u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit)
fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?;
if data.len() < 4 {
return Err("Data troppo corta".to_string());
}
let (payload, checksum) = data.split_at(data.len() - 4);
let hash = Sha256::digest(Sha256::digest(payload));
if hash[0..4] != checksum[..] {
return Err("Checksum invalido".to_string());
}
Ok(payload.to_vec())
}
fn base58check_encode(data: &[u8]) -> String {
let checksum = &Sha256::digest(Sha256::digest(data))[0..4];
let full = [data, checksum].concat();
bs58::encode(full).into_string()
}
fn convert_to(zpub: &str,prefix: BS58Prefix) -> Result<String, String> {
let mut data = base58check_decode(zpub)?;
if data.len() < 4 {
return Err("Non è una zpub valida.".to_string());
}
data.splice(0..4, match prefix {
BS58Prefix::Xpub => XPUB_PREFIX,
//BS58Prefix::Ypub => YPUB_PREFIX,
//BS58Prefix::Zpub => ZPUB_PREFIX,
//BS58Prefix::Vpub => VPUB_PREFIX,
//BS58Prefix::Tpub => TPUB_PREFIX,
//BS58Prefix::Upub => UPUB_PREFIX,
});
Ok(base58check_encode(&data))
}
pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(String,String), Box<dyn std::error::Error>>{
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?;
let path = format!("m/0/{}",index);
let derivation_path = DerivationPath::from_str(path.as_str())?;
let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
let public_key = derived_xpub.public_key;
let pubkey_bytes = public_key.serialize();
let witness_program = WPubkeyHash::hash(&pubkey_bytes);
let redeem_script = ScriptBuf::new_p2wpkh(&witness_program);
//let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash());
let address = Address::from_script(&redeem_script, network)?;
//let address = Address::from_script(&script_pubkey, network)?;
Ok((address.to_string(),path.to_string()))
}
/*
fn main() -> Result<(), Box<dyn std::error::Error>>{
//let zpub = "xpub6C29v8gxCXREHUzoGNfqqFqZWxTVEmYtmZshuzfSwBKNmfYQxoizRziCkkUUA4WwJZkJs2i7nttRiC6MQG7mxZpouXeYkTZe3U52RyPAeo2";
//let zpub = "vpub5Ut36m34VebUUjdhYaxJCjSPqk3ZR8bA2MXLmbHRQCycAxy5Q1GFPJspLkJywJjBgQnvU3rmwPKTPp1ELLWeXrve3zBufpZR4MRCCTNHzsn";
let zpub = "zpub6qdfveGrxBQN3z8paZ88EHpCn5MGXpUoHwQmHhPbj4rPQtUjbWyCHrJFYZGVY7MsmVbDaeu4JYqRqcdLzMx78wZFEWbLrF9FG3gr2MPQC5H";
match convert_to(zpub,BS58Prefix::Xpub) {
Ok(xpub) => println!("XPUB: {}", xpub),
Err(e) => eprintln!("Errore: {}", e),
}
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?;
let derivation_path = DerivationPath::from_str("m/0/0")?;
let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
let public_key = derived_xpub.public_key;
let pubkey_bytes = public_key.serialize();
let witness_program = WPubkeyHash::hash(&pubkey_bytes);
let redeem_script = ScriptBuf::new_p2wpkh(&witness_program);
let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash());
// Generate the Bitcoin SegWit (BIP49) address
let network = Network::Bitcoin;
let address = Address::from_script(&redeem_script, network)?;
let address = Address::from_script(&script_pubkey, network)?;
Ok(())
}
*/