"""Willexecutors module for BAL Electrum Plugin. This module provides functionality to manage will executor servers that broadcast timelocked transactions at the appropriate locktime. Classes: Willexecutors: Static class for managing executor list and communication WillExecutor: Data class representing a single will executor """ import json from datetime import datetime import time from aiohttp import ClientResponse from electrum import constants from electrum.i18n import _ from electrum.logging import get_logger from electrum.network import Network from .bal import BalPlugin DEFAULT_TIMEOUT = 5 _logger = get_logger(__name__) chainname = constants.net.NET_NAME if constants.net.NET_NAME != "mainnet" else "bitcoin" class Willexecutors: """Static class managing will executor servers. Provides methods to save/load configurations, communicate via HTTP, push transactions, and download executor lists from remote sources. """ def save(bal_plugin, willexecutors): """Save will executor configuration to plugin settings. Args: bal_plugin: BAL plugin instance willexecutors: Dictionary of executor configs keyed by URL """ _logger.debug(f"save {willexecutors},{chainname}") aw = bal_plugin.WILLEXECUTORS.get() aw[chainname] = willexecutors bal_plugin.WILLEXECUTORS.set(aw) _logger.debug(f"saved: {aw}") # bal_plugin.WILLEXECUTORS.set(willexecutors) def get_willexecutors( bal_plugin, update=False, bal_window=False, force=False, task=True ): """Retrieve and update the list of available will executors. Args: bal_plugin: BAL plugin instance update: If True, ping servers to refresh data bal_window: GUI window for user prompts force: Force update regardless of cached data age task: Run as background task if True Returns: dict: Sorted dictionary of executor configurations """ willexecutors = bal_plugin.WILLEXECUTORS.get() willexecutors = willexecutors.get(chainname, {}) to_del = [] for w in willexecutors: if not isinstance(willexecutors[w], dict): to_del.append(w) continue Willexecutors.initialize_willexecutor(willexecutors[w], w) for w in to_del: _logger.error( "error Willexecutor to delete type:{} {}".format( type(willexecutors[w]), w ) ) del willexecutors[w] bal = bal_plugin.WILLEXECUTORS.default.get(chainname, {}) for bal_url, bal_executor in bal.items(): if bal_url not in willexecutors: _logger.debug(f"force add {bal_url} willexecutor") willexecutors[bal_url] = bal_executor # if update: # found = False # for url, we in willexecutors.items(): # if Willexecutors.is_selected(we): # found = True # if found or force: # if bal_plugin.PING_WILLEXECUTORS.get() or force: # ping_willexecutors = True # if bal_plugin.ASK_PING_WILLEXECUTORS.get() and not force: # if bal_window: # ping_willexecutors = bal_window.window.question( # _( # "Contact willexecutors servers to update payment informations?" # ) # ) # if ping_willexecutors: # if task: # bal_window.ping_willexecutors(willexecutors, task) # else: # bal_window.ping_willexecutors_task(willexecutors) w_sorted = dict( sorted( willexecutors.items(), key=lambda w: w[1].get("sort", 0), reverse=True ) ) return w_sorted def is_selected(willexecutor, value=None): """Check or set the selection status of an executor. Args: willexecutor: Executor configuration dictionary value: Optional boolean to set selection status Returns: bool: Current selection status """ if not willexecutor: return False if value is not None: willexecutor["selected"] = value try: return willexecutor["selected"] except Exception: willexecutor["selected"] = False return False def get_willexecutor_transactions(will, force=False): """Collect transactions grouped by executor for valid, complete wills. Args: will: Dictionary of will items keyed by ID force: Include already-pushed transactions Returns: dict: Executors with their aggregated transactions """ willexecutors = {} for wid, willitem in will.items(): if willitem.get_status("VALID"): if willitem.get_status("COMPLETE"): if not willitem.get_status("PUSHED") or force: if willexecutor := willitem.we: url = willexecutor["url"] if willexecutor and Willexecutors.is_selected(willexecutor): if url not in willexecutors: willexecutor["txs"] = "" willexecutor["txsids"] = [] willexecutor["broadcast_status"] = _("Waiting...") willexecutors[url] = willexecutor willexecutors[url]["txs"] += str(willitem.tx) + "\n" willexecutors[url]["txsids"].append(wid) return willexecutors # def only_selected_list(willexecutors): # out = {} # for url, v in willexecutors.items(): # if Willexecutors.is_selected(url): # out[url] = v # def push_transactions_to_willexecutors(will): # willexecutors = Willexecutors.get_transactions_to_be_pushed() # for url in willexecutors: # willexecutor = willexecutors[url] # if Willexecutors.is_selected(willexecutor): # if "txs" in willexecutor: # Willexecutors.push_transactions_to_willexecutor( # willexecutors[url]["txs"], url # ) def send_request( method, url, data=None, *, timeout=10, handle_response=None, count_reply=0 ): """Send HTTP request to a will executor server. Args: method: HTTP method (get/post) url: Target server URL data: Request payload timeout: Timeout in seconds handle_response: Response processing function count_reply: Retry counter for timeouts Returns: Server response object Raises: Exception: On connection errors or invalid method """ network = Network.get_instance() if not network: raise Exception("You are offline.") _logger.debug(f"<-- {method} {url} {data}") headers = {} headers["user-agent"] = f"BalPlugin v:{BalPlugin.__version__}" headers["Content-Type"] = "text/plain" if not handle_response: handle_response = Willexecutors.handle_response try: if method == "get": response = Network.send_http_on_proxy( method, url, params=data, headers=headers, on_finish=handle_response, timeout=timeout, ) elif method == "post": response = Network.send_http_on_proxy( method, url, body=data, headers=headers, on_finish=handle_response, timeout=timeout, ) else: raise Exception(f"unexpected {method=!r}") except TimeoutError: if count_reply < 10: _logger.debug(f"timeout({count_reply}) error: retry in 3 sec...") time.sleep(3) return Willexecutors.send_request( method, url, data, timeout=timeout, handle_response=handle_response, count_reply=count_reply + 1, ) else: _logger.debug(f"Too many timeouts: {count_reply}") except Exception as e: raise e else: _logger.debug(f"--> {response}") return response def get_we_url_from_response(resp): """Extract base URL from response object. Args: resp: Response object with url attribute Returns: str: Base URL without trailing paths """ url_slices = str(resp.url).split("/") if len(url_slices) > 2: url_slices = url_slices[:-2] return "/".join(url_slices) async def handle_response(resp: ClientResponse): """Parse JSON response from executor server. Args: resp: aiohttp ClientResponse object Returns: Parsed JSON data or raw text on parse failure """ r = await resp.text() try: r = json.loads(r) # url = Willexecutors.get_we_url_from_response(resp) # r["url"]= url # r["status"]=resp.status except Exception as e: _logger.debug(f"error handling response:{e}") pass return r class AlreadyPresentException(Exception): """Raised when transactions already exist on executor server.""" pass def push_transactions_to_willexecutor(willexecutor): """Push timelocked transactions to an executor server for broadcast. Args: willexecutor: Dict containing url and txs keys Returns: bool: True on success, False on failure Raises: AlreadyPresentException: If transactions already exist """ out = True try: _logger.debug(f"{willexecutor['url']}: {willexecutor['txs']}") if w := Willexecutors.send_request( "post", willexecutor["url"] + "/" + chainname + "/pushtxs", data=willexecutor["txs"].encode("ascii"), ): willexecutor["broadcast_status"] = _("Success") _logger.debug(f"pushed: {w}") if w != "thx": _logger.debug(f"error: {w}") raise Exception(w) else: raise Exception("empty reply from:{willexecutor['url']}") except Exception as e: _logger.debug(f"error:{e}") if str(e) == "already present": raise Willexecutors.AlreadyPresentException() out = False willexecutor["broadcast_status"] = _("Failed") return out def ping_servers(willexecutors): """Ping all executor servers to update their information. Args: willexecutors: Dictionary of executor configurations """ for url, we in willexecutors.items(): Willexecutors.get_info_task(url, we) def get_info_task(url, willexecutor): """Fetch current information from a single executor server. Args: url: Executor server URL willexecutor: Configuration dict to update Returns: Updated willexecutor dict with status, base_fee, address """ w = None try: _logger.info("GETINFO_WILLEXECUTOR") _logger.debug(url) w = Willexecutors.send_request("get", url + "/" + chainname + "/info") if isinstance(w, dict): willexecutor["url"] = url willexecutor["status"] = 200 willexecutor["base_fee"] = w["base_fee"] willexecutor["address"] = w["address"] willexecutor["info"] = w["info"] _logger.debug(f"response_data {w}") except Exception as e: _logger.error(f"error {e} contacting {url}: {w}") willexecutor["status"] = "KO" willexecutor["last_update"] = datetime.now().timestamp() return willexecutor def initialize_willexecutor(willexecutor, url, status=None, old_willexecutor={}): """Initialize or merge executor configuration preserving user settings. Args: willexecutor: New executor configuration dict url: Executor server URL status: Optional status override old_willexecutor: Previous config to preserve user preferences """ willexecutor["url"] = url if status is not None: willexecutor["status"] = status else: willexecutor["status"] = old_willexecutor.get("status",willexecutor.get("status","Ko")) willexecutor["selected"]=Willexecutors.is_selected(old_willexecutor) or willexecutor.get("selected",False) willexecutor["address"]=old_willexecutor.get("address",willexecutor.get("address","")) willexecutor["promo_code"]=old_willexecutor.get("promo_code",willexecutor.get("promo_code")) def download_list(old_willexecutors): """Download latest executor list from remote source. Args: old_willexecutors: Existing configs to merge with new list Returns: dict: Merged executor configurations """ try: willexecutors = Willexecutors.send_request( "get", f"https://welist.bitcoin-after.life/data/{chainname}?page=0&limit=100", ) # del willexecutors["status"] for w in willexecutors: if w not in ("status", "url"): Willexecutors.initialize_willexecutor( willexecutors[w], w, None, old_willexecutors.get(w,{}) ) # bal_plugin.WILLEXECUTORS.set(l) # bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,l,save=True) return willexecutors except Exception as e: _logger.error(f"Failed to download willexecutors list: {e}") return {} def get_willexecutors_list_from_json(): """Load executor list from local willexecutors.json file. Returns: dict: Executor configurations from JSON file """ try: with open("willexecutors.json") as f: willexecutors = json.load(f) for w in willexecutors: willexecutor = willexecutors[w] Willexecutors.initialize_willexecutor(willexecutor, w, "New", False) # bal_plugin.WILLEXECUTORS.set(willexecutors) return willexecutors except Exception as e: _logger.error(f"error opening willexecutors json: {e}") return {} def check_transaction(txid, url): """Check if a transaction exists on executor server. Args: txid: Transaction ID string url: Executor server URL Returns: Server response about transaction existence """ _logger.debug(f"{url}:{txid}") try: w = Willexecutors.send_request( "post", url + "/searchtx", data=txid.encode("ascii") ) return w except Exception as e: _logger.error(f"error contacting {url} for checking txs {e}") raise e def compute_id(willexecutor): """Compute unique identifier for an executor. Args: willexecutor: Executor configuration dict Returns: str: Unique ID combining URL and chain name """ return "{}-{}".format(willexecutor.get("url"), willexecutor.get("chain")) class WillExecutor: """Data class representing a single will executor server. Attributes: url: Executor server URL base_fee: Fixed fee in satoshis chain: Bitcoin chain name info: Additional executor information version: Plugin version compatibility status: Connection status is_selected: User selection flag promo_code: Promotional discount code """ def __init__( self, url, base_fee, chain, info, version, status, is_selected=False, promo_code="", ): """Initialize a new WillExecutor instance. Args: url: Executor server URL base_fee: Fixed fee in satoshis chain: Bitcoin chain name info: Additional executor information version: Plugin version compatibility status: Connection status (OK/Ko) is_selected: Whether user has selected this executor promo_code: Promotional discount code """ self.url = url self.base_fee = base_fee self.chain = chain self.info = info self.version = version self.status = status self.promo_code = promo_code self.is_selected = is_selected self.id = self.compute_id() def from_dict(d): """Create WillExecutor instance from a dictionary. Args: d: Dictionary containing executor data fields Returns: WillExecutor: New instance with defaults for missing fields """ return WillExecutor( url=d.get("url", "http://localhost:8000"), base_fee=d.get("base_fee", 1000), chain=d.get("chain", chainname), info=d.get("info", ""), version=d.get("version", 0), status=d.get("status", "Ko"), is_selected=d.get("is_selected", "False"), promo_code=d.get("promo_code", ""), ) def to_dict(self): """Convert WillExecutor to dictionary representation. Returns: dict: Serializable representation excluding computed fields """ return { "url": self.url, "base_fee": self.base_fee, "chain": self.chain, "info": self.info, "version": self.version, "promo_code": self.promo_code, } def compute_id(self): """Generate unique identifier for this executor. Returns: str: Unique ID from URL and chain name """ return f"{self.url}-{self.chain}"