Compare commits

..

No commits in common. "main" and "v0.1.0" have entirely different histories.
main ... v0.1.0

13 changed files with 659 additions and 3246 deletions

1767
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,29 +5,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
base64 = "0.22.1" hyper = { version = "1.2", features = ["full"] }
bs58 = "0.4.0" tokio = { version = "1", features = ["full"] }
bytes = "1.2"
bitcoin = { version = "0.32.5" }
bitcoincore-rpc = "0.19.0"
bitcoincore-rpc-json = "0.19.0"
byteorder = "1.5.0"
confy = "0.6.1"
chrono = "0.4.40"
env_logger = "0.11.5"
hex = "0.4.3"
hex-conservative = "0.1.1"
hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1" http-body-util = "0.1"
log = "0.4.21" hyper-util = { version = "0.1", features = ["full"] }
openssl = { version = "0.10.74", features = ["vendored"] } bytes = "1.2"
sha2 = "0.10.8" serde = { version = "1.0.152", features = ["derive"] } # <- Only one serde version needed (serde or serde_derive)
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116" serde_json = "1.0.116"
confy = "0.6.1"
bitcoin = "0.31.0"
sqlite = "0.34.0" sqlite = "0.34.0"
hex-conservative = "0.1.1"
regex = "1.10.4" regex = "1.10.4"
reqwest = { version = "0.12.24", features = ["json","socks"] } log = "0.4.21"
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components env_logger = "0.11.5"
zmq = "0.10.0"

View File

