3 Commits

Author SHA1 Message Date
66f34cb29f pusher fix 2025-07-16 03:47:28 -04:00
36edfcd073 fixed network db_field 2025-07-15 15:18:37 -04:00
9e89ae884e fixed db field network 2025-07-15 15:12:21 -04:00
3 changed files with 33 additions and 16 deletions

View File

@@ -11,7 +11,7 @@ use serde::Deserialize;
use std::env; use std::env;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{ Write}; use std::io::{ Write};
use log::{info,debug,warn,error}; use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket}; use zmq::{Context, Socket};
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
@@ -73,7 +73,7 @@ fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
fn get_network_params_default(network:Network) -> NetworkParams{ fn get_network_params_default(network:Network) -> NetworkParams{
match network { match network {
Network::Testnet => NetworkParams{ Network::Testnet => NetworkParams{
host: "http://localhost".to_string(), host: "http://i27.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "testnet3/".to_string(), dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(), db_field: "testnet".to_string(),
@@ -82,7 +82,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Signet => NetworkParams{ Network::Signet => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "signet/".to_string(), dir_path: "signet/".to_string(),
db_field: "signet".to_string(), db_field: "signet".to_string(),
@@ -91,7 +91,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Regtest => NetworkParams{ Network::Regtest => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18443, port: 18443,
dir_path: "regtest/".to_string(), dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(), db_field: "regtest".to_string(),
@@ -100,7 +100,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
_ => NetworkParams{ _ => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 8332, port: 8332,
dir_path: "".to_string(), dir_path: "".to_string(),
db_field: "bitcoin".to_string(), db_field: "bitcoin".to_string(),
@@ -117,7 +117,6 @@ fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError
}else{ }else{
match env::var_os("HOME") { match env::var_os("HOME") {
Some(home) => { Some(home) => {
info!("some home {}",home.to_str().unwrap());
match home.to_str(){ match home.to_str(){
Some(home_str) => { Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path); let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
@@ -148,9 +147,13 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
match get_cookie_filename(network){ match get_cookie_filename(network){
Ok(cookie) => { Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) { match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => match client.get_blockchain_info(){ Ok(client) => {
match client.get_blockchain_info(){
Ok(bcinfo) => Ok((client,bcinfo)), Ok(bcinfo) => Ok((client,bcinfo)),
Err(err) => Err(err.into()) Err(err) => {
Err(err.into())
}
}
}, },
Err(err)=>Err(err.into()) Err(err)=>Err(err.into())
@@ -160,7 +163,7 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
} }
} }
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
let url = format!("{}:{}",network.host,&network.port); let url = format!("{}:{}/",network.host,&network.port);
match get_client_from_username(&url,network){ match get_client_from_username(&url,network){
Ok(client) =>{Ok(client)}, Ok(client) =>{Ok(client)},
Err(_) =>{ Err(_) =>{
@@ -202,6 +205,11 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
info!("median time: {}",bcinfo.median_time); info!("median time: {}",bcinfo.median_time);
let average_time = bcinfo.median_time; let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap(); let db = sqlite::open(&cfg.db_file).unwrap();
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD );
trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0);
let query_tx = db.prepare("SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);").unwrap().into_iter(); let query_tx = db.prepare("SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);").unwrap().into_iter();
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter(); //let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
@@ -251,12 +259,16 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
} }
if pushed_txs.len() > 0 { if pushed_txs.len() > 0 {
let _ = db.execute(format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"))); let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
trace!("sqlok: {}",&sql);
let _ = db.execute(&sql);
} }
if invalid_txs.len() > 0 { if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{ for (txid,txerr) in &invalid_txs{
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','"))); //let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let _ = db.execute(format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'")); let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
trace!("sqlerror: {}",&sql);
let _ = db.execute(&sql);
} }
} }
} }
@@ -381,7 +393,7 @@ fn main(){
}; };
debug!("Network: {}",arg_network); info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network); let network_params = get_network_params(&cfg,network);
@@ -389,6 +401,7 @@ fn main(){
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone(); let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address);
socket.connect(&zmq_address).unwrap(); socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap(); socket.set_subscribe(b"").unwrap();
@@ -411,8 +424,10 @@ fn main(){
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer"); let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
sequence_str = sequence.to_string(); sequence_str = sequence.to_string();
}*/ }*/
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK{}", hex::encode(body)); info!("NEW BLOCK{}", hex::encode(&body));
//let cfg = cfg.clone(); //let cfg = cfg.clone();
let _ = main_result(&cfg,network_params); let _ = main_result(&cfg,network_params);
} }

View File

@@ -369,7 +369,7 @@ async fn echo_push(whole_body: &Bytes,
ptx.push((linenum+3,Value::String(line.to_string()))); ptx.push((linenum+3,Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string()))); ptx.push((linenum+4,Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string()))); ptx.push((linenum+5,Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(param.to_string()))); ptx.push((linenum+6,Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string()))); ptx.push((linenum+7,Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string()))); ptx.push((linenum+8,Value::String(our_fees.to_string())));
linenum += 9; linenum += 9;

View File

@@ -17,6 +17,8 @@ pub fn create_database(db: &Connection){
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)"); let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
} }
/* /*