""" bal.core.willexecutors ======================= Client logic for talking to *will-executor* servers. A will-executor is an optional third-party service that, for a small fee, stores the signed inheritance transactions off-line and broadcasts them once their locktime expires (acting as a dead-man's switch backup). This module only contains the networking / data-shaping logic (downloading the server list, pinging servers for their fee and address, pushing transactions, checking whether a tx is already stored). It is GUI-free: all user interaction is handled by the Qt layer. """ import json import time from datetime import datetime from aiohttp import ClientResponse from electrum.i18n import _ from electrum.logging import get_logger from electrum.network import Network from .plugin_base import BalPlugin # Per-request timeout (seconds) for interactive operations (ping / info / # list download). These fail fast (no retries) so a dead server does not # block the UI. DEFAULT_TIMEOUT = 5 # Broadcast (pushtxs) timeouts. Broadcasting a will is important, so we keep a # couple of quick retries to survive a transient hiccup -- but far from the old # 10s x 10 retries + 30s sleeps (~140s) that froze the wizard on a dead server. # Worst case per server is now ~ PUSH_TIMEOUT * (1 + PUSH_MAX_RETRIES) # + PUSH_RETRY_SLEEP * PUSH_MAX_RETRIES = 8 * 3 + 1 * 2 = ~26s, and the wizard # also enforces a global deadline on top of this (see push_transactions_parallel). PUSH_TIMEOUT = 8 PUSH_MAX_RETRIES = 2 PUSH_RETRY_SLEEP = 1 # Global wall-clock deadline (seconds) for the whole parallel broadcast. Once # it elapses we stop waiting for the still-pending servers, mark them as # "Timeout" and let the wizard proceed instead of appearing stuck. PUSH_GLOBAL_DEADLINE = 30 # Check (searchtx) timeouts. Used when the user presses "Check" to verify that # each will-executor still holds the transaction. Like the broadcast path, the # old defaults (10s x 10 retries + 30s sleeps ~= 140s per server) froze the # "checking transaction" dialog on a single dead server. Fail fast with one # quick retry, and cap the whole batch with a global deadline. CHECK_TIMEOUT = 8 CHECK_MAX_RETRIES = 1 CHECK_RETRY_SLEEP = 1 CHECK_GLOBAL_DEADLINE = 30 _logger = get_logger(__name__) chainname = BalPlugin.chainname class Willexecutors: # Expose the networking constants as class attributes so the GUI layer can # reference them (e.g. to show the "Xs / DEADLINEs" countdown) without # importing module-level names. Single source of truth: the module # constants defined above. DEFAULT_TIMEOUT = DEFAULT_TIMEOUT PUSH_TIMEOUT = PUSH_TIMEOUT PUSH_MAX_RETRIES = PUSH_MAX_RETRIES PUSH_RETRY_SLEEP = PUSH_RETRY_SLEEP PUSH_GLOBAL_DEADLINE = PUSH_GLOBAL_DEADLINE CHECK_TIMEOUT = CHECK_TIMEOUT CHECK_MAX_RETRIES = CHECK_MAX_RETRIES CHECK_RETRY_SLEEP = CHECK_RETRY_SLEEP CHECK_GLOBAL_DEADLINE = CHECK_GLOBAL_DEADLINE @staticmethod def save(bal_plugin, willexecutors): _logger.debug(f"save {willexecutors},{chainname}") aw = bal_plugin.WILLEXECUTORS.get() aw[chainname] = willexecutors bal_plugin.WILLEXECUTORS.set(aw) _logger.debug(f"saved: {aw}") # bal_plugin.WILLEXECUTORS.set(willexecutors) @staticmethod def get_willexecutors( bal_plugin, update=False, bal_window=False, force=False, task=True ): willexecutors = bal_plugin.WILLEXECUTORS.get() willexecutors = willexecutors.get(chainname, {}) to_del = [] for w in willexecutors: if not isinstance(willexecutors[w], dict): to_del.append(w) continue Willexecutors.initialize_willexecutor(willexecutors[w], w) for w in to_del: _logger.error( "error Willexecutor to delete type:{} {}".format( type(willexecutors[w]), w ) ) del willexecutors[w] bal = bal_plugin.WILLEXECUTORS.default.get(chainname, {}) for bal_url, bal_executor in bal.items(): if bal_url not in willexecutors: _logger.debug(f"force add {bal_url} willexecutor") willexecutors[bal_url] = bal_executor # if update: # found = False # for url, we in willexecutors.items(): # if Willexecutors.is_selected(we): # found = True # if found or force: # if bal_plugin.PING_WILLEXECUTORS.get() or force: # ping_willexecutors = True # if bal_plugin.ASK_PING_WILLEXECUTORS.get() and not force: # if bal_window: # ping_willexecutors = bal_window.window.question( # _( # "Contact willexecutors servers to update payment informations?" # ) # ) # if ping_willexecutors: # if task: # bal_window.ping_willexecutors(willexecutors, task) # else: # bal_window.ping_willexecutors_task(willexecutors) w_sorted = dict( sorted( willexecutors.items(), key=lambda w: w[1].get("sort", 0), reverse=True ) ) return w_sorted @staticmethod def is_selected(willexecutor, value=None): if not willexecutor: return False if value is not None: willexecutor["selected"] = value try: return willexecutor["selected"] except Exception: willexecutor["selected"] = False return False @staticmethod def get_willexecutor_transactions(will, force=False): willexecutors = {} for wid, willitem in will.items(): if willitem.get_status("VALID"): if willitem.get_status("COMPLETE"): if not willitem.get_status("PUSHED") or force: if willexecutor := willitem.we: url = willexecutor["url"] if willexecutor and Willexecutors.is_selected(willexecutor): if url not in willexecutors: willexecutor["txs"] = "" willexecutor["txsids"] = [] willexecutor["broadcast_status"] = _("Waiting...") willexecutors[url] = willexecutor willexecutors[url]["txs"] += str(willitem.tx) + "\n" willexecutors[url]["txsids"].append(wid) return willexecutors # def only_selected_list(willexecutors): # out = {} # for url, v in willexecutors.items(): # if Willexecutors.is_selected(url): # out[url] = v # def push_transactions_to_willexecutors(will): # willexecutors = Willexecutors.get_transactions_to_be_pushed() # for url in willexecutors: # willexecutor = willexecutors[url] # if Willexecutors.is_selected(willexecutor): # if "txs" in willexecutor: # Willexecutors.push_transactions_to_willexecutor( # willexecutors[url]["txs"], url # ) @staticmethod def send_request( method, url, data=None, *, timeout=10, handle_response=None, count_reply=0, max_retries=10, retry_sleep=3, ): """Send an HTTP request to a will-executor server. ``max_retries`` / ``retry_sleep`` control the timeout-retry behaviour: * For *critical* operations (pushing inheritance transactions) the historical default of up to 10 retries with a 3s back-off is kept, so a transient network hiccup does not lose a transaction. * For *interactive* operations (ping / info / list download) callers should pass ``max_retries=0`` so a dead server fails fast (one short timeout) instead of blocking the UI for minutes. See :meth:`ping_servers_parallel`. """ network = Network.get_instance() if not network: raise Exception("You are offline.") _logger.debug(f"<-- {method} {url} {data}") headers = {} headers["user-agent"] = f"BalPlugin v:{BalPlugin.__version__}" headers["Content-Type"] = "text/plain" if not handle_response: handle_response = Willexecutors.handle_response try: if method == "get": response = Network.send_http_on_proxy( method, url, params=data, headers=headers, on_finish=handle_response, timeout=timeout, ) elif method == "post": response = Network.send_http_on_proxy( method, url, body=data, headers=headers, on_finish=handle_response, timeout=timeout, ) else: raise Exception(f"unexpected {method=!r}") except TimeoutError: if count_reply < max_retries: _logger.debug( f"timeout({count_reply}) error: retry in {retry_sleep} sec..." ) if retry_sleep: time.sleep(retry_sleep) return Willexecutors.send_request( method, url, data, timeout=timeout, handle_response=handle_response, count_reply=count_reply + 1, max_retries=max_retries, retry_sleep=retry_sleep, ) else: _logger.debug(f"Too many timeouts: {count_reply}") except Exception as e: raise e else: _logger.debug(f"--> {response}") return response @staticmethod def get_we_url_from_response(resp): url_slices = str(resp.url).split("/") if len(url_slices) > 2: url_slices = url_slices[:-2] return "/".join(url_slices) @staticmethod async def handle_response(resp: ClientResponse): r = await resp.text() try: r = json.loads(r) # url = Willexecutors.get_we_url_from_response(resp) # r["url"]= url # r["status"]=resp.status except Exception as e: _logger.debug(f"error handling response:{e}") pass return r @staticmethod class AlreadyPresentException(Exception): pass @staticmethod def push_transactions_to_willexecutor( willexecutor, *, timeout=PUSH_TIMEOUT, max_retries=PUSH_MAX_RETRIES, retry_sleep=PUSH_RETRY_SLEEP, ): # ``timeout`` / ``max_retries`` / ``retry_sleep`` are forwarded to # send_request so the broadcast fails fast on a dead/slow server instead # of hanging for ~140s (the old default was 10s timeout x 10 retries + # 30s of sleeps). A small number of quick retries still protects # against a transient hiccup without freezing the wizard. out = True try: _logger.debug(f"{willexecutor['url']}: {willexecutor['txs']}") if w := Willexecutors.send_request( "post", willexecutor["url"] + "/" + chainname + "/pushtxs", data=willexecutor["txs"].encode("ascii"), timeout=timeout, max_retries=max_retries, retry_sleep=retry_sleep, ): willexecutor["broadcast_status"] = _("Success") _logger.debug(f"pushed: {w}") if w != "thx": _logger.debug(f"error: {w}") raise Exception(w) else: raise Exception("empty reply from:{willexecutor['url']}") except Exception as e: _logger.debug(f"error:{e}") if str(e) == "already present": raise Willexecutors.AlreadyPresentException() out = False willexecutor["broadcast_status"] = _("Failed") return out @staticmethod def ping_servers(willexecutors): for url, we in willexecutors.items(): Willexecutors.get_info_task(url, we) @staticmethod def get_info_task(url, willexecutor, *, timeout=DEFAULT_TIMEOUT, max_retries=0, retry_sleep=0): w = None try: _logger.info("GETINFO_WILLEXECUTOR") _logger.debug(url) # Fast-fail by default (max_retries=0): a dead server returns after a # single short timeout instead of retrying 10x with sleeps, which # used to freeze the UI for minutes per unreachable server. w = Willexecutors.send_request( "get", url + "/" + chainname + "/info", timeout=timeout, max_retries=max_retries, retry_sleep=retry_sleep, ) if isinstance(w, dict): willexecutor["url"] = url willexecutor["status"] = 200 willexecutor["base_fee"] = w["base_fee"] willexecutor["address"] = w["address"] willexecutor["info"] = w["info"] else: # No dict reply (timeout / empty) -> mark as unreachable. willexecutor["status"] = "KO" _logger.debug(f"response_data {w}") except Exception as e: _logger.error(f"error {e} contacting {url}: {w}") willexecutor["status"] = "KO" willexecutor["last_update"] = datetime.now().timestamp() return willexecutor @staticmethod def ping_servers_parallel(willexecutors, *, on_each=None, max_workers=8, timeout=DEFAULT_TIMEOUT, on_tick=None, tick_interval=1.0): """Ping every will-executor concurrently and report results as they arrive. Network requests run in a thread pool: each ``send_http_on_proxy`` call schedules its coroutine on Electrum's shared asyncio loop and blocks only its *own* worker thread, so the total wall-clock time is roughly that of the slowest server rather than the *sum* of all of them. A single dead server can no longer stall the whole batch. Args: willexecutors: ``{url: we_dict}`` mapping (mutated in place with the ping result, exactly like the old sequential ``ping_servers``). on_each: optional ``callback(url, we_dict, ok: bool)`` invoked from a worker thread each time a server answers (or fails), so the GUI can update its list live. Must be thread-safe / marshalled to the GUI thread by the caller. max_workers: maximum number of concurrent pings. timeout: per-request timeout in seconds (fast-fail, no retries). on_tick: optional ``callback()`` invoked periodically (every ``tick_interval`` seconds) **from the calling thread** while waiting for servers, so a Qt caller can refresh an elapsed-time counter from the same thread that drives ``on_each``. Returns: The same ``willexecutors`` mapping, updated in place. """ from concurrent.futures import ThreadPoolExecutor, wait from concurrent.futures import FIRST_COMPLETED items = list(willexecutors.items()) if not items: return willexecutors def _ping_one(url, we): we = Willexecutors.get_info_task( url, we, timeout=timeout, max_retries=0, retry_sleep=0 ) ok = we.get("status") == 200 return url, we, ok def _fire_tick(): if on_tick is not None: try: on_tick() except Exception as cb_err: _logger.error(f"ping on_tick callback error: {cb_err}") workers = max(1, min(max_workers, len(items))) # Manual pool (no ``with``) so we can poll futures in short slices and # drive ``on_tick`` from THIS thread between waits (reliable Qt repaint). pool = ThreadPoolExecutor(max_workers=workers, thread_name_prefix="bal-ping") futures = {pool.submit(_ping_one, url, we) for url, we in items} try: pending = set(futures) while pending: done, pending = wait( pending, timeout=tick_interval, return_when=FIRST_COMPLETED ) for fut in done: try: url, we, ok = fut.result() except Exception as e: # defensive: one server never crashes all _logger.error(f"ping_servers_parallel worker error: {e}") continue willexecutors[url] = we if on_each is not None: try: on_each(url, we, ok) except Exception as cb_err: _logger.error(f"ping on_each callback error: {cb_err}") # Drive the elapsed-time counter from the calling thread. _fire_tick() finally: try: pool.shutdown(wait=False, cancel_futures=True) except TypeError: pool.shutdown(wait=False) return willexecutors @staticmethod def push_transactions_parallel(willexecutors, *, on_each=None, max_workers=8, deadline=PUSH_GLOBAL_DEADLINE, on_timeout=None, on_tick=None, tick_interval=1.0): """Push transactions to multiple will-executors concurrently. Like :meth:`ping_servers_parallel` but for the ``pushtxs`` operation. Each server keeps a short retry behaviour (:meth:`push_transactions_to_willexecutor`) so a real transaction is not lost to a transient hiccup, but servers are contacted in parallel and results are reported via ``on_each(url, we_dict, ok, exc)`` as they complete. A global wall-clock ``deadline`` (seconds) caps the whole operation: if some servers are still pending when it elapses, we stop waiting, mark them via ``on_timeout(url, we_dict)`` and return, so the caller (the wizard) is never stuck behind one unresponsive server. Pass ``deadline=None`` to wait indefinitely (old behaviour). ``on_tick()`` is invoked periodically (every ``tick_interval`` seconds) **from the calling thread** while waiting for workers. This lets a Qt caller refresh an elapsed-time counter from the same thread that drives ``on_each`` (so its pyqtSignal repaints reliably), instead of relying on a separate heartbeat thread whose signal emissions are not marshalled. Returns ``{url: (ok, exception_or_None)}`` for the servers that answered in time (timed-out servers are reported via ``on_timeout``). """ from concurrent.futures import ThreadPoolExecutor, wait from concurrent.futures import FIRST_COMPLETED targets = [(url, we) for url, we in willexecutors.items() if "txs" in we] results = {} if not targets: return results def _push_one(url, we): try: ok = Willexecutors.push_transactions_to_willexecutor(we) return url, we, ok, None except Willexecutors.AlreadyPresentException as ape: return url, we, False, ape except Exception as e: return url, we, False, e def _fire_tick(): if on_tick is not None: try: on_tick() except Exception as cb_err: _logger.error(f"push on_tick callback error: {cb_err}") workers = max(1, min(max_workers, len(targets))) # NOTE: we do not use ``with ThreadPoolExecutor(...)`` here because its # __exit__ calls shutdown(wait=True), which would block on a hung worker # and defeat the whole point of the global deadline. We shut the pool # down without waiting once the deadline elapses; the daemon worker(s) # stuck on a dead socket will be torn down when their request finally # times out (PUSH_TIMEOUT), without holding up the wizard. pool = ThreadPoolExecutor(max_workers=workers, thread_name_prefix="bal-push") fut_to_url = {pool.submit(_push_one, url, we): (url, we) for url, we in targets} start = time.time() try: # Poll the futures in short slices so we can call ``on_tick`` from # THIS thread between waits. ``wait(..., timeout=tick_interval)`` # returns as soon as a future completes OR the slice elapses, # whichever comes first, so the counter advances ~once per second # while the parallel push runs. pending = set(fut_to_url.keys()) while pending: if deadline is not None and (time.time() - start) >= deadline: break slice_timeout = tick_interval if deadline is not None: remaining = deadline - (time.time() - start) slice_timeout = max(0.0, min(tick_interval, remaining)) done, pending = wait( pending, timeout=slice_timeout, return_when=FIRST_COMPLETED ) for fut in done: try: url, we, ok, exc = fut.result() except Exception as e: _logger.error( f"push_transactions_parallel worker error: {e}" ) continue results[url] = (ok, exc) if on_each is not None: try: on_each(url, we, ok, exc) except Exception as cb_err: _logger.error(f"push on_each callback error: {cb_err}") # Drive the elapsed-time counter from the calling thread. _fire_tick() # Any server still pending here hit the global deadline. if pending: elapsed = time.time() - start _logger.warning( f"push global deadline ({deadline}s) reached after " f"{elapsed:.1f}s; {len(pending)} server(s) " f"did not answer in time" ) for fut in pending: url, we = fut_to_url[fut] if url in results: continue if on_timeout is not None: try: on_timeout(url, we) except Exception as cb_err: _logger.error( f"push on_timeout callback error: {cb_err}" ) finally: # Do not block on still-running workers (Python 3.9+: cancel queued). try: pool.shutdown(wait=False, cancel_futures=True) except TypeError: pool.shutdown(wait=False) return results @staticmethod def check_transactions_parallel(items, *, on_each=None, max_workers=8, deadline=CHECK_GLOBAL_DEADLINE, on_timeout=None, on_tick=None, tick_interval=1.0): """Check (searchtx) several will-executors concurrently. Same design as :meth:`push_transactions_parallel`, but for the "Check" operation: it verifies that each will-executor still holds its transaction. ``items`` is an iterable of ``(wid, url)`` pairs (one per will-item that has a will-executor). Each server is contacted in parallel with a short fail-fast retry (:meth:`check_transaction`), results are reported via ``on_each(wid, url, result_or_None, exc)`` as they arrive, ``on_tick()`` is called periodically from the calling thread to refresh a counter, and a global ``deadline`` guarantees the dialog never freezes behind one unresponsive server (pending servers are reported via ``on_timeout(wid, url)``). Returns ``{wid: (result_or_None, exception_or_None)}`` for the servers that answered in time. """ from concurrent.futures import ThreadPoolExecutor, wait from concurrent.futures import FIRST_COMPLETED targets = [(wid, url) for wid, url in items if url] results = {} if not targets: return results def _check_one(wid, url): try: res = Willexecutors.check_transaction(wid, url) return wid, url, res, None except Exception as e: return wid, url, None, e def _fire_tick(): if on_tick is not None: try: on_tick() except Exception as cb_err: _logger.error(f"check on_tick callback error: {cb_err}") workers = max(1, min(max_workers, len(targets))) # Manual pool (no ``with``): we must not block on a hung worker when the # global deadline elapses (see push_transactions_parallel for details). pool = ThreadPoolExecutor(max_workers=workers, thread_name_prefix="bal-check") fut_to_target = {pool.submit(_check_one, wid, url): (wid, url) for wid, url in targets} start = time.time() try: pending = set(fut_to_target.keys()) while pending: if deadline is not None and (time.time() - start) >= deadline: break slice_timeout = tick_interval if deadline is not None: remaining = deadline - (time.time() - start) slice_timeout = max(0.0, min(tick_interval, remaining)) done, pending = wait( pending, timeout=slice_timeout, return_when=FIRST_COMPLETED ) for fut in done: try: wid, url, res, exc = fut.result() except Exception as e: _logger.error( f"check_transactions_parallel worker error: {e}" ) continue results[wid] = (res, exc) if on_each is not None: try: on_each(wid, url, res, exc) except Exception as cb_err: _logger.error(f"check on_each callback error: {cb_err}") # Drive the elapsed-time counter from the calling thread. _fire_tick() # Any server still pending here hit the global deadline. if pending: elapsed = time.time() - start _logger.warning( f"check global deadline ({deadline}s) reached after " f"{elapsed:.1f}s; {len(pending)} server(s) " f"did not answer in time" ) for fut in pending: wid, url = fut_to_target[fut] if wid in results: continue if on_timeout is not None: try: on_timeout(wid, url) except Exception as cb_err: _logger.error( f"check on_timeout callback error: {cb_err}" ) finally: try: pool.shutdown(wait=False, cancel_futures=True) except TypeError: pool.shutdown(wait=False) return results @staticmethod def initialize_willexecutor(willexecutor, url, status=None, old_willexecutor=None): old_willexecutor=old_willexecutor if old_willexecutor is not None else {} willexecutor["url"] = url if status is not None: willexecutor["status"] = status else: willexecutor["status"] = old_willexecutor.get("status",willexecutor.get("status","Ko")) willexecutor["selected"]=Willexecutors.is_selected(old_willexecutor) or willexecutor.get("selected",False) willexecutor["address"]=old_willexecutor.get("address",willexecutor.get("address","")) willexecutor["promo_code"]=old_willexecutor.get("promo_code",willexecutor.get("promo_code")) @staticmethod def download_list(old_willexecutors,welist_server): try: welist_server = welist_server if welist_server[-1] == '/' else welist_server+'/' willexecutors = Willexecutors.send_request( "get", f"{welist_server}data/{chainname}?page=0&limit=100", ) # del willexecutors["status"] for w in willexecutors: if w not in ("status", "url"): Willexecutors.initialize_willexecutor( willexecutors[w], w, None, old_willexecutors.get(w,None) ) # bal_plugin.WILLEXECUTORS.set(l) # bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,l,save=True) return willexecutors except Exception as e: _logger.error(f"Failed to download willexecutors list: {e}") return {} @staticmethod def get_willexecutors_list_from_json(): try: with open("willexecutors.json") as f: willexecutors = json.load(f) for w in willexecutors: willexecutor = willexecutors[w] Willexecutors.initialize_willexecutor(willexecutor, w, "New", False) # bal_plugin.WILLEXECUTORS.set(willexecutors) return willexecutors except Exception as e: _logger.error(f"error opening willexecutors json: {e}") return {} @staticmethod def check_transaction(txid, url, *, timeout=CHECK_TIMEOUT, max_retries=CHECK_MAX_RETRIES, retry_sleep=CHECK_RETRY_SLEEP): _logger.debug(f"{url}:{txid}") try: w = Willexecutors.send_request( "post", url + "/searchtx", data=txid.encode("ascii"), timeout=timeout, max_retries=max_retries, retry_sleep=retry_sleep, ) return w except Exception as e: _logger.error(f"error contacting {url} for checking txs {e}") raise e @staticmethod def compute_id(willexecutor): return "{}-{}".format(willexecutor.get("url"), willexecutor.get("chain")) #class WillExecutor: # def __init__( # self, # url, # base_fee, # chain, # info, # version, # status, # is_selected=False, # promo_code="", # ): # self.url = url # self.base_fee = base_fee # self.chain = chain # self.info = info # self.version = version # self.status = status # self.promo_code = promo_code # self.is_selected = is_selected # self.id = self.compute_id() # # def from_dict(d): # return WillExecutor( # url=d.get("url", "http://localhost:8000"), # base_fee=d.get("base_fee", 1000), # chain=d.get("chain", chainname), # info=d.get("info", ""), # version=d.get("version", 0), # status=d.get("status", "Ko"), # is_selected=d.get("is_selected", "False"), # promo_code=d.get("promo_code", ""), # ) # # def to_dict(self): # return { # "url": self.url, # "base_fee": self.base_fee, # "chain": self.chain, # "info": self.info, # "version": self.version, # "promo_code": self.promo_code, # } # # def compute_id(self): # return f"{self.url}-{self.chain}"