From dd075508b7255d001321fac8798139aaefd89ad3 Mon Sep 17 00:00:00 2001 From: bitcoinafterlife Date: Mon, 3 Nov 2025 08:46:13 -0400 Subject: [PATCH] fix bal-pusher zmq connection stability --- src/bin/bal-pusher.rs | 158 +----------------------------------------- 1 file changed, 2 insertions(+), 156 deletions(-) diff --git a/src/bin/bal-pusher.rs b/src/bin/bal-pusher.rs index e27c17c..303b2ed 100644 --- a/src/bin/bal-pusher.rs +++ b/src/bin/bal-pusher.rs @@ -332,7 +332,7 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R })) .send().await?; let body = &(response.text().await?); - trace!("Body: {}", body); + info!("Report to welist({})\tSent: {}", welist_url,body); }else { debug!("Not sending stats"); } @@ -560,118 +560,6 @@ async fn main()-> std::io::Result<()>{ info!("Network: {}",arg_network); let network_params = get_network_params(&cfg,network); - - -/* - - let context = Context::new(); - let mut socket: Socket = context.socket(zmq::SUB).unwrap(); - - let zmq_address = cfg.zmq_listener.clone(); - info!("zmq listening on: {}", zmq_address); - socket.connect(&zmq_address).unwrap(); - socket.set_subscribe(b"").unwrap(); - - let _ = main_result(&cfg, network_params).await; - info!("waiting new blocks.."); - - let mut last_seq: Vec = [0;4].to_vec(); - let mut counter = 0; - let max = 100; - - // Initialize connection monitor - 10 second timeout, 3 consecutive timeouts before declaring lost - let mut connection_monitor = ConnectionMonitor::new(10, 3); - - loop { - counter += 1; - - // Check connection health every 100 iterations (about 10 seconds with your sleep) - if counter >= max { - match connection_monitor.check_connection() { - ConnectionStatus::Healthy => { - debug!("ZMQ connection healthy"); - } - ConnectionStatus::Warning(elapsed) => { - warn!("ZMQ connection warning - no messages for {:?}", elapsed); - // Try to send a ping or do a lightweight check - if let Err(e) = socket.send("", zmq::DONTWAIT) { - warn!("ZMQ send test failed: {}", e); - } - } - ConnectionStatus::Lost(elapsed) => { - error!("🚨 ZMQ connection LOST - no messages for {:?}", elapsed); - error!("Attempting to reconnect..."); - - // Reconnection logic - drop(socket); - let new_socket = context.socket(zmq::SUB).unwrap(); - socket = new_socket; - socket.connect(&zmq_address).unwrap(); - socket.set_subscribe(b"").unwrap(); - - connection_monitor.reset(); - info!("Reconnected to ZMQ endpoint"); - counter = 0; - continue; - } - } - counter = 0; - } - - // Use non-blocking receive with timeout - let mut items = [socket.as_poll_item(zmq::POLLIN)]; - match zmq::poll(&mut items, 1000) { // 1 second timeout - Ok(0) => { - // Timeout - no message received - debug!("No message received (timeout)"); - thread::sleep(Duration::from_millis(100)); - continue; - } - Ok(_) => { - if items[0].is_readable() { - match socket.recv_multipart(0) { - Ok(message) => { - // Update connection monitor - we got a message! - connection_monitor.update(); - - let topic = message[0].clone(); - let body = message[1].clone(); - let seq = message[2].clone(); - - if last_seq >= seq { - continue; - } - last_seq = seq; - - debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); - trace!("ZMQ:GET BODY: {}", hex::encode(&body)); - - if topic == b"hashblock" { - info!("NEW BLOCK: {}", hex::encode(&body)); - let _ = main_result(&cfg, network_params).await; - } - } - Err(e) => { - error!("Error receiving ZMQ message: {}", e); - thread::sleep(Duration::from_millis(1000)); // Longer sleep on error - } - } - } - } - Err(e) => { - error!("ZMQ poll error: {}", e); - thread::sleep(Duration::from_millis(1000)); - } - } - - thread::sleep(Duration::from_millis(100)); - } -} - -*/ - - - let context = Context::new(); let socket: Socket = context.socket(zmq::SUB).unwrap(); @@ -687,62 +575,20 @@ async fn main()-> std::io::Result<()>{ let mut counter=0; let max=100; loop { - counter+=1; - println!("hello"); - if counter >=max{ - println!("hello"); - check_zmq_connection(&zmq_address); - counter=0; - } - println!("viva la figa"); - let message = recv_multipart(&socket,0).unwrap(); - println!("e chi la castiga"); + let message = socket.recv_multipart(0).unwrap(); let topic = message[0].clone(); let body = message[1].clone(); let seq = message[2].clone(); - println!("last_seq({}) - seq({}) ",seq_to_str(&last_seq),seq_to_str(&seq)); - /*if last_seq == seq { - println!(">="); - continue - } - */ last_seq = seq; - //let mut sequence_str = "Unknown".to_string(); - /*if seq.len()==4{ - let mut rdr = Cursor::new(seq); - let sequence = rdr.read_u32::().expect("Failed to read integer"); - sequence_str = sequence.to_string(); - }*/ debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); trace!("ZMQ:GET BODY: {}", hex::encode(&body)); if topic == b"hashblock" { info!("NEW BLOCK: {}", hex::encode(&body)); - //let cfg = cfg.clone(); let _ = main_result(&cfg,network_params).await; } - println!("sleep"); thread::sleep(Duration::from_millis(100)); // Sleep for 100ms - println!("sleep done"); } } -pub fn recv_multipart(socket: &Socket, flags: i32) -> Result>,zmq::Error> { - let mut parts: Vec> = vec![]; - loop { - println!("hellorec"); - let part = socket.recv_bytes(flags)?; - println!("push"); - parts.push(part); - - println!("more parts"); - let more_parts = socket.get_rcvmore()?; - println!("more parts done"); - if !more_parts { - break; - } - } - Ok(parts) - } - fn seq_to_str(seq:&Vec) -> String{ if seq.len()==4{ let mut rdr = Cursor::new(seq);