@ -1,11 +1,10 @@
# bal-server # bal-server
## Installation ## Installation
```bash ```bash
$ git clone .... $ git clone ....
$ cd bal-server $ cd bal-server
$ openssl genpkey -algorithm ED25519 -out private_key.pem
$ openssl pkey -in private_key.pem -pubout -out public_key.pem
$ cargo build --release $ cargo build --release
$ sudo cp target/release/bal-server /usr/local/bin $ sudo cp target/release/bal-server /usr/local/bin
$ bal-server $ bal-server
@ -21,7 +20,6 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` | | `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` |
| `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` | | `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` |
| `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` | | `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` |
| `BAL_SERVER_PUB_KEY_PATH` | WillExecutor Ed25519 public key | `public_key.pem` |
| `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - | | `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - |
| `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 | | `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 |
| `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - | | `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - |
@ -30,56 +28,3 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 | | `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 |
| `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - | | `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - |
| `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 | | `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 |
# bal-pusher
`bal-pusher` is a tool that retrieves Bitcoin transactions from a database and pushes them to the Bitcoin network when their **locktime** exceeds the **median time past** (MTP). It listens for Bitcoin block updates via ZMQ.
## Installation
To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) support enabled. Then, configure your Bitcoin node and `bal-pusher` to push the transactions.
### Prerequisites
1. **Bitcoin with ZMQ Support**:
Ensure that Bitcoin is compiled with ZMQ support. Add the following line to your `bitcoin.conf` file:
```
zmqpubhashblock=tcp://127.0.0.1:28332
```
2. **Install Rust and Cargo**:
If you haven't already installed Rust and Cargo, you can follow the official instructions to do so: [Rust Installation](https://www.rust-lang.org/tools/install).
## Configuration
`bal-pusher` can be configured using environment variables. If no configuration file is provided, a default configuration file will be created.
### Available Configuration Variables
| Variable | Description | Default |
|---------------------------------------|------------------------------------------|----------------------------------------------|
| `BAL_PUSHER_CONFIG_FILE` | Path to the configuration file. If the file does not exist, it will be created. | `$HOME/.config/bal-pusher/default-config.toml` |
| `BAL_PUSHER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, it will be created. | `bal.db` |
| `BAL_PUSHER_ZMQ_LISTENER` | ZMQ listener for Bitcoin updates. | `tcp://127.0.0.1:28332` |
| `BAL_PUSHER_BITCOIN_HOST` | Bitcoin server host for RPC connections. | `http://127.0.0.1` |
| `BAL_PUSHER_BITCOIN_PORT` | Bitcoin RPC server port. | `8332` |
| `BAL_PUSHER_BITCOIN_COOKIE_FILE` | Path to Bitcoin RPC cookie file. | `$HOME/.bitcoin/.cookie` |
| `BAL_PUSHER_BITCOIN_RPC_USER` | Bitcoin RPC username. | - |
| `BAL_PUSHER_BITCOIN_RPC_PASSWORD` | Bitcoin RPC password. | - |
| `BAL_PUSHER_SEND_STATS` | Contact welist to provide times | false |
| `WELIST_SERVER_URL` | welist server url to provide times | https://welist.bitcoin-afer.life |
| `BAL_SERVER_URL` | WillExecutor server url | - |
| `SSL_KEY_PATH` | Ed25519 private key pem file | `private_key.pem` |
## Running `bal-pusher`
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash
$ bal-pusher
```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

View File

@ -1,16 +0,0 @@
RUST_LOG=info
BAL_PUSHER_DB_FILE=/home/bal/bal.db
BAL_PUSHER_BITCOIN_COOKIE_FILE=/home/bitcoin/.bitcoin/.cookie
BAL_PUSHER_REGTEST_COOKIE_FILE=/home/bitcoin/.bitcoin/regtest/.cookie
BAL_PUSHER_TESTNET_COOKIE_FILE=/home/bitcoin/.bitcoin/testnet3/.cookie
BAL_PUSHER_SIGNET_COOKIE_FILE=/home/bitcoin/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life"

View File

@ -1,14 +0,0 @@
RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133"
export SSL_KEY_PATH="$(pwd)/private_key.pem"
cargo run --bin=bal-pusher regtest

View File

@ -1,15 +1,11 @@
RUST_LOG=info RUST_LOG=info
BAL_SERVER_DB_FILE="/home/bal/bal.db" BAL_SERVER_DB_FILE=/path/to/bal.db
BAL_SERVER_INFO="BAL server test willexecutor"
BAL_SERVER_BIND_ADDRESS=127.0.0.1 BAL_SERVER_BIND_ADDRESS=127.0.0.1
BAL_SERVER_BIND_PORT=9133 BAL_SERVER_BITCOIN_ADDRESS=your mainnet address
BAL_SERVER_BITCOIN_ADDRESS="your bitcoin or xpub to recive payments here" BAL_SERVER_BITCOIN_FEE=100000
BAL_SERVER_BITCOIN_FIXED_FEE=50000 BAL_SERVER_REGTEST_ADDRESS=
BAL_SERVER_PUB_KEY_PATH="/home/bal/public_key.pem" BAL_SERVER_REGTEST_FEE=100000
BAL_SERVER_TESTNET_ADDRESS=
BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4" BAL_SERVER_TESTNET_FEE=100000
BAL_SERVER_REGTEST_FEE=5000 BAL_SERVER_SIGNET_ADDRESS=
#BAL_SERVER_TESTNET_ADDRESS= BAL_SERVER_SIGNET_FEE=100000
#BAL_SERVER_TESTNET_FEE=100000
#BAL_SERVER_SIGNET_ADDRESS=
#BAL_SERVER_SIGNET_FEE=100000

View File

@ -4,46 +4,34 @@ After=network.target
[Service] [Service]
# Service execution EnvironmentFile=/etc/bal/bal-server.env
###################
EnvironmentFile=/home/bal/bal-server.env
ExecStart=/usr/local/bin/bal-server ExecStart=/usr/local/bin/bal-server
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=bal-server SyslogIdentifier=bal-server
# Process management
####################
Type=simple Type=simple
PIDFile=/run/bal-server/bal-server.pid PIDFile=/run/bal-server/bal-server.pid
Restart=always Restart=always
TimeoutSec=300 TimeoutSec=300
RestartSec=30 RestartSec=30
# Directory creation and permissions
####################################
User=bal User=bal
UMask=0027 UMask=0027
# /run/bal-server
RuntimeDirectory=bal-server RuntimeDirectory=bal-server
RuntimeDirectoryMode=0710 RuntimeDirectoryMode=0710
# Hardening measures PrivateTmp=true
####################
# Mount /usr, /boot/ and /etc read-only for the process.
ProtectSystem=full ProtectSystem=full
# Disallow the process and all of its children to gain
# new privileges through execve().
NoNewPrivileges=true NoNewPrivileges=true
# Use a new /dev namespace only populated with API pseudo devices
# such as /dev/null, /dev/zero and /dev/random.
PrivateDevices=true PrivateDevices=true
# Deny the creation of writable and executable memory mappings.
MemoryDenyWriteExecute=true MemoryDenyWriteExecute=true
[Install] [Install]

View File

@ -1,24 +0,0 @@
WORKING_DIR=$(pwd)
if [ ! -f "$WORKING_DIR/public_key.pem" ]; then
echo "creating keypairs"
openssl genpkey -algorithm ED25519 -out private_key.pem
openssl pkey -in private_key.pem -pubout -out public_key.pem
fi
export RUST_LOG="trace"
export BAL_SERVER_DB_FILE="$WORKING_DIR/bal.db"
export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000
export BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
export BAL_SERVER_REGTEST_FEE=5000
#export BAL_SERVER_TESTNET_ADDRESS=
#export BAL_SERVER_TESTNET_FEE=100000
#export BAL_SERVER_SIGNET_ADDRESS=
#export BAL_SERVER_SIGNET_FEE=100000
cargo run --bin=bal-server

View File

@ -1,599 +0,0 @@
extern crate bitcoincore_rpc;
extern crate zmq;
use bitcoin::Network;
use bitcoincore_rpc::{bitcoin, Auth, Client, Error, RpcApi};
use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value};
use serde::Serialize;
use serde::Deserialize;
use serde_json::json;
use std::env;
use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket, DEALER, DONTWAIT};
use std::str;
use std::{thread, time::Duration};
use std::collections::HashMap;
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::Cursor;
use hex;
use std::error::Error as StdError;
use reqwest::Client as rClient;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey};
use openssl::sign::Signer;
use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _};
use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.2";
#[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig {
zmq_listener: String,
requests_file: String,
db_file: String,
bitcoin_dir: String,
regtest: NetworkParams,
testnet: NetworkParams,
signet: NetworkParams,
mainnet: NetworkParams,
send_stats: bool,
url: String,
secret_code: String,
ssl_key_path: String
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
zmq_listener: "tcp://127.0.0.1:28332".to_string(),
requests_file: "rawrequests.log".to_string(),
db_file: "../bal.db".to_string(),
bitcoin_dir: "".to_string(),
regtest: get_network_params_default(Network::Regtest),
testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false,
url: "http://localhost/".to_string(),
secret_code: "xxx".to_string(),
ssl_key_path: "privkey.pem".to_string(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NetworkParams {
host: String,
port: u16,
dir_path: String,
db_field: String,
cookie_file: String,
rpc_user: String,
rpc_pass: String,
}
fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
match network{
Network::Testnet => &cfg.testnet,
Network::Signet => &cfg.signet,
Network::Regtest => &cfg.regtest,
_ => &cfg.mainnet
}
}
fn get_network_params_default(network:Network) -> NetworkParams{
match network {
Network::Testnet => NetworkParams{
host: "http://i27.0.0.1".to_string(),
port: 18332,
dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
Network::Signet => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 18332,
dir_path: "signet/".to_string(),
db_field: "signet".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
Network::Regtest => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 18443,
dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
_ => NetworkParams{
host: "http://127.0.0.1".to_string(),
port: 8332,
dir_path: "".to_string(),
db_field: "bitcoin".to_string(),
cookie_file: "".to_string(),
rpc_user: "".to_string(),
rpc_pass: "".to_string(),
},
}
}
fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError>>{
if network.cookie_file !=""{
Ok(network.cookie_file.clone())
}else{
match env::var_os("HOME") {
Some(home) => {
match home.to_str(){
Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
Ok(cookie_file_path)
},
None => Err("wrong HOME value".into())
}
},
None => Err("Please Set HOME environment variable".into())
}
}
}
fn get_client_from_username(url: &String, network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
if network.rpc_user != "" {
match Client::new(&url[..],Auth::UserPass(network.rpc_user.to_string(),network.rpc_pass.to_string())){
Ok(client) => match client.get_blockchain_info(){
Ok(bcinfo) => Ok((client,bcinfo)),
Err(err) => Err(err.into())
}
Err(err)=>Err(err.into())
}
}else{
Err("Failed".into())
}
}
fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
match get_cookie_filename(network){
Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => {
match client.get_blockchain_info(){
Ok(bcinfo) => {
Ok((client,bcinfo))
},
Err(err) => {
Err(err.into())
}
}
},
Err(err)=>Err(err.into())
}
},
Err(err)=>Err(err.into())
}
}
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
let url = format!("{}:{}/",network.host,&network.port);
match get_client_from_username(&url,network){
Ok(client) =>{Ok(client)},
Err(_) =>{
match get_client_from_cookie(&url,&network){
Ok(client)=>{
Ok(client)
},
Err(err)=> Err(err.into())
}
}
}
}
async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
let user = args.next().expect("no user given");
let pass = args.next().expect("no pass given");
*/
//let network = Network::Regtest
match get_client(network_params){
Ok((rpc,bcinfo)) => {
info!("connected");
//let best_block_hash = rpc.get_best_block_hash()?;
//info!("best block hash: {}", best_block_hash);
//let bestblockcount = rpc.get_block_count()?;
//info!("best block height: {}", bestblockcount);
//let best_block_hash_by_height = rpc.get_block_hash(bestblockcount)?;
//info!("best block hash by height: {}", best_block_hash_by_height);
//assert_eq!(best_block_hash_by_height, best_block_hash);
//let from_block= std::cmp::max(0, bestblockcount - 11);
//let mut time_sum:u64=0;
//for i in from_block..bestblockcount{
// let hash = rpc.get_block_hash(i).unwrap();
// let block: bitcoin::Block = rpc.get_by_id(&hash).unwrap();
// time_sum += <u32 as Into<u64>>::into(block.header.time);
//}
//let average_time = time_sum/11;
info!("median time: {}",bcinfo.median_time);
//info!("height time: {}",bcinfo.median_time);
info!("blocks: {}",bcinfo.blocks);
debug!("best block hash: {}",bcinfo.best_block_hash);
let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap();
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter();
trace!("query_tx: {}",sqlquery);
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD );
trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0);
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs:Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
for row in query_tx.bind::<&[(_, Value)]>(&[
(":locktime_threshold", (LOCKTIME_THRESHOLD as i64).into()),
(":bestblock_time", (average_time as i64).into()),
(":bestblock_height", (bcinfo.blocks as i64).into()),
(":network", network_params.db_field.clone().into()),
(":status", 0.into()),
][..])
.unwrap()
.map(|row| row.unwrap())
{
let tx = row.read::<&str, _>("tx");
let txid = row.read::<&str, _>("txid");
let locktime = row.read::<i64,_>("locktime");
info!("to be pushed: {}: {}",txid, locktime);
match rpc.send_raw_transaction(tx){
Ok(o) => {
/*let mut file = OpenOptions::new()
.append(true) // Set the append option
.create(true) // Create the file if it doesn't exist
.open("valid_txs")?;
let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime);
file.write_all(data.as_bytes())?;
drop(file);
*/
info!("tx: {} pusshata PUSHED\n{}",txid,o);
pushed_txs.push(txid.to_string());
},
Err(err) => {
/*let mut file = OpenOptions::new()
.append(true) // Set the append option
.create(true) // Create the file if it doesn't exist
.open("/home/bal/invalid_txs")?;
let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime);
file.write_all(data.as_bytes())?;
drop(file);
*/
warn!("Error: {}\n{}",err,txid);
//store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string());
},
};
}
if pushed_txs.len() > 0 {
let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
trace!("sqlok: {}",&sql);
let _ = db.execute(&sql);
}
if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
trace!("sqlerror: {}",&sql);
let _ = db.execute(&sql);
}
}
let _ = send_stats_report(cfg, bcinfo).await;
}
Err(erx)=>{
panic!("impossible to get client {}",erx)
}
}
Ok(())
}
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{
if cfg.send_stats {
debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new();
let url = format!("{}/ping",welist_url);
let chain=bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
let response = client.post(url)
.header("User-Agent", format!("bal-pusher/{}",VERSION))
.json(&json!(
{
"url": cfg.url,
"chain": chain,
"height": bcinfo.blocks,
"median_time": bcinfo.median_time,
"last_block_hash": bcinfo.best_block_hash,
"signature": sign,
}))
.send().await?;
let body = &(response.text().await?);
info!("Report to welist({})\tSent: {}", welist_url,body);
}else {
debug!("Not sending stats");
}
Ok(())
}
fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap();
let private_key = PKey::private_key_from_pem(&key_data).unwrap();
let mut signer = Signer::new_without_digest(&private_key).unwrap();
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64
}
fn parse_env(cfg: &mut MyConfig){
match env::var("BAL_PUSHER_ZMQ_LISTENER") {
Ok(value) => {
cfg.zmq_listener = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_REQUEST_FILE") {
Ok(value) => {
cfg.requests_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_DB_FILE") {
Ok(value) => {
cfg.db_file = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_BITCOIN_DIR") {
Ok(value) => {
cfg.bitcoin_dir = value;},
Err(_) => {},
}
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg,"regtest");
cfg.signet = parse_env_netconfig(cfg,"signet");
cfg.testnet = parse_env_netconfig(cfg,"testnet");
drop(parse_env_netconfig(cfg,"bitcoin"));
}
fn parse_env_netconfig(cfg_lock: &mut MyConfig, chain: &str) -> NetworkParams{
//fn parse_env_netconfig(cfg_lock: &MutexGuard<MyConfig>, chain: &str) -> &NetworkParams{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet,
};
match env::var(format!("BAL_PUSHER_{}_HOST",chain.to_uppercase())) {
Ok(value) => { cfg.host= value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_PORT",chain.to_uppercase())) {
Ok(value) => {
match value.parse::<u64>(){
Ok(value) =>{ cfg.port = value.try_into().unwrap(); },
Err(_) => {},
}
}
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_DIR_PATH",chain.to_uppercase())) {
Ok(value) => { cfg.dir_path = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_DB_FIELD",chain.to_uppercase())) {
Ok(value) => { cfg.db_field = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_COOKIE_FILE",chain.to_uppercase())) {
Ok(value) => {
cfg.cookie_file = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_RPC_USER",chain.to_uppercase())) {
Ok(value) => { cfg.rpc_user = value; },
Err(_) => {},
}
match env::var(format!("BAL_PUSHER_{}_RPC_PASSWORD",chain.to_uppercase())) {
Ok(value) => { cfg.rpc_pass = value; },
Err(_) => {},
}
cfg.clone()
}
fn get_default_config()-> MyConfig {
let file = confy::get_configuration_file_path("bal-pusher",None).expect("Error while getting path");
info!("Default configuration file path is: {:#?}", file);
confy::load("bal-pusher",None).expect("cant_load")
}
fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main]
async fn main()-> std::io::Result<()>{
env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => {
match confy::load_path(&value){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
},
Err(_) => {
get_default_config()
},
};
parse_env(&mut cfg);
let mut args = std::env::args();
let _exe_name = args.next().unwrap();
let arg_network = match args.next(){
Some(nargs) => nargs,
None => "bitcoin".to_string()
};
let network = match arg_network.as_str(){
"testnet" => Network::Testnet,
"signet" => Network::Signet,
"regtest" => Network::Regtest,
_ => Network::Bitcoin,
};
info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network);
let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address);
socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap();
let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop {
let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone();
let body = message[1].clone();
let seq = message[2].clone();
last_seq = seq;
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body));
let _ = main_result(&cfg,network_params).await;
}
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
}
}
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

View File

@ -1,594 +0,0 @@
use bytes::Bytes;
use http_body_util::{ combinators::BoxBody, BodyExt, Empty, Full };
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{ Method, Request, Response, StatusCode };
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo;
use std::net::IpAddr;
use std::env;
//use std::time::{SystemTime,UNIX_EPOCH};
use std::fs;
use std::sync::{ Arc, Mutex, MutexGuard };
//use std::net::SocketAddr;
use std::collections::HashMap;
use sqlite::{ State, Value, Connection };
use bitcoin::{ consensus, Transaction, Network };
use hex_conservative::FromHex;
use regex::Regex;
use serde::{ Serialize, Deserialize};
use log::{ info, error, trace, debug};
use serde_json;
use chrono::Utc;
#[path = "../db.rs"]
mod db;
use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_address, get_last_used_address_by_ip, execute_insert };
#[path = "../xpub.rs"]
mod xpub;
use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.2";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig {
address: String,
fixed_fee: u64,
xpub: bool,
network: Network,
name: String,
enabled: bool,
}
impl NetConfig {
fn default_network(name:String, network: Network) -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
xpub: false,
name,
network,
enabled: false,
}
}
}
#[derive(Debug, Serialize, Deserialize,Clone)]
struct MyConfig {
regtest: NetConfig,
signet: NetConfig,
testnet: NetConfig,
mainnet: NetConfig,
info: String,
bind_address: String,
bind_port: u16, // Changed to u16 for port numbers
db_file: String,
pub_key_path: String,
}
#[derive(Debug,Serialize, Deserialize)]
pub struct Info {
pub address: String,
pub base_fee: u64,
pub chain: String,
pub info: String,
pub version: String
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest),
signet: NetConfig::default_network("signet".to_string(), Network::Signet),
testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet),
mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin),
bind_address: "127.0.0.1".to_string(),
bind_port: 9137,
db_file: "bal.db".to_string(),
info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(),
}
}
}
impl MyConfig {
fn get_net_config(&self, param: &str) -> &NetConfig{
match param {
"regtest" => &self.regtest,
"testnet" => &self.testnet,
"signet" => &self.signet,
_ => &self.mainnet,
}
}
}
async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION)))
}
async fn echo_home(cfg: &MyConfig
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info );
Ok(Response::new(full(cfg.info.clone())))
}
async fn echo_pub_key(
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let pub_key = fs::read_to_string(&cfg.pub_key_path)
.expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key)))
}
async fn echo_info(
param: &str,
cfg: &MyConfig,
remote_addr: String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param);
if !netconfig.enabled {
debug!("network disabled {}",param);
return Ok(Response::new(full("network disabled")));
}
let address = match netconfig.xpub{
false => {
let address = netconfig.address.to_string();
trace!("is address: {}",&address);
address
},
true => {
let db = sqlite::open(&cfg.db_file).unwrap();
match get_last_used_address_by_ip(&db,&netconfig.name,&netconfig.address,&remote_addr){
Some(address)=>address,
None => {
let next = get_next_address_index(&db,&netconfig.name,&netconfig.address);
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap();
save_new_address(&db,next.0,&address.0,&address.1,&remote_addr);
debug!("save new address {} {}",address.0,address.1);
trace!("next {} {}",next.0,next.1);
address.0
}
}
}
};
let info = Info{
address,
base_fee: netconfig.fixed_fee,
chain: netconfig.network.to_string(),
info: cfg.info.to_string(),
version: VERSION.to_string()
};
trace!("address: {:#?}",info);
match serde_json::to_string(&info){
Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err))))
}
}
async fn echo_search(whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!");
let strbody = std::str::from_utf8(whole_body).unwrap();
info!("{}",strbody);
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
if !strbody.is_empty() && strbody.len()<=70 {
let db = sqlite::open(&cfg.db_file).unwrap();
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ? LIMIT 1").unwrap();
statement.bind((1, strbody)).unwrap();
if let Ok(State::Row) = statement.next() {
let mut response_data = HashMap::new();
match statement.read::<String, _>("status") {
Ok(value) => response_data.insert("status", value),
Err(e) => {
error!("Error reading status: {}", e);
//response_data.insert("status", "Error".to_string())
None
}
};
// Read the transaction (tx)
match statement.read::<String, _>("tx") {
Ok(value) => response_data.insert("tx", value),
Err(e) => {
error!("Error reading tx: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
match statement.read::<String, _>("our_address") {
Ok(value) => response_data.insert("our_address", value),
Err(e) => {
error!("Error reading address: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
match statement.read::<String, _>("our_fees") {
Ok(value) => response_data.insert("our_fees", value),
Err(e) => {
error!("Error reading fees: {}", e);
//response_data.insert("tx", "Error".to_string())
None
}
};
// Read the request id (reqid)
match statement.read::<String, _>("reqid") {
Ok(value) => response_data.insert("time", value),
Err(e) => {
error!("Error reading reqid: {}", e);
//response_data.insert("time", "Error".to_string())
None
}
};
response = match serde_json::to_string(&response_data){
Ok(json_data) => Response::new(full(json_data)),
Err(_) => { response }
};
return Ok(response);
}
}
Ok(response)
}
async fn echo_push(whole_body: &Bytes,
cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
//let whole_body = req.collect().await?.to_bytes();
let strbody = std::str::from_utf8(whole_body).unwrap();
let mut response = Response::new(full("Bad data received".to_owned()));
let mut response_not_enable = Response::new(full("Network not enabled".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
*response_not_enable.status_mut()=StatusCode::BAD_REQUEST;
let netconfig = MyConfig::get_net_config(cfg,param);
if !netconfig.enabled{
return Ok(response_not_enable);
}
let req_time = Utc::now().timestamp_nanos_opt().unwrap(); // Returns i64
let db = sqlite::open(&cfg.db_file).unwrap();
let lines = strbody.split("\n");
let sqltxshead = "INSERT INTO tbl_tx (txid, wtxid, ntxid, tx, locktime, reqid, network, our_address, our_fees)".to_string();
let mut sqltxs = "".to_string();
let sqlinpshead = "INSERT INTO tbl_inp (txid, in_txid, in_vout )".to_string();
let mut sqlinps = "".to_string();
let sqloutshead = "INSERT INTO tbl_out (txid, vout, script_pubkey, amount )".to_string();
let mut sqlouts = "".to_string();
let mut union_tx = true;
let mut union_inps = true;
let mut union_outs = true;
let mut already_present = false;
let mut ptx:Vec<(usize, Value)> = vec![];
let mut pinps:Vec<(usize, Value)> = vec![];
let mut pouts:Vec<(usize, Value)> = vec![];
let mut linenum = 1;
let mut lineinp = 1;
let mut lineout = 1;
for line in lines {
if line.is_empty(){
trace!("line len is: {}",line.len());
continue
}
let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea);
let raw_tx = match Vec::<u8>::from_hex(line) {
Ok(raw_tx) => raw_tx,
Err(err) => {
error!("rawtx error: {}",err);
continue
}
};
if !raw_tx.is_empty(){
trace!("len: {}",raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx){
Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue}
};
let txid = tx.compute_txid().to_string();
trace!("txid: {}",txid);
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1,&txid[..])).unwrap();
if let Ok(State::Row) = statement.next() {
trace!("already present");
already_present=true;
continue;
}
let ntxid = tx.compute_ntxid();
let wtxid = tx.compute_wtxid();
let mut found = false;
let locktime = tx.lock_time;
let mut our_address:String = "".to_string();
let mut our_fees:u64 = 0;
for input in tx.input{
if !union_inps {
sqlinps = format!("{sqlinps} UNION ALL");
}else{
union_inps = false;
}
sqlinps = format!("{sqlinps} SELECT ?, ?, ?");
pinps.push((lineinp,Value::String(txid.to_string())));
pinps.push((lineinp+1,Value::String(input.previous_output.txid.to_string())));
pinps.push((lineinp+2,Value::String(input.previous_output.vout.to_string())));
lineinp += 3;
}
if netconfig.fixed_fee ==0 {
found = true;
}
for (idx,output) in tx.output.into_iter().enumerate(){
let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), netconfig.network){
Ok(address) => address.to_string(),
Err(_) => String::new(),
};
let amount = output.value;
our_fees = netconfig.fixed_fee;//search wllexecutor output
if netconfig.xpub{
let sql="select * from tbl_address where address=?";
let mut stmt = db.prepare(sql).expect("failed to fetch addresses");
stmt.bind((1,Value::String(address.to_string()))).unwrap();
if let Ok(State::Row) = stmt.next() {
our_address = address.to_string();
}
} else {
our_address = netconfig.address.to_string();
}
if address == our_address && amount.to_sat() >= netconfig.fixed_fee {
our_fees = amount.to_sat();
our_address = netconfig.address.to_string();
found = true;
trace!("address and fees are correct {}: {}",our_address,our_fees);
}
if !union_outs {
sqlouts = format!("{sqlouts} UNION ALL");
}else{
union_outs = false;
}
sqlouts = format!("{sqlouts} SELECT ?, ?, ?, ?");
pouts.push((lineout,Value::String(txid.to_string())));
pouts.push((lineout+1,Value::Integer(idx.try_into().unwrap())));
pouts.push((lineout+2,Value::String(script_pubkey.to_string())));
pouts.push((lineout+3,Value::Integer(amount.to_sat().try_into().unwrap())));
lineout += 4;
}
if !found {
error!("willexecutor output not found ");
return Ok(response)
} else {
if !union_tx {
sqltxs = format!("{sqltxs} UNION ALL");
}else{
union_tx = false;
}
sqltxs = format!("{sqltxs} SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?");
ptx.push((linenum,Value::String(txid)));
ptx.push((linenum+1,Value::String(wtxid.to_string())));
ptx.push((linenum+2,Value::String(ntxid.to_string())));
ptx.push((linenum+3,Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string())));
linenum += 9;
}
}else{
trace!("rawTx len is: {}",raw_tx.len());
debug!("{}",&sqltxs);
}
}
if sqltxs.is_empty() && already_present {
return Ok(Response::new(full("already present")))
}
let sqltxs = format!("{}{};", sqltxshead, sqltxs);
let sqlinps = format!("{}{};", sqlinpshead, sqlinps);
let sqlouts = format!("{}{};", sqloutshead, sqlouts);
if let Err(err) = execute_insert(&db, sqltxs, ptx, sqlinps, pinps, sqlouts, pouts){
debug!("{}",err);
return Ok(response);
}
Ok(Response::new(full("thx")))
}
fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
let re = Regex::new(path).unwrap();
if let Some(captures) = re.captures(uri) {
if let Some(param) = captures.name("param") {
return Some(param.as_str());
}
}
None
}
async fn echo(
req: Request<hyper::body::Incoming>,
cfg: &MyConfig,
ip: &String
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string();
let remote_addr = req.headers().get("X-Real-IP").and_then(|value| value.to_str().ok()).and_then(|xff| xff.split(',').next()).map(|ip| ip.trim().to_string()).unwrap_or_else(|| ip.to_string());
trace!("{}: {}",remote_addr,uri);
match *req.method() {
// Serve some instructions at /
Method::POST => {
let whole_body = req.collect().await?.to_bytes();
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$",uri.as_str()) {
//let whole_body = collect_body(req,512_000).await?;
ret = echo_push(&whole_body,cfg,param).await;
}
if uri=="/searchtx"{
//let whole_body = collect_body(req,64).await?;
ret = echo_search(&whole_body,cfg).await;
}
ret
}
Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$",uri.as_str()) {
ret = echo_info(param,cfg,remote_addr).await;
}
if uri=="/version"{
ret= echo_version().await;
}
if uri=="/.pub_key.pem" {
ret = echo_pub_key(cfg).await;
}
if uri=="/"{
ret = echo_home(cfg).await;
}
ret
}
// Return the 404 Not Found for other routes.
_ => ret
}
}
fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
for (key, value) in std::env::vars() {
debug!("ENVIRONMENT {key}: {value}");
}
let mut cfg_lock = cfg.lock().unwrap();
if let Ok(value) = env::var("BAL_SERVER_DB_FILE") {
debug!("BAL_SERVER_DB_FILE: {}",value);
cfg_lock.db_file = value;
}
if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") {
debug!("BAL_SERVER_BIND_ADDRESS: {}",value);
cfg_lock.bind_address= value;
}
if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") {
debug!("BAL_SERVER_BIND_PORT: {}",value);
if let Ok(v) = value.parse::<u16>(){
cfg_lock.bind_port = v;
}
}
if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") {
debug!("BAL_SERVER_PUB_KEY_PATH: {}",value);
cfg_lock.pub_key_path = value;
}
if let Ok(value) = env::var("BAL_SERVER_INFO"){
debug!("BAL_SERVER_INFO: {}",value);
cfg_lock.info = value;
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
drop(parse_env_netconfig(cfg_lock,"bitcoin"));
}
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet" => &mut cfg_lock.testnet,
&_ => &mut cfg_lock.mainnet,
};
if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value);
cfg.address = value;
if cfg.address.len() > 5 {
if cfg.address[1..4] == *"pub" {
cfg.xpub=true;
trace!("is_xpub");
}
cfg.enabled=true;
}
}
if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value);
if let Ok(v) = value.parse::<u64>(){
cfg.fixed_fee = v;
}
}
cfg_lock
}
fn init_network(db: &Connection, cfg: &MyConfig){
for network in NETWORKS{
let netconfig = MyConfig::get_net_config(cfg,network);
insert_xpub(db,&netconfig.name,&netconfig.address);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let cfg: Arc<Mutex<MyConfig>> =Arc::<Mutex<MyConfig>>::default();
parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap();
create_database(&db);
init_network(&db,&cfg_lock);
let addr = cfg_lock.bind_address.to_string();
let addr: IpAddr = addr.parse()?;
let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr,cfg_lock.bind_port);
loop {
let (stream, _) = listener.accept().await?;
let ip = stream.peer_addr()?.to_string().split(":").next().unwrap().to_string();
let io = TokioIo::new(stream);
tokio::task::spawn({
let cfg = cfg_lock.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req,&cfg,&ip).await
}))
.await
{
error!("Error serving connection: {:?}", err);
}
}
});
}
}

147
src/db.rs
View File

@ -1,147 +0,0 @@
use sqlite::{ Connection, Value, State, Error };
use log::{info, trace, error};
pub fn create_database(db: &Connection){
info!("database sanity check");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, date_creation TIMESTAMP DEFAULT CURRENT_TIMESTAMP, date_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);");
let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_inp(id, txid, in_txid, in_vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_inp(txid,in_txid,in_vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_out(id, txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE UNIQUE INDEX ON tbl_out(txid, script_pubkey, amount, vout);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_xpub (id INTEGER PRIMARY KEY , network TEXT, xpub TEXT, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP,path_idx INTEGER DEFAULT -1);");
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
}
/*
pub fn get_xpub_id(db: &Connection, network: &String, xpub: &String) -> Option<i64>{
let mut stmt = db.prepare("SELECT * FROM tbl_xpub where network = ? and xpub = ?;").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
if let Ok(State::Row) = stmt.next(){
return Some(stmt.read::<i64, _>("id").unwrap());
} else {
return None;
}
}
*/
pub fn insert_xpub(db: &Connection, network: &String, xpub: &String){
if xpub != "" {
trace!("going to insert: {} xpub:{}", network, xpub);
let mut stmt = db.prepare ("INSERT INTO tbl_xpub(network,xpub) VALUES(?, ?);").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string()))).unwrap();
let _ = stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
let _ = stmt.next();
}
}
pub fn get_last_used_address_by_ip(db: &Connection, network: &String, xpub: &String, address: &String) -> Option<String>{
let mut stmt = db.prepare("SELECT tbl_address.address FROM tbl_xpub join tbl_address on(tbl_xpub.id = tbl_address.xpub) where tbl_xpub.network = ? and tbl_address.remote_address = ? and tbl_xpub.xpub = ? ORDER BY tbl_address.date_create DESC LIMIT 1;").unwrap();
let _ = stmt.bind((1,Value::String(network.to_string())));
let _ = stmt.bind((2,Value::String(address.to_string())));
let _ = stmt.bind((3,Value::String(xpub.to_string())));
if let Ok(State::Row) = stmt.next(){
let address = stmt.read::<String,_>("address").unwrap();
return Some(address);
}else{
return None;
}
}
pub fn get_next_address_index(db: &Connection, network: &String, xpub: &String) -> (i64,i64){
let mut stmt = db.prepare("UPDATE tbl_xpub SET path_idx = path_idx + 1 WHERE network = ? and xpub= ? RETURNING path_idx,id;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
stmt.bind((2,Value::String(xpub.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row) =>{
let next = stmt.read::<i64,_>("path_idx").unwrap();
let id = stmt.read::<i64,_>("id").unwrap();
return (id,next);
},Err(_)=> {
return (0,0);
},Ok(State::Done) =>{
return (0,0);
}
};
}
pub fn save_new_address(db: &Connection,xpub: i64,address: &String, path: &String,remote_addr: &String){
let mut stmt = db.prepare("INSERT INTO tbl_address(address,path,xpub,remote_address) VALUES(?,?,?,?);").unwrap();
stmt.bind((1,Value::String(address.to_string()))).unwrap();
stmt.bind((2,Value::String(path.to_string()))).unwrap();
stmt.bind((3,Value::Integer(xpub))).unwrap();
stmt.bind((4,Value::String(remote_addr.to_string()))).unwrap();
let _ = stmt.next();
}
pub fn execute_insert(db: &Connection,
sqltxs: String,
ptx: Vec<(usize, Value)>,
sqlinp: String,
pinp: Vec<(usize, Value)>,
sqlout: String,
pout: Vec<(usize, Value)>) -> Result<(),Error>{
let _ = db.execute("BEGIN TRANSACTION");
let mut stmt = db.prepare(sqltxs.as_str()).expect("failed to prepare sqltxs");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&ptx[..]) {
error!("error binding transaction parameters: {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting transactions {}",err);
let _ = db.execute("ROLLBACK");
}else{
let mut stmt = db.prepare(sqlinp.as_str()).expect("failed to prepare sqlinp");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pinp[..]) {
error!("error binding inputs parameters {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting inputs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}else{
let mut stmt = db.prepare(sqlout.as_str()).expect("failed to prepare sqlout");
if let Err(err) = stmt.bind::<&[(_,Value)]>(&pout[..]) {
error!("error binding outs parameters {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
if let Err(err) = stmt.next() {
error!("error inserting outs {}", err);
let _ = db.execute("ROLLBACK");
return Err(err);
}
}
}
let _ = db.execute("COMMIT");
Ok(())
}
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64,Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row)=>{
Ok(stmt.read::<i64,_>("total_number").unwrap())
},
Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err)
}
}

502
src/main.rs Normal file
View File

@ -0,0 +1,502 @@
use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;
use hyper_util::rt::TokioIo;
use std::net::IpAddr;
use std::env;
use std::time::{SystemTime,UNIX_EPOCH};
use std::sync::{Arc, Mutex, MutexGuard};
use std::collections::HashMap;
use sqlite::{State,Connection};
use bitcoin::{consensus, Transaction, Network};
use hex_conservative::FromHex;
use regex::Regex;
use serde::{Serialize, Deserialize};
use log::{info,error,trace,debug};
use serde_json;
#[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig {
address: String,
fixed_fee: u64,
}
impl Default for NetConfig {
fn default() -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
}
}
}
impl NetConfig {
fn default_regtest() -> Self {
NetConfig {
address: "".to_string(),
fixed_fee: 50000,
}
}
}
#[derive(Debug, Serialize, Deserialize,Clone)]
struct MyConfig {
regtest: NetConfig,
signet: NetConfig,
testnet4: NetConfig,
testnet3: NetConfig,
mainnet: NetConfig,
bind_address: String,
bind_port: u16, // Changed to u16 for port numbers
db_file: String,
}
impl Default for MyConfig {
fn default() -> Self {
MyConfig {
regtest: NetConfig::default_regtest(),
signet: NetConfig::default(), // Use default for other networks
testnet4: NetConfig::default(),
testnet3: NetConfig::default(),
mainnet: NetConfig::default(),
bind_address: "127.0.0.1".to_string(),
bind_port: 9137, // Ensure this is a u16
db_file: "../bal.db".to_string(),
}
}
}
impl MyConfig {
fn get_net_config(&self, param: &str) -> &NetConfig{
match param {
"regtest" => &self.regtest,
"testnet" => &self.testnet3,
"signet" => &self.signet,
_ => &self.mainnet,
}
}
}
async fn echo_info(
param: &str,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param);
return Ok(Response::new(full("{\"address\":\"".to_owned()+&netconfig.address+"\",\"base_fee\":\""+&netconfig.fixed_fee.to_string()+"\"}")));
}
async fn echo_search(whole_body: &Bytes,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
info!("echo search!!!");
let strbody = std::str::from_utf8(&whole_body).unwrap();
info!("{}",strbody);
trace!("{}",strbody.len());
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
if strbody.len() >0 && strbody.len()<=70 {
let db = sqlite::open(&cfg.db_file).unwrap();
trace!("qua ci arrivo");
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1, strbody)).unwrap();
while let Ok(State::Row) = statement.next() {
trace!("qua tutto ok");
let mut response_data = HashMap::new();
match statement.read::<String, _>("status") {
Ok(value) => response_data.insert("status", value),
Err(e) => {
error!("Error reading status: {}", e);
break;
//response_data.insert("status", "Error".to_string())
}
};
// Read the transaction (tx)
match statement.read::<String, _>("tx") {
Ok(value) => response_data.insert("tx", value),
Err(e) => {
error!("Error reading tx: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
match statement.read::<String, _>("our_address") {
Ok(value) => response_data.insert("our_address", value),
Err(e) => {
error!("Error reading address: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
match statement.read::<String, _>("our_fees") {
Ok(value) => response_data.insert("our_fees", value),
Err(e) => {
error!("Error reading fees: {}", e);
break;
//response_data.insert("tx", "Error".to_string())
}
};
// Read the request id (reqid)
match statement.read::<String, _>("reqid") {
Ok(value) => response_data.insert("time", value),
Err(e) => {
error!("Error reading reqid: {}", e);
break;
//response_data.insert("time", "Error".to_string())
}
};
response = match serde_json::to_string(&response_data){
Ok(json_data) => Response::new(full(json_data)),
Err(_) => {break;}
};
return Ok(response);
}
}
Ok(response)
}
async fn echo_push(whole_body: &Bytes,
cfg: &MyConfig,
param: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
//let whole_body = req.collect().await?.to_bytes();
let strbody = std::str::from_utf8(&whole_body).unwrap();
let lines = strbody.split("\n");
let mut response = Response::new(full("Bad data received".to_owned()));
*response.status_mut() = StatusCode::BAD_REQUEST;
debug!("network: {}", param);
let network = match param{
"testnet" => Network::Testnet,
"signet" => Network::Signet,
"regtest" => Network::Regtest,
&_ => Network::Bitcoin,
};
let req_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_nanos();
let sqltxshead = "INSERT INTO tbl_tx (txid, wtxid, ntxid, tx, locktime, reqid, network, our_address, our_fees)".to_string();
let mut sqltxs = "".to_string();
//let mut sqlinps = "INSERT INTO tbl_input (txid, in_txid,in_vout)".to_string();
//let mut sqlouts = "INSERT INTO tbl_output (txid, script_pubkey, address, amount)\n".to_string();
let mut union_tx = true;
let mut already_present = false;
//let mut union_inps = true;
//let mut union_outs = true;
let db = sqlite::open(&cfg.db_file).unwrap();
let netconfig = MyConfig::get_net_config(cfg,param);
for line in lines {
if line.len() == 0{
trace!("line len is: {}",line.len());
continue
}
let linea = format!("{req_time}:{line}");
info!("New Tx: {}", linea);
let raw_tx = match Vec::<u8>::from_hex(line) {
Ok(raw_tx) => raw_tx,
Err(err) => {
error!("rawtx error: {}",err);
continue
}
};
if raw_tx.len() > 0 {
trace!("len: {}",raw_tx.len());
let tx: Transaction = match consensus::deserialize(&raw_tx){
Ok(tx) => tx,
Err(err) => {error!("error: unable to parse tx: {}\n{}",line,err);continue}
};
let txid = tx.txid().to_string();
trace!("txid: {}",txid);
let mut statement = db.prepare("SELECT * FROM tbl_tx WHERE txid = ?").unwrap();
statement.bind((1,&txid[..])).unwrap();
//statement.bind((1,"Bob")).unwrap();
if let Ok(State::Row) = statement.next() {
trace!("already present");
already_present=true;
continue;
}
let ntxid = tx.ntxid();
let wtxid = tx.wtxid();
let mut found = false;
let locktime = tx.lock_time;
let mut our_fees = 0;
let mut our_address:String = "".to_string();
//dbg!(netconfig.fixed_fee);
if netconfig.fixed_fee >0 {
for output in tx.output{
let script_pubkey = output.script_pubkey;
let address = match bitcoin::Address::from_script(script_pubkey.as_script(), network){
Ok(address) => address.to_string(),
Err(_) => String::new(),
};
//dbg!(&address);
let amount = output.value;
//dbg!(&amount);
//search wllexecutor output
if address == netconfig.address.to_string() && amount.to_sat() >= netconfig.fixed_fee{
our_fees = amount.to_sat();
our_address = netconfig.address.to_string();
found = true;
trace!("address and fees are correct {}: {}",our_address,our_fees);
break;
}else {
trace!("address and fees not found {}: {}",address,amount.to_sat());
trace!("address and fees not found {}: {}",netconfig.address.to_string(),netconfig.fixed_fee);
}
}
} else { found = true; }
if found == false{
error!("willexecutor output not found ");
//return Ok(response)
} else {
if union_tx == false {
sqltxs = format!("{sqltxs} UNION ALL");
}else{
union_tx = false;
}
sqltxs = format!("{sqltxs} SELECT '{txid}', '{wtxid}', '{ntxid}', '{line}', '{locktime}', '{req_time}', '{network}','{our_address}',{our_fees}");
}
}
else{
trace!("rawTx len is: {}",raw_tx.len());
debug!("{}",&sqltxs);
}
}
debug!("SQL: {}",sqltxs);
if sqltxs.len()== 0{
if already_present == true{
//dbg!(already_present);
return Ok(Response::new(full("already present")))
}
}
let _ = db.execute("BEGIN TRANSACTION");
let sql = format!("{}{}",sqltxshead,sqltxs);
if let Err(err) = db.execute(&sql){
error!("error executing sql:{} - {}",&sql,err);
let _ = db.execute("ROLLBACK");
//dbg!(&already_present);
if already_present == true{
trace!("already_present = True");
return Ok(Response::new(full("already present")))
}
return Ok(response)
}
//if !error {
// if let Err(_) = db.execute(sqlinps){
// let _ = db.execute("ROLLBACK");
// error = true
// }
//}
//if !error {
// if let Err(_) = db.execute(sqlouts){
// let _ = db.execute("ROLLBACK");
// error = true;
// }
//}
let _ = db.execute("COMMIT");
Ok(Response::new(full("thx")))
}
fn create_database(db: Connection){
info!("database sanity check");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_tx (txid PRIMARY KEY, wtxid, ntxid, tx, locktime integer, network, network_fees, reqid, our_fees, our_address, status integer DEFAULT 0);");
let _ = db.execute("ALTER TABLE tbl_tx ADD COLUMN push_err TEXT");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_input (txid, in_txid,in_vout, spend_txidi);");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_output (txid, script_pubkey, address, amount);");
let _ = db.execute("CREATE UNIQUE INDEX idx_tbl_input ON(txid, txid,in_txid,in_vout)");
}
fn match_uri<'a>(path: &str, uri: &'a str) -> Option<&'a str> {
let re = Regex::new(path).unwrap();
if let Some(captures) = re.captures(uri) {
if let Some(param) = captures.name("param") {
return Some(param.as_str());
}
}
None
}
/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(
req: Request<hyper::body::Incoming>,
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
let mut ret: Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> = Ok(not_found);
let uri = req.uri().path().to_string();
//dbg!(&req);
//dbg!(&uri);
match req.method() {
// Serve some instructions at /
&Method::POST => {
let whole_body = req.collect().await?.to_bytes();
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/pushtxs$",uri.as_str()) {
//let whole_body = collect_body(req,512_000).await?;
ret = echo_push(&whole_body,cfg,param).await;
}
if uri=="/searchtx"{
//let whole_body = collect_body(req,64).await?;
ret = echo_search(&whole_body,cfg).await;
}
ret
}
&Method::GET => {
if let Some(param) = match_uri(r"^?/?(?P<param>[^/]?+)?/info$",uri.as_str()) {
ret = echo_info(param,cfg).await;
}
ret
}
// Return the 404 Not Found for other routes.
_ => ret
}
}
fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
let mut cfg_lock = cfg.lock().unwrap();
match env::var("BAL_SERVER_DB_FILE") {
Ok(value) => {
cfg_lock.db_file = value;},
Err(_) => {},
}
match env::var("BAL_SERVER_BIND_ADDRESS") {
Ok(value) => {
cfg_lock.bind_address = value;},
Err(_) => {},
}
match env::var("BAL_SERVER_BIND_PORT") {
Ok(value) => {
cfg_lock.bind_address = value;},
Err(_) => {},
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet3");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
drop(parse_env_netconfig(cfg_lock,"bitcoin"));
}
fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a str) -> MutexGuard<'a, MyConfig>{
let cfg = match chain{
"regtest" => &mut cfg_lock.regtest,
"signet" => &mut cfg_lock.signet,
"testnet3" => &mut cfg_lock.testnet3,
"testnet" => &mut cfg_lock.testnet4,
&_ => &mut cfg_lock.mainnet,
};
match env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
Ok(value) => { cfg.address = value; },
Err(_) => {},
}
match env::var(format!("BAL_SERVER_{}_FIXE_FEE",chain.to_uppercase())) {
Ok(value) => {
match value.parse::<u64>(){
Ok(value) =>{ cfg.fixed_fee = value; },
Err(_) => {},
}
}
Err(_) => {},
}
cfg_lock
}
fn get_default_config()-> MyConfig {
let file = confy::get_configuration_file_path("bal-server",None).expect("Error while getting path");
info!("Default configuration file path is: {:#?}", file);
confy::load("bal-server",None).expect("cant_load")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let cfg: Arc<Mutex<MyConfig>> = match env::var("BAL_SERVER_CONFIG_FILE") {
Ok(value) => {
Arc::new(Mutex::new(
match confy::load_path(value.to_string()){
Ok(val) => {
info!("The configuration file path is: {:#?}", value);
val
},
Err(err) => {
error!("{}",err);
get_default_config()
}
}
))
},
Err(_) => {
Arc::new(Mutex::new(get_default_config()))
},
};
parse_env(&cfg);
let cfg_lock = cfg.lock().unwrap();
let db = sqlite::open(&cfg_lock.db_file).unwrap();
create_database(db);
let addr = cfg_lock.bind_address.to_string();
info!("bind address:{}",addr);
let addr: IpAddr = addr.parse()?;
let listener = TcpListener::bind((addr,cfg_lock.bind_port)).await?;
info!("Listening on http://{}:{}", addr,cfg_lock.bind_port);
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
tokio::task::spawn({
let cfg = cfg_lock.clone();
async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req: Request<hyper::body::Incoming>| async {
echo(req,&cfg).await
}))
.await
{
error!("Error serving connection: {:?}", err);
}
}
});
}
}

View File

@ -1,112 +0,0 @@
use sha2::{Digest, Sha256};
use bitcoin::bip32::Xpub;
use std::str::FromStr;
use bitcoin::bip32::DerivationPath;
use bitcoin::key::Secp256k1;
use bitcoin::Address;
use bitcoin::ScriptBuf;
use bitcoin::WPubkeyHash;
use bitcoin::Network;
use bitcoin::hashes::Hash;
// Mainnet (BIP44/BIP49/BIP84)
enum BS58Prefix{
Xpub,
//Ypub,
//Zpub,
//Tpub,
//Vpub,
//Upub
}
const XPUB_PREFIX:[u8; 4] = [0x04, 0x88, 0xB2, 0x1E]; // xpub (Legacy P2PKH)
//const YPUB_PREFIX:[u8; 4] = [0x04, 0x9D, 0x7C, 0xB2]; // ypub (Nested SegWit P2SH-P2WPKH)
//const ZPUB_PREFIX:[u8; 4] = [0x04, 0xB2, 0x47, 0x46]; // zpub (Native SegWit P2WPKH)
//const TPUB_PREFIX:[u8; 4] = [0x04, 0x35, 0x87, 0xCF]; // tpub (Testnet Legacy P2PKH)
//const VPUB_PREFIX:[u8; 4] = [0x04, 0x5F, 0x1C, 0xF6]; // vpub (Testnet Nested SegWit)
//const UPUB_PREFIX:[u8; 4] = [0x04, 0x4A, 0x52, 0x62]; // upub (RegTest Nested SegWit)
fn base58check_decode(s: &str) -> Result<Vec<u8>, String> {
let data = bs58::decode(s).into_vec().map_err(|e| e.to_string())?;
if data.len() < 4 {
return Err("Data troppo corta".to_string());
}
let (payload, checksum) = data.split_at(data.len() - 4);
let hash = Sha256::digest(Sha256::digest(payload));
if hash[0..4] != checksum[..] {
return Err("Checksum invalido".to_string());
}
Ok(payload.to_vec())
}
fn base58check_encode(data: &[u8]) -> String {
let checksum = &Sha256::digest(Sha256::digest(data))[0..4];
let full = [data, checksum].concat();
bs58::encode(full).into_string()
}
fn convert_to(zpub: &str,prefix: BS58Prefix) -> Result<String, String> {
let mut data = base58check_decode(zpub)?;
if data.len() < 4 {
return Err("Non è una zpub valida.".to_string());
}
data.splice(0..4, match prefix {
BS58Prefix::Xpub => XPUB_PREFIX,
//BS58Prefix::Ypub => YPUB_PREFIX,
//BS58Prefix::Zpub => ZPUB_PREFIX,
//BS58Prefix::Vpub => VPUB_PREFIX,
//BS58Prefix::Tpub => TPUB_PREFIX,
//BS58Prefix::Upub => UPUB_PREFIX,
});
Ok(base58check_encode(&data))
}
pub fn new_address_from_xpub(zpub: &str, index: i64,network: Network)-> Result<(String,String), Box<dyn std::error::Error>>{
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?;
let path = format!("m/0/{}",index);
let derivation_path = DerivationPath::from_str(path.as_str())?;
let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
let public_key = derived_xpub.public_key;
let pubkey_bytes = public_key.serialize();
let witness_program = WPubkeyHash::hash(&pubkey_bytes);
let redeem_script = ScriptBuf::new_p2wpkh(&witness_program);
//let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash());
let address = Address::from_script(&redeem_script, network)?;
//let address = Address::from_script(&script_pubkey, network)?;
Ok((address.to_string(),path.to_string()))
}
/*
fn main() -> Result<(), Box<dyn std::error::Error>>{
//let zpub = "xpub6C29v8gxCXREHUzoGNfqqFqZWxTVEmYtmZshuzfSwBKNmfYQxoizRziCkkUUA4WwJZkJs2i7nttRiC6MQG7mxZpouXeYkTZe3U52RyPAeo2";
//let zpub = "vpub5Ut36m34VebUUjdhYaxJCjSPqk3ZR8bA2MXLmbHRQCycAxy5Q1GFPJspLkJywJjBgQnvU3rmwPKTPp1ELLWeXrve3zBufpZR4MRCCTNHzsn";
let zpub = "zpub6qdfveGrxBQN3z8paZ88EHpCn5MGXpUoHwQmHhPbj4rPQtUjbWyCHrJFYZGVY7MsmVbDaeu4JYqRqcdLzMx78wZFEWbLrF9FG3gr2MPQC5H";
match convert_to(zpub,BS58Prefix::Xpub) {
Ok(xpub) => println!("XPUB: {}", xpub),
Err(e) => eprintln!("Errore: {}", e),
}
let xpub = Xpub::from_str(&convert_to(zpub,BS58Prefix::Xpub)?)?;
let derivation_path = DerivationPath::from_str("m/0/0")?;
let secp = Secp256k1::new();
let derived_xpub = xpub.derive_pub(&secp, &derivation_path)?;
let public_key = derived_xpub.public_key;
let pubkey_bytes = public_key.serialize();
let witness_program = WPubkeyHash::hash(&pubkey_bytes);
let redeem_script = ScriptBuf::new_p2wpkh(&witness_program);
let script_pubkey = ScriptBuf::new_p2sh(&redeem_script.script_hash());
// Generate the Bitcoin SegWit (BIP49) address
let network = Network::Bitcoin;
let address = Address::from_script(&redeem_script, network)?;
let address = Address::from_script(&script_pubkey, network)?;
Ok(())
}
*/