fix bal-pusher zmq connection stability
This commit is contained in:
parent
4ac492ba79
commit
dd075508b7
@ -332,7 +332,7 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
|
|||||||
}))
|
}))
|
||||||
.send().await?;
|
.send().await?;
|
||||||
let body = &(response.text().await?);
|
let body = &(response.text().await?);
|
||||||
trace!("Body: {}", body);
|
info!("Report to welist({})\tSent: {}", welist_url,body);
|
||||||
}else {
|
}else {
|
||||||
debug!("Not sending stats");
|
debug!("Not sending stats");
|
||||||
}
|
}
|
||||||
@ -560,118 +560,6 @@ async fn main()-> std::io::Result<()>{
|
|||||||
info!("Network: {}",arg_network);
|
info!("Network: {}",arg_network);
|
||||||
let network_params = get_network_params(&cfg,network);
|
let network_params = get_network_params(&cfg,network);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
let context = Context::new();
|
|
||||||
let mut socket: Socket = context.socket(zmq::SUB).unwrap();
|
|
||||||
|
|
||||||
let zmq_address = cfg.zmq_listener.clone();
|
|
||||||
info!("zmq listening on: {}", zmq_address);
|
|
||||||
socket.connect(&zmq_address).unwrap();
|
|
||||||
socket.set_subscribe(b"").unwrap();
|
|
||||||
|
|
||||||
let _ = main_result(&cfg, network_params).await;
|
|
||||||
info!("waiting new blocks..");
|
|
||||||
|
|
||||||
let mut last_seq: Vec<u8> = [0;4].to_vec();
|
|
||||||
let mut counter = 0;
|
|
||||||
let max = 100;
|
|
||||||
|
|
||||||
// Initialize connection monitor - 10 second timeout, 3 consecutive timeouts before declaring lost
|
|
||||||
let mut connection_monitor = ConnectionMonitor::new(10, 3);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
counter += 1;
|
|
||||||
|
|
||||||
// Check connection health every 100 iterations (about 10 seconds with your sleep)
|
|
||||||
if counter >= max {
|
|
||||||
match connection_monitor.check_connection() {
|
|
||||||
ConnectionStatus::Healthy => {
|
|
||||||
debug!("ZMQ connection healthy");
|
|
||||||
}
|
|
||||||
ConnectionStatus::Warning(elapsed) => {
|
|
||||||
warn!("ZMQ connection warning - no messages for {:?}", elapsed);
|
|
||||||
// Try to send a ping or do a lightweight check
|
|
||||||
if let Err(e) = socket.send("", zmq::DONTWAIT) {
|
|
||||||
warn!("ZMQ send test failed: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ConnectionStatus::Lost(elapsed) => {
|
|
||||||
error!("🚨 ZMQ connection LOST - no messages for {:?}", elapsed);
|
|
||||||
error!("Attempting to reconnect...");
|
|
||||||
|
|
||||||
// Reconnection logic
|
|
||||||
drop(socket);
|
|
||||||
let new_socket = context.socket(zmq::SUB).unwrap();
|
|
||||||
socket = new_socket;
|
|
||||||
socket.connect(&zmq_address).unwrap();
|
|
||||||
socket.set_subscribe(b"").unwrap();
|
|
||||||
|
|
||||||
connection_monitor.reset();
|
|
||||||
info!("Reconnected to ZMQ endpoint");
|
|
||||||
counter = 0;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
counter = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use non-blocking receive with timeout
|
|
||||||
let mut items = [socket.as_poll_item(zmq::POLLIN)];
|
|
||||||
match zmq::poll(&mut items, 1000) { // 1 second timeout
|
|
||||||
Ok(0) => {
|
|
||||||
// Timeout - no message received
|
|
||||||
debug!("No message received (timeout)");
|
|
||||||
thread::sleep(Duration::from_millis(100));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
if items[0].is_readable() {
|
|
||||||
match socket.recv_multipart(0) {
|
|
||||||
Ok(message) => {
|
|
||||||
// Update connection monitor - we got a message!
|
|
||||||
connection_monitor.update();
|
|
||||||
|
|
||||||
let topic = message[0].clone();
|
|
||||||
let body = message[1].clone();
|
|
||||||
let seq = message[2].clone();
|
|
||||||
|
|
||||||
if last_seq >= seq {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
last_seq = seq;
|
|
||||||
|
|
||||||
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
|
||||||
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
|
||||||
|
|
||||||
if topic == b"hashblock" {
|
|
||||||
info!("NEW BLOCK: {}", hex::encode(&body));
|
|
||||||
let _ = main_result(&cfg, network_params).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Error receiving ZMQ message: {}", e);
|
|
||||||
thread::sleep(Duration::from_millis(1000)); // Longer sleep on error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("ZMQ poll error: {}", e);
|
|
||||||
thread::sleep(Duration::from_millis(1000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
thread::sleep(Duration::from_millis(100));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
let context = Context::new();
|
let context = Context::new();
|
||||||
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
||||||
|
|
||||||
@ -687,62 +575,20 @@ async fn main()-> std::io::Result<()>{
|
|||||||
let mut counter=0;
|
let mut counter=0;
|
||||||
let max=100;
|
let max=100;
|
||||||
loop {
|
loop {
|
||||||
counter+=1;
|
let message = socket.recv_multipart(0).unwrap();
|
||||||
println!("hello");
|
|
||||||
if counter >=max{
|
|
||||||
println!("hello");
|
|
||||||
check_zmq_connection(&zmq_address);
|
|
||||||
counter=0;
|
|
||||||
}
|
|
||||||
println!("viva la figa");
|
|
||||||
let message = recv_multipart(&socket,0).unwrap();
|
|
||||||
println!("e chi la castiga");
|
|
||||||
let topic = message[0].clone();
|
let topic = message[0].clone();
|
||||||
let body = message[1].clone();
|
let body = message[1].clone();
|
||||||
let seq = message[2].clone();
|
let seq = message[2].clone();
|
||||||
println!("last_seq({}) - seq({}) ",seq_to_str(&last_seq),seq_to_str(&seq));
|
|
||||||
/*if last_seq == seq {
|
|
||||||
println!(">=");
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
last_seq = seq;
|
last_seq = seq;
|
||||||
//let mut sequence_str = "Unknown".to_string();
|
|
||||||
/*if seq.len()==4{
|
|
||||||
let mut rdr = Cursor::new(seq);
|
|
||||||
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
|
||||||
sequence_str = sequence.to_string();
|
|
||||||
}*/
|
|
||||||
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
||||||
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
||||||
if topic == b"hashblock" {
|
if topic == b"hashblock" {
|
||||||
info!("NEW BLOCK: {}", hex::encode(&body));
|
info!("NEW BLOCK: {}", hex::encode(&body));
|
||||||
//let cfg = cfg.clone();
|
|
||||||
let _ = main_result(&cfg,network_params).await;
|
let _ = main_result(&cfg,network_params).await;
|
||||||
}
|
}
|
||||||
println!("sleep");
|
|
||||||
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
||||||
println!("sleep done");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn recv_multipart(socket: &Socket, flags: i32) -> Result<Vec<Vec<u8>>,zmq::Error> {
|
|
||||||
let mut parts: Vec<Vec<u8>> = vec![];
|
|
||||||
loop {
|
|
||||||
println!("hellorec");
|
|
||||||
let part = socket.recv_bytes(flags)?;
|
|
||||||
println!("push");
|
|
||||||
parts.push(part);
|
|
||||||
|
|
||||||
println!("more parts");
|
|
||||||
let more_parts = socket.get_rcvmore()?;
|
|
||||||
println!("more parts done");
|
|
||||||
if !more_parts {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(parts)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn seq_to_str(seq:&Vec<u8>) -> String{
|
fn seq_to_str(seq:&Vec<u8>) -> String{
|
||||||
if seq.len()==4{
|
if seq.len()==4{
|
||||||
let mut rdr = Cursor::new(seq);
|
let mut rdr = Cursor::new(seq);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user