Files
bal-electrum-plugin/willexecutors.py

548 lines
19 KiB
Python

"""Willexecutors module for BAL Electrum Plugin.
This module provides functionality to manage will executor servers that
broadcast timelocked transactions at the appropriate locktime.
Classes:
Willexecutors: Static class for managing executor list and communication
WillExecutor: Data class representing a single will executor
"""
import json
from datetime import datetime
import time
from aiohttp import ClientResponse
from electrum import constants
from electrum.i18n import _
from electrum.logging import get_logger
from electrum.network import Network
from .bal import BalPlugin
DEFAULT_TIMEOUT = 5
_logger = get_logger(__name__)
chainname = constants.net.NET_NAME if constants.net.NET_NAME != "mainnet" else "bitcoin"
class Willexecutors:
"""Static class managing will executor servers.
Provides methods to save/load configurations, communicate via HTTP,
push transactions, and download executor lists from remote sources.
"""
def save(bal_plugin, willexecutors):
"""Save will executor configuration to plugin settings.
Args:
bal_plugin: BAL plugin instance
willexecutors: Dictionary of executor configs keyed by URL
"""
_logger.debug(f"save {willexecutors},{chainname}")
aw = bal_plugin.WILLEXECUTORS.get()
aw[chainname] = willexecutors
bal_plugin.WILLEXECUTORS.set(aw)
_logger.debug(f"saved: {aw}")
# bal_plugin.WILLEXECUTORS.set(willexecutors)
def get_willexecutors(
bal_plugin, update=False, bal_window=False, force=False, task=True
):
"""Retrieve and update the list of available will executors.
Args:
bal_plugin: BAL plugin instance
update: If True, ping servers to refresh data
bal_window: GUI window for user prompts
force: Force update regardless of cached data age
task: Run as background task if True
Returns:
dict: Sorted dictionary of executor configurations
"""
willexecutors = bal_plugin.WILLEXECUTORS.get()
willexecutors = willexecutors.get(chainname, {})
to_del = []
for w in willexecutors:
if not isinstance(willexecutors[w], dict):
to_del.append(w)
continue
Willexecutors.initialize_willexecutor(willexecutors[w], w)
for w in to_del:
_logger.error(
"error Willexecutor to delete type:{} {}".format(
type(willexecutors[w]), w
)
)
del willexecutors[w]
bal = bal_plugin.WILLEXECUTORS.default.get(chainname, {})
for bal_url, bal_executor in bal.items():
if bal_url not in willexecutors:
_logger.debug(f"force add {bal_url} willexecutor")
willexecutors[bal_url] = bal_executor
# if update:
# found = False
# for url, we in willexecutors.items():
# if Willexecutors.is_selected(we):
# found = True
# if found or force:
# if bal_plugin.PING_WILLEXECUTORS.get() or force:
# ping_willexecutors = True
# if bal_plugin.ASK_PING_WILLEXECUTORS.get() and not force:
# if bal_window:
# ping_willexecutors = bal_window.window.question(
# _(
# "Contact willexecutors servers to update payment informations?"
# )
# )
# if ping_willexecutors:
# if task:
# bal_window.ping_willexecutors(willexecutors, task)
# else:
# bal_window.ping_willexecutors_task(willexecutors)
w_sorted = dict(
sorted(
willexecutors.items(), key=lambda w: w[1].get("sort", 0), reverse=True
)
)
return w_sorted
def is_selected(willexecutor, value=None):
"""Check or set the selection status of an executor.
Args:
willexecutor: Executor configuration dictionary
value: Optional boolean to set selection status
Returns:
bool: Current selection status
"""
if not willexecutor:
return False
if value is not None:
willexecutor["selected"] = value
try:
return willexecutor["selected"]
except Exception:
willexecutor["selected"] = False
return False
def get_willexecutor_transactions(will, force=False):
"""Collect transactions grouped by executor for valid, complete wills.
Args:
will: Dictionary of will items keyed by ID
force: Include already-pushed transactions
Returns:
dict: Executors with their aggregated transactions
"""
willexecutors = {}
for wid, willitem in will.items():
if willitem.get_status("VALID"):
if willitem.get_status("COMPLETE"):
if not willitem.get_status("PUSHED") or force:
if willexecutor := willitem.we:
url = willexecutor["url"]
if willexecutor and Willexecutors.is_selected(willexecutor):
if url not in willexecutors:
willexecutor["txs"] = ""
willexecutor["txsids"] = []
willexecutor["broadcast_status"] = _("Waiting...")
willexecutors[url] = willexecutor
willexecutors[url]["txs"] += str(willitem.tx) + "\n"
willexecutors[url]["txsids"].append(wid)
return willexecutors
# def only_selected_list(willexecutors):
# out = {}
# for url, v in willexecutors.items():
# if Willexecutors.is_selected(url):
# out[url] = v
# def push_transactions_to_willexecutors(will):
# willexecutors = Willexecutors.get_transactions_to_be_pushed()
# for url in willexecutors:
# willexecutor = willexecutors[url]
# if Willexecutors.is_selected(willexecutor):
# if "txs" in willexecutor:
# Willexecutors.push_transactions_to_willexecutor(
# willexecutors[url]["txs"], url
# )
def send_request(
method, url, data=None, *, timeout=10, handle_response=None, count_reply=0
):
"""Send HTTP request to a will executor server.
Args:
method: HTTP method (get/post)
url: Target server URL
data: Request payload
timeout: Timeout in seconds
handle_response: Response processing function
count_reply: Retry counter for timeouts
Returns:
Server response object
Raises:
Exception: On connection errors or invalid method
"""
network = Network.get_instance()
if not network:
raise Exception("You are offline.")
_logger.debug(f"<-- {method} {url} {data}")
headers = {}
headers["user-agent"] = f"BalPlugin v:{BalPlugin.__version__}"
headers["Content-Type"] = "text/plain"
if not handle_response:
handle_response = Willexecutors.handle_response
try:
if method == "get":
response = Network.send_http_on_proxy(
method,
url,
params=data,
headers=headers,
on_finish=handle_response,
timeout=timeout,
)
elif method == "post":
response = Network.send_http_on_proxy(
method,
url,
body=data,
headers=headers,
on_finish=handle_response,
timeout=timeout,
)
else:
raise Exception(f"unexpected {method=!r}")
except TimeoutError:
if count_reply < 10:
_logger.debug(f"timeout({count_reply}) error: retry in 3 sec...")
time.sleep(3)
return Willexecutors.send_request(
method,
url,
data,
timeout=timeout,
handle_response=handle_response,
count_reply=count_reply + 1,
)
else:
_logger.debug(f"Too many timeouts: {count_reply}")
except Exception as e:
raise e
else:
_logger.debug(f"--> {response}")
return response
def get_we_url_from_response(resp):
"""Extract base URL from response object.
Args:
resp: Response object with url attribute
Returns:
str: Base URL without trailing paths
"""
url_slices = str(resp.url).split("/")
if len(url_slices) > 2:
url_slices = url_slices[:-2]
return "/".join(url_slices)
async def handle_response(resp: ClientResponse):
"""Parse JSON response from executor server.
Args:
resp: aiohttp ClientResponse object
Returns:
Parsed JSON data or raw text on parse failure
"""
r = await resp.text()
try:
r = json.loads(r)
# url = Willexecutors.get_we_url_from_response(resp)
# r["url"]= url
# r["status"]=resp.status
except Exception as e:
_logger.debug(f"error handling response:{e}")
pass
return r
class AlreadyPresentException(Exception):
"""Raised when transactions already exist on executor server."""
pass
def push_transactions_to_willexecutor(willexecutor):
"""Push timelocked transactions to an executor server for broadcast.
Args:
willexecutor: Dict containing url and txs keys
Returns:
bool: True on success, False on failure
Raises:
AlreadyPresentException: If transactions already exist
"""
out = True
try:
_logger.debug(f"{willexecutor['url']}: {willexecutor['txs']}")
if w := Willexecutors.send_request(
"post",
willexecutor["url"] + "/" + chainname + "/pushtxs",
data=willexecutor["txs"].encode("ascii"),
):
willexecutor["broadcast_status"] = _("Success")
_logger.debug(f"pushed: {w}")
if w != "thx":
_logger.debug(f"error: {w}")
raise Exception(w)
else:
raise Exception("empty reply from:{willexecutor['url']}")
except Exception as e:
_logger.debug(f"error:{e}")
if str(e) == "already present":
raise Willexecutors.AlreadyPresentException()
out = False
willexecutor["broadcast_status"] = _("Failed")
return out
def ping_servers(willexecutors):
"""Ping all executor servers to update their information.
Args:
willexecutors: Dictionary of executor configurations
"""
for url, we in willexecutors.items():
Willexecutors.get_info_task(url, we)
def get_info_task(url, willexecutor):
"""Fetch current information from a single executor server.
Args:
url: Executor server URL
willexecutor: Configuration dict to update
Returns:
Updated willexecutor dict with status, base_fee, address
"""
w = None
try:
_logger.info("GETINFO_WILLEXECUTOR")
_logger.debug(url)
w = Willexecutors.send_request("get", url + "/" + chainname + "/info")
if isinstance(w, dict):
willexecutor["url"] = url
willexecutor["status"] = 200
willexecutor["base_fee"] = w["base_fee"]
willexecutor["address"] = w["address"]
willexecutor["info"] = w["info"]
_logger.debug(f"response_data {w}")
except Exception as e:
_logger.error(f"error {e} contacting {url}: {w}")
willexecutor["status"] = "KO"
willexecutor["last_update"] = datetime.now().timestamp()
return willexecutor
def initialize_willexecutor(willexecutor, url, status=None, old_willexecutor={}):
"""Initialize or merge executor configuration preserving user settings.
Args:
willexecutor: New executor configuration dict
url: Executor server URL
status: Optional status override
old_willexecutor: Previous config to preserve user preferences
"""
willexecutor["url"] = url
if status is not None:
willexecutor["status"] = status
else:
willexecutor["status"] = old_willexecutor.get("status",willexecutor.get("status","Ko"))
willexecutor["selected"]=Willexecutors.is_selected(old_willexecutor) or willexecutor.get("selected",False)
willexecutor["address"]=old_willexecutor.get("address",willexecutor.get("address",""))
willexecutor["promo_code"]=old_willexecutor.get("promo_code",willexecutor.get("promo_code"))
def download_list(old_willexecutors):
"""Download latest executor list from remote source.
Args:
old_willexecutors: Existing configs to merge with new list
Returns:
dict: Merged executor configurations
"""
try:
willexecutors = Willexecutors.send_request(
"get",
f"https://welist.bitcoin-after.life/data/{chainname}?page=0&limit=100",
)
# del willexecutors["status"]
for w in willexecutors:
if w not in ("status", "url"):
Willexecutors.initialize_willexecutor(
willexecutors[w], w, None, old_willexecutors.get(w,{})
)
# bal_plugin.WILLEXECUTORS.set(l)
# bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,l,save=True)
return willexecutors
except Exception as e:
_logger.error(f"Failed to download willexecutors list: {e}")
return {}
def get_willexecutors_list_from_json():
"""Load executor list from local willexecutors.json file.
Returns:
dict: Executor configurations from JSON file
"""
try:
with open("willexecutors.json") as f:
willexecutors = json.load(f)
for w in willexecutors:
willexecutor = willexecutors[w]
Willexecutors.initialize_willexecutor(willexecutor, w, "New", False)
# bal_plugin.WILLEXECUTORS.set(willexecutors)
return willexecutors
except Exception as e:
_logger.error(f"error opening willexecutors json: {e}")
return {}
def check_transaction(txid, url):
"""Check if a transaction exists on executor server.
Args:
txid: Transaction ID string
url: Executor server URL
Returns:
Server response about transaction existence
"""
_logger.debug(f"{url}:{txid}")
try:
w = Willexecutors.send_request(
"post", url + "/searchtx", data=txid.encode("ascii")
)
return w
except Exception as e:
_logger.error(f"error contacting {url} for checking txs {e}")
raise e
def compute_id(willexecutor):
"""Compute unique identifier for an executor.
Args:
willexecutor: Executor configuration dict
Returns:
str: Unique ID combining URL and chain name
"""
return "{}-{}".format(willexecutor.get("url"), willexecutor.get("chain"))
class WillExecutor:
"""Data class representing a single will executor server.
Attributes:
url: Executor server URL
base_fee: Fixed fee in satoshis
chain: Bitcoin chain name
info: Additional executor information
version: Plugin version compatibility
status: Connection status
is_selected: User selection flag
promo_code: Promotional discount code
"""
def __init__(
self,
url,
base_fee,
chain,
info,
version,
status,
is_selected=False,
promo_code="",
):
"""Initialize a new WillExecutor instance.
Args:
url: Executor server URL
base_fee: Fixed fee in satoshis
chain: Bitcoin chain name
info: Additional executor information
version: Plugin version compatibility
status: Connection status (OK/Ko)
is_selected: Whether user has selected this executor
promo_code: Promotional discount code
"""
self.url = url
self.base_fee = base_fee
self.chain = chain
self.info = info
self.version = version
self.status = status
self.promo_code = promo_code
self.is_selected = is_selected
self.id = self.compute_id()
def from_dict(d):
"""Create WillExecutor instance from a dictionary.
Args:
d: Dictionary containing executor data fields
Returns:
WillExecutor: New instance with defaults for missing fields
"""
return WillExecutor(
url=d.get("url", "http://localhost:8000"),
base_fee=d.get("base_fee", 1000),
chain=d.get("chain", chainname),
info=d.get("info", ""),
version=d.get("version", 0),
status=d.get("status", "Ko"),
is_selected=d.get("is_selected", "False"),
promo_code=d.get("promo_code", ""),
)
def to_dict(self):
"""Convert WillExecutor to dictionary representation.
Returns:
dict: Serializable representation excluding computed fields
"""
return {
"url": self.url,
"base_fee": self.base_fee,
"chain": self.chain,
"info": self.info,
"version": self.version,
"promo_code": self.promo_code,
}
def compute_id(self):
"""Generate unique identifier for this executor.
Returns:
str: Unique ID from URL and chain name
"""
return f"{self.url}-{self.chain}"