forked from bitcoinafterlife/bal-electrum-plugin
1052 lines
36 KiB
Python
1052 lines
36 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Utility module for the BAL Electrum plugin.
|
|
|
|
import bisect
|
|
from datetime import datetime, timedelta
|
|
|
|
from electrum.gui.qt.util import getSaveFileName
|
|
from electrum.i18n import _
|
|
from electrum.transaction import PartialTxOutput
|
|
from electrum.util import FileExportFailed
|
|
|
|
LOCKTIME_THRESHOLD = 500000000
|
|
|
|
|
|
class Util:
|
|
"""Utility class providing static methods for the BAL Electrum Plugin.
|
|
|
|
Contains helper functions for locktime handling, amount encoding/decoding,
|
|
heir and will-executor comparison, UTXO and transaction comparison,
|
|
debug printing, and data migration helpers.
|
|
"""
|
|
def locktime_to_str(locktime):
|
|
"""Convert a locktime value to a human-readable string.
|
|
|
|
If the locktime is a Unix timestamp (greater than LOCKTIME_THRESHOLD),
|
|
it is converted to an ISO 8601 date string. Otherwise, it is returned
|
|
as a plain string (typically a block height).
|
|
|
|
Args:
|
|
locktime: A locktime value, either a block height or a Unix timestamp.
|
|
|
|
Returns:
|
|
str: An ISO 8601 date string for timestamps, or the numeric value
|
|
as a string for block heights.
|
|
"""
|
|
try:
|
|
locktime = int(locktime)
|
|
if locktime > LOCKTIME_THRESHOLD:
|
|
dt = datetime.fromtimestamp(locktime).isoformat()
|
|
return dt
|
|
|
|
except Exception:
|
|
pass
|
|
return str(locktime)
|
|
|
|
def str_to_locktime(locktime):
|
|
"""Convert a string representation to a locktime value.
|
|
|
|
If the string ends with a suffix ('y', 'd', 'b'), it is preserved as-is
|
|
for later parsing. Numeric strings are converted to integers. ISO 8601
|
|
date strings are converted to Unix timestamps.
|
|
|
|
Args:
|
|
locktime (str): A locktime string, number, ISO date, or
|
|
suffixed interval ('y' for years, 'd' for days, 'b' for blocks).
|
|
|
|
Returns:
|
|
int or str: An integer locktime value, or the original string if it
|
|
carries a suffix for deferred parsing.
|
|
"""
|
|
try:
|
|
if locktime[-1] in ("y", "d", "b"):
|
|
return locktime
|
|
else:
|
|
return int(locktime)
|
|
except Exception:
|
|
pass
|
|
dt_object = datetime.fromisoformat(locktime)
|
|
timestamp = dt_object.timestamp()
|
|
return int(timestamp)
|
|
|
|
def parse_locktime_string(locktime, w=None):
|
|
"""Parse a locktime string into an integer locktime value.
|
|
|
|
Supports multiple formats:
|
|
- Plain integer: returned directly.
|
|
- Suffix 'y': converted to days (N * 365) then to a timestamp at midnight.
|
|
- Suffix 'd': converted to a timestamp N days from now at midnight.
|
|
- Suffix 'b': added to the current block height (requires wallet network).
|
|
|
|
Args:
|
|
locktime (str): The locktime string to parse.
|
|
w (optional): The Electrum wallet, used to resolve block-height-based
|
|
locktimes. Defaults to None.
|
|
|
|
Returns:
|
|
int: The resolved locktime as an integer (timestamp or block height),
|
|
or 0 if parsing fails.
|
|
"""
|
|
try:
|
|
return int(locktime)
|
|
|
|
except Exception:
|
|
pass
|
|
try:
|
|
now = datetime.now()
|
|
if locktime[-1] == "y":
|
|
locktime = str(int(locktime[:-1]) * 365) + "d"
|
|
if locktime[-1] == "d":
|
|
return int(
|
|
(now + timedelta(days=int(locktime[:-1])))
|
|
.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
.timestamp()
|
|
)
|
|
if locktime[-1] == "b":
|
|
locktime = int(locktime[:-1])
|
|
height = 0
|
|
if w:
|
|
height = Util.get_current_height(w.network)
|
|
locktime += int(height)
|
|
return int(locktime)
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
def int_locktime(seconds=0, minutes=0, hours=0, days=0, blocks=0):
|
|
"""Convert time intervals into a locktime integer value in seconds.
|
|
|
|
Each unit is converted to seconds and summed. Block intervals are
|
|
estimated at 600 seconds (10 minutes) per block.
|
|
|
|
Args:
|
|
seconds (int): Number of seconds. Defaults to 0.
|
|
minutes (int): Number of minutes. Defaults to 0.
|
|
hours (int): Number of hours. Defaults to 0.
|
|
days (int): Number of days. Defaults to 0.
|
|
blocks (int): Number of blocks (600 seconds each). Defaults to 0.
|
|
|
|
Returns:
|
|
int: Total time in seconds.
|
|
"""
|
|
return int(
|
|
seconds
|
|
+ minutes * 60
|
|
+ hours * 60 * 60
|
|
+ days * 60 * 60 * 24
|
|
+ blocks * 600
|
|
)
|
|
|
|
def encode_amount(amount, decimal_point):
|
|
"""Encode a human-readable amount string to an integer value.
|
|
|
|
If the amount is a percentage (ends with '%'), it is returned as-is.
|
|
Otherwise, the float value is multiplied by 10^decimal_point to convert
|
|
to the smallest unit (e.g., satoshis for BTC).
|
|
|
|
Args:
|
|
amount (str): The amount string, either a number or a percentage.
|
|
decimal_point (int): The number of decimal places for encoding.
|
|
|
|
Returns:
|
|
str or int: The encoded integer amount, the original percentage string,
|
|
or 0 if encoding fails.
|
|
"""
|
|
if Util.is_perc(amount):
|
|
return amount
|
|
else:
|
|
try:
|
|
return int(float(amount) * pow(10, decimal_point))
|
|
except Exception:
|
|
return 0
|
|
|
|
def decode_amount(amount, decimal_point):
|
|
"""Decode an integer amount to a human-readable string.
|
|
|
|
If the amount is a percentage (ends with '%'), it is returned as-is.
|
|
Otherwise, the integer value is divided by 10^decimal_point and formatted
|
|
to the specified number of decimal places.
|
|
|
|
Args:
|
|
amount: The amount to decode, either an integer or a percentage string.
|
|
decimal_point (int): The number of decimal places for formatting.
|
|
|
|
Returns:
|
|
str: The decoded amount string formatted to decimal_point places,
|
|
the original percentage string, or a string representation on error.
|
|
"""
|
|
if Util.is_perc(amount):
|
|
return amount
|
|
else:
|
|
basestr = "{{:0.{}f}}".format(decimal_point)
|
|
try:
|
|
return basestr.format(float(amount) / pow(10, decimal_point))
|
|
except Exception:
|
|
return str(amount)
|
|
|
|
def is_perc(value):
|
|
"""Check whether a value represents a percentage.
|
|
|
|
Args:
|
|
value: The value to check.
|
|
|
|
Returns:
|
|
bool: True if the value ends with '%', False otherwise or on error.
|
|
"""
|
|
try:
|
|
return value[-1] == "%"
|
|
except Exception:
|
|
return False
|
|
|
|
def cmp_array(heira, heirb):
|
|
"""Compare two heir arrays element by element.
|
|
|
|
Args:
|
|
heira (list): First heir array.
|
|
heirb (list): Second heir array.
|
|
|
|
Returns:
|
|
bool: True if both arrays have the same length and identical elements,
|
|
False otherwise.
|
|
"""
|
|
try:
|
|
if len(heira) != len(heirb):
|
|
return False
|
|
for h in range(0, len(heira)):
|
|
if heira[h] != heirb[h]:
|
|
return False
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def cmp_heir(heira, heirb):
|
|
"""Compare two heirs by their address and amount.
|
|
|
|
Args:
|
|
heira: First heir (index 0=address, index 1=amount).
|
|
heirb: Second heir (same format).
|
|
|
|
Returns:
|
|
bool: True if both heirs have the same address and amount.
|
|
"""
|
|
if heira[0] == heirb[0] and heira[1] == heirb[1]:
|
|
return True
|
|
return False
|
|
|
|
def cmp_willexecutor(willexecutora, willexecutorb):
|
|
"""Compare two will executors for equality.
|
|
|
|
Two executors are considered equal if they are the same object, or if they
|
|
share the same url, address, and base_fee.
|
|
|
|
Args:
|
|
willexecutora (dict): First will executor dictionary.
|
|
willexecutorb (dict): Second will executor dictionary.
|
|
|
|
Returns:
|
|
bool: True if the executors are equal, False otherwise.
|
|
"""
|
|
if willexecutora == willexecutorb:
|
|
return True
|
|
try:
|
|
if (
|
|
willexecutora["url"] == willexecutorb["url"]
|
|
and willexecutora["address"] == willexecutorb["address"]
|
|
and willexecutora["base_fee"] == willexecutorb["base_fee"]
|
|
):
|
|
return True
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
def search_heir_by_values(heirs, heir, values):
|
|
"""Search for an heir in a dict by matching specific field values.
|
|
|
|
Iterates over all heirs and returns the key of the first heir whose
|
|
values for all specified fields match those of the given heir.
|
|
|
|
Args:
|
|
heirs (dict): Dictionary of heirs keyed by ID.
|
|
heir (dict): The reference heir to match against.
|
|
values (list): List of field keys to compare.
|
|
|
|
Returns:
|
|
str or bool: The key of the first matching heir, or False if no match.
|
|
"""
|
|
for h, v in heirs.items():
|
|
found = False
|
|
for val in values:
|
|
if val in v and v[val] != heir[val]:
|
|
found = True
|
|
|
|
if not found:
|
|
return h
|
|
return False
|
|
|
|
def cmp_heir_by_values(heira, heirb, values):
|
|
"""Compare two heirs by a set of specific field values.
|
|
|
|
Args:
|
|
heira (dict): First heir data dictionary.
|
|
heirb (dict): Second heir data dictionary.
|
|
values (list): List of field keys to compare.
|
|
|
|
Returns:
|
|
bool: True if all specified fields match between both heirs.
|
|
"""
|
|
for v in values:
|
|
if heira[v] != heirb[v]:
|
|
return False
|
|
return True
|
|
|
|
def cmp_heirs_by_values(
|
|
heirsa, heirsb, values, exclude_willexecutors=False, reverse=True
|
|
):
|
|
"""Compare two heir dictionaries by specific field values.
|
|
|
|
Performs a bidirectional comparison: every heir in heirsa must have a
|
|
matching heir in heirsb (and vice versa when reverse=True) for the
|
|
specified field values.
|
|
|
|
Args:
|
|
heirsa (dict): First dictionary of heirs.
|
|
heirsb (dict): Second dictionary of heirs.
|
|
values (list): Field keys to compare.
|
|
exclude_willexecutors (bool): If True, exclude will-executor entries
|
|
(containing 'w!ll3x3c"') from comparison. Defaults to False.
|
|
reverse (bool): If True, also verify the reverse comparison. Defaults
|
|
to True.
|
|
|
|
Returns:
|
|
bool: True if all matching heirs are found in both directions.
|
|
"""
|
|
for heira in heirsa:
|
|
if (
|
|
exclude_willexecutors and 'w!ll3x3c"' not in heira
|
|
) or not exclude_willexecutors:
|
|
found = False
|
|
for heirb in heirsb:
|
|
if Util.cmp_heir_by_values(heirsa[heira], heirsb[heirb], values):
|
|
found = True
|
|
if not found:
|
|
return False
|
|
if reverse:
|
|
return Util.cmp_heirs_by_values(
|
|
heirsb,
|
|
heirsa,
|
|
values,
|
|
exclude_willexecutors=exclude_willexecutors,
|
|
reverse=False,
|
|
)
|
|
else:
|
|
return True
|
|
|
|
def cmp_heirs(
|
|
heirsa,
|
|
heirsb,
|
|
cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3],
|
|
reverse=True,
|
|
):
|
|
"""Compare two heir dictionaries using a custom comparison function.
|
|
|
|
Performs a bidirectional comparison. Will-executor entries are excluded
|
|
from the comparison. If an exact key match fails, a fallback search by
|
|
values (indices 0 and 3) is attempted.
|
|
|
|
Args:
|
|
heirsa (dict): First dictionary of heirs.
|
|
heirsb (dict): Second dictionary of heirs.
|
|
cmp_function (callable): A function taking two heir values and returning
|
|
a bool. Defaults to comparing index 0 (address) and index 3.
|
|
reverse (bool): If True, also verify the reverse comparison. Defaults
|
|
to True.
|
|
|
|
Returns:
|
|
bool: True if all heirs match in both directions.
|
|
|
|
Raises:
|
|
Exception: Re-raises any exception encountered during comparison.
|
|
"""
|
|
try:
|
|
for heir in heirsa:
|
|
if 'w!ll3x3c"' not in heir:
|
|
if heir not in heirsb or not cmp_function(
|
|
heirsa[heir], heirsb[heir]
|
|
):
|
|
if not Util.search_heir_by_values(heirsb, heirsa[heir], [0, 3]):
|
|
return False
|
|
if reverse:
|
|
return Util.cmp_heirs(heirsb, heirsa, cmp_function, False)
|
|
else:
|
|
return True
|
|
except Exception as e:
|
|
raise e
|
|
return False
|
|
|
|
def cmp_inputs(inputsa, inputsb):
|
|
"""Compare two lists of transaction inputs for equality.
|
|
|
|
Args:
|
|
inputsa (list): First list of transaction inputs.
|
|
inputsb (list): Second list of transaction inputs.
|
|
|
|
Returns:
|
|
bool: True if both lists have the same length and every input in
|
|
inputsa is found in inputsb.
|
|
"""
|
|
if len(inputsa) != len(inputsb):
|
|
return False
|
|
for inputa in inputsa:
|
|
if not Util.in_utxo(inputa, inputsb):
|
|
return False
|
|
return True
|
|
|
|
def cmp_outputs(outputsa, outputsb, willexecutor_output=None):
|
|
"""Compare two lists of transaction outputs for equality.
|
|
|
|
Optionally excludes a specific will-executor output from the comparison,
|
|
as it may differ between transactions.
|
|
|
|
Args:
|
|
outputsa (list): First list of transaction outputs.
|
|
outputsb (list): Second list of transaction outputs.
|
|
willexecutor_output: An output to exclude from comparison. Defaults
|
|
to None.
|
|
|
|
Returns:
|
|
bool: True if both lists have the same length and all outputs match,
|
|
excluding the willexecutor_output if provided.
|
|
"""
|
|
if len(outputsa) != len(outputsb):
|
|
return False
|
|
for outputa in outputsa:
|
|
if not Util.cmp_output(outputa, willexecutor_output):
|
|
if not Util.in_output(outputa, outputsb):
|
|
return False
|
|
return True
|
|
|
|
def cmp_txs(txa, txb):
|
|
"""Compare two transactions for equality by inputs and outputs.
|
|
|
|
Args:
|
|
txa: First transaction object.
|
|
txb: Second transaction object.
|
|
|
|
Returns:
|
|
bool: True if both transactions have matching inputs and outputs.
|
|
"""
|
|
if not Util.cmp_inputs(txa.inputs(), txb.inputs()):
|
|
return False
|
|
if not Util.cmp_outputs(txa.outputs(), txb.outputs()):
|
|
return False
|
|
return True
|
|
|
|
def get_value_amount(txa, txb):
|
|
"""Calculate the total value of outputs that appear in both transactions.
|
|
|
|
For each output in txa, checks if a matching output (same amount and/or
|
|
same address) exists in txb. Returns the sum of matched output values,
|
|
or False if any output has neither a matching amount nor address.
|
|
|
|
Args:
|
|
txa: Reference transaction (whose outputs are evaluated).
|
|
txb: Comparison transaction.
|
|
|
|
Returns:
|
|
int or False: Total satoshi value of matched outputs, or False if
|
|
any output has no correspondence at all.
|
|
"""
|
|
outputsa = txa.outputs()
|
|
# outputsb = txb.outputs()
|
|
value_amount = 0
|
|
|
|
for outa in outputsa:
|
|
same_amount, same_address = Util.in_output(outa, txb.outputs())
|
|
if not (same_amount or same_address):
|
|
return False
|
|
if same_amount and same_address:
|
|
value_amount += outa.value
|
|
if same_amount:
|
|
pass
|
|
if same_address:
|
|
pass
|
|
|
|
return value_amount
|
|
|
|
def chk_locktime(timestamp_to_check, block_height_to_check, locktime):
|
|
"""Check whether a locktime has been reached.
|
|
|
|
If the locktime is a Unix timestamp (above LOCKTIME_THRESHOLD), it is
|
|
compared against timestamp_to_check. If it is a block height (below the
|
|
threshold), it is compared against block_height_to_check.
|
|
|
|
Note:
|
|
There is a known issue at the threshold boundary (LOCKTIME_THRESHOLD)
|
|
where the behavior is ambiguous.
|
|
|
|
Args:
|
|
timestamp_to_check (int): Current Unix timestamp to compare against.
|
|
block_height_to_check (int): Current block height to compare against.
|
|
locktime (int): The locktime value to check.
|
|
|
|
Returns:
|
|
bool: True if the locktime has been reached or exceeded.
|
|
"""
|
|
# TODO BUG: WHAT HAPPEN AT THRESHOLD?
|
|
locktime = int(locktime)
|
|
if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check:
|
|
return True
|
|
elif locktime < LOCKTIME_THRESHOLD and locktime > block_height_to_check:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def anticipate_locktime(locktime, blocks=0, hours=0, days=0):
|
|
"""Anticipate a locktime by subtracting a time interval from it.
|
|
|
|
Used to compute the checkalive time: the point at which the plugin
|
|
should verify whether the user is still alive before the locktime expires.
|
|
|
|
For timestamp-based locktimes, the interval is subtracted as seconds.
|
|
For block-height-based locktimes, the interval is converted to block
|
|
equivalents (hours * 6 + days * 144) and subtracted.
|
|
|
|
Args:
|
|
locktime (int): The original locktime value (timestamp or block height).
|
|
blocks (int): Number of blocks to subtract (each ~10 min). Defaults to 0.
|
|
hours (int): Number of hours to subtract. Defaults to 0.
|
|
days (int): Number of days to subtract. Defaults to 0.
|
|
|
|
Returns:
|
|
int: The anticipated locktime, adjusted by the specified interval.
|
|
Returns at least 1 to avoid invalid values.
|
|
"""
|
|
locktime = int(locktime)
|
|
out = 0
|
|
if locktime > LOCKTIME_THRESHOLD:
|
|
seconds = blocks * 600 + hours * 3600 + days * 86400
|
|
dt = datetime.fromtimestamp(locktime)
|
|
dt -= timedelta(seconds=seconds)
|
|
out = dt.timestamp()
|
|
else:
|
|
blocks -= hours * 6 + days * 144
|
|
out = locktime + blocks
|
|
|
|
if out < 1:
|
|
out = 1
|
|
return out
|
|
|
|
def cmp_locktime(locktimea, locktimeb):
|
|
"""Compare two locktime values.
|
|
|
|
If the locktimes are equal, returns 0. If both are suffixed strings
|
|
with the same unit suffix, compares their numeric parts.
|
|
|
|
Args:
|
|
locktimea: First locktime value.
|
|
locktimeb: Second locktime value.
|
|
|
|
Returns:
|
|
int: Negative if locktimea < locktimeb, 0 if equal, positive if greater.
|
|
"""
|
|
if locktimea == locktimeb:
|
|
return 0
|
|
strlocktimea = str(locktimea)
|
|
strlocktimeb = str(locktimeb)
|
|
# intlocktimea = Util.str_to_locktime(strlocktimea)
|
|
# intlocktimeb = Util.str_to_locktime(strlocktimeb)
|
|
if locktimea[-1] in "ydb":
|
|
if locktimeb[-1] == locktimea[-1]:
|
|
return int(strlocktimea[-1]) - int(strlocktimeb[-1])
|
|
else:
|
|
return int(locktimea) - (locktimeb)
|
|
|
|
def get_lowest_valid_tx(available_utxos, will):
|
|
"""Find the lowest valid transaction in a will by locktime.
|
|
|
|
Iterates through will items sorted by locktime to find the first
|
|
transaction whose inputs are still available.
|
|
|
|
Args:
|
|
available_utxos: List of available UTXOs.
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
|
|
Note:
|
|
This method is incomplete and currently does not return a value.
|
|
"""
|
|
will = sorted(will.items(), key=lambda x: x[1]["tx"].locktime)
|
|
for txid, willitem in will.items():
|
|
pass
|
|
|
|
def get_locktimes(will):
|
|
"""Extract all unique locktime values from a will.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
|
|
Returns:
|
|
dict_keys: Collection of unique locktime values from the will.
|
|
"""
|
|
locktimes = {}
|
|
for txid, willitem in will.items():
|
|
locktimes[willitem["tx"].locktime] = True
|
|
return locktimes.keys()
|
|
|
|
def get_lowest_locktimes(locktimes):
|
|
"""Separate and sort locktimes into timestamp and block-height groups.
|
|
|
|
Locktimes below LOCKTIME_THRESHOLD are sorted as block heights;
|
|
those above are sorted as timestamps.
|
|
|
|
Args:
|
|
locktimes: Iterable of locktime values.
|
|
|
|
Returns:
|
|
tuple: (sorted_timestamp_list, sorted_block_list)
|
|
"""
|
|
sorted_timestamp = []
|
|
sorted_block = []
|
|
for locktime in locktimes:
|
|
locktime = Util.parse_locktime_string(locktime)
|
|
if locktime < LOCKTIME_THRESHOLD:
|
|
bisect.insort(sorted_block, locktime)
|
|
else:
|
|
bisect.insort(sorted_timestamp, locktime)
|
|
|
|
return sorted(sorted_timestamp), sorted(sorted_block)
|
|
|
|
def get_lowest_locktimes_from_will(will):
|
|
"""Get the lowest locktimes from a will, separated by type.
|
|
|
|
Convenience method that combines get_locktimes and get_lowest_locktimes.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
|
|
Returns:
|
|
tuple: (sorted_timestamp_list, sorted_block_list)
|
|
"""
|
|
return Util.get_lowest_locktimes(Util.get_locktimes(will))
|
|
|
|
def search_willtx_per_io(will, tx):
|
|
"""Search for a will transaction matching a given transaction by inputs and outputs.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
tx (dict): Transaction dictionary with a 'tx' key containing the
|
|
transaction to match.
|
|
|
|
Returns:
|
|
tuple: (will_id, will_item) if found, (None, None) otherwise.
|
|
"""
|
|
for wid, w in will.items():
|
|
if Util.cmp_txs(w["tx"], tx["tx"]):
|
|
return wid, w
|
|
return None, None
|
|
|
|
def invalidate_will(will):
|
|
"""Invalidate a will.
|
|
|
|
Note:
|
|
Not yet implemented.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items to invalidate.
|
|
|
|
Raises:
|
|
Exception: Always raises "not implemented".
|
|
"""
|
|
raise Exception("not implemented")
|
|
|
|
def get_will_spent_utxos(will):
|
|
"""Collect all UTXOs spent by the transactions in a will.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
|
|
Returns:
|
|
list: All input UTXOs from all transactions in the will.
|
|
"""
|
|
utxos = []
|
|
for txid, willitem in will.items():
|
|
utxos += willitem["tx"].inputs()
|
|
|
|
return utxos
|
|
|
|
def utxo_to_str(utxo):
|
|
"""Convert a UTXO to its string representation.
|
|
|
|
Tries multiple methods: utxo.to_str(), utxo.prevout.to_str(),
|
|
or str() as a fallback.
|
|
|
|
Args:
|
|
utxo: A UTXO object.
|
|
|
|
Returns:
|
|
str: String representation of the UTXO.
|
|
"""
|
|
try:
|
|
return utxo.to_str()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
return utxo.prevout.to_str()
|
|
except Exception:
|
|
pass
|
|
return str(utxo)
|
|
|
|
def cmp_utxo(utxoa, utxob):
|
|
"""Compare two UTXOs for equality based on their string representation.
|
|
|
|
Args:
|
|
utxoa: First UTXO object.
|
|
utxob: Second UTXO object.
|
|
|
|
Returns:
|
|
bool: True if both UTXOs have the same string representation.
|
|
"""
|
|
utxoa = Util.utxo_to_str(utxoa)
|
|
utxob = Util.utxo_to_str(utxob)
|
|
if utxoa == utxob:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def in_utxo(utxo, utxos):
|
|
"""Check if a UTXO exists in a list of UTXOs.
|
|
|
|
Args:
|
|
utxo: The UTXO to search for.
|
|
utxos (list): List of UTXOs to search in.
|
|
|
|
Returns:
|
|
bool: True if the UTXO is found in the list.
|
|
"""
|
|
for s_u in utxos:
|
|
if Util.cmp_utxo(s_u, utxo):
|
|
return True
|
|
return False
|
|
|
|
def txid_in_utxo(txid, utxos):
|
|
"""Check if a transaction ID exists in any UTXO's prevout.
|
|
|
|
Args:
|
|
txid (str): The transaction ID to search for.
|
|
utxos (list): List of UTXOs to search in.
|
|
|
|
Returns:
|
|
bool: True if any UTXO's prevout matches the given txid.
|
|
"""
|
|
for s_u in utxos:
|
|
if s_u.prevout.txid == txid:
|
|
return True
|
|
return False
|
|
|
|
def cmp_output(outputa, outputb):
|
|
"""Compare two transaction outputs for equality by address and value.
|
|
|
|
Args:
|
|
outputa: First transaction output.
|
|
outputb: Second transaction output.
|
|
|
|
Returns:
|
|
bool: True if both outputs have the same address and value.
|
|
"""
|
|
return outputa.address == outputb.address and outputa.value == outputb.value
|
|
|
|
def in_output(output, outputs):
|
|
"""Check if an output exists in a list of outputs.
|
|
|
|
Args:
|
|
output: The transaction output to search for.
|
|
outputs (list): List of transaction outputs to search in.
|
|
|
|
Returns:
|
|
bool: True if a matching output is found in the list.
|
|
"""
|
|
for s_o in outputs:
|
|
if Util.cmp_output(s_o, output):
|
|
return True
|
|
return False
|
|
|
|
# check all output with the same amount if none have the same address it can be a change
|
|
# return true true same address same amount
|
|
# return true false same amount different address
|
|
# return false false different amount, different address not found
|
|
|
|
def din_output(out, outputs):
|
|
"""Differentiate an output against a list of outputs by amount and address.
|
|
|
|
Checks whether the output's amount appears in the list, and whether
|
|
any output with the same amount also shares the same address. Used to
|
|
detect change outputs.
|
|
|
|
Args:
|
|
out: The transaction output to check.
|
|
outputs (list): List of transaction outputs to compare against.
|
|
|
|
Returns:
|
|
tuple: (same_amount, same_address) where:
|
|
- (True, True): same address and same amount found.
|
|
- (True, False): same amount but different address (potential change).
|
|
- (False, False): no matching amount found.
|
|
"""
|
|
same_amount = []
|
|
for s_o in outputs:
|
|
if int(out.value) == int(s_o.value):
|
|
same_amount.append(s_o)
|
|
if out.address == s_o.address:
|
|
return True, True
|
|
else:
|
|
pass
|
|
|
|
if len(same_amount) > 0:
|
|
return True, False
|
|
else:
|
|
return False, False
|
|
|
|
def get_change_output(wallet, in_amount, out_amount, fee):
|
|
"""Create a change output for a transaction if the change exceeds the dust threshold.
|
|
|
|
Calculates the change amount as inputs minus outputs minus fees, and
|
|
creates a PartialTxOutput directed to a new change address if the amount
|
|
is above the wallet's dust threshold.
|
|
|
|
Args:
|
|
wallet: The Electrum wallet object.
|
|
in_amount (int): Total input amount in satoshis.
|
|
out_amount (int): Total output amount in satoshis.
|
|
fee (int): Transaction fee in satoshis.
|
|
|
|
Returns:
|
|
PartialTxOutput or None: A change output if change exceeds dust threshold,
|
|
otherwise None.
|
|
"""
|
|
change_amount = int(in_amount - out_amount - fee)
|
|
if change_amount > wallet.dust_threshold():
|
|
change_addresses = wallet.get_change_addresses_for_new_transaction()
|
|
out = PartialTxOutput.from_address_and_value(
|
|
change_addresses[0], change_amount
|
|
)
|
|
out.is_change = True
|
|
return out
|
|
|
|
def get_current_height(network):
|
|
"""Get the current best block height from the Electrum network.
|
|
|
|
Uses the minimum of the SPV-verified chain height and the server-reported
|
|
height to discourage fee sniping. Returns 0 if the network is unavailable,
|
|
stale, or suspiciously divergent.
|
|
|
|
Args:
|
|
network: The Electrum network object.
|
|
|
|
Returns:
|
|
int: The current block height, or 0 if unavailable/unreliable.
|
|
"""
|
|
# if no network or not up to date, just set locktime to zero
|
|
if not network:
|
|
return 0
|
|
chain = network.blockchain()
|
|
if chain.is_tip_stale():
|
|
return 0
|
|
# figure out current block height
|
|
chain_height = chain.height() # learnt from all connected servers, SPV-checked
|
|
server_height = (
|
|
network.get_server_height()
|
|
) # height claimed by main server, unverified
|
|
# note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork)
|
|
# - if it's lagging too much, it is the network's job to switch away
|
|
if server_height < chain_height - 10:
|
|
# the diff is suspiciously large... give up and use something non-fingerprintable
|
|
return 0
|
|
# discourage "fee sniping"
|
|
height = min(chain_height, server_height)
|
|
return height
|
|
|
|
def print_var(var, name="", veryverbose=False):
|
|
"""Print detailed debug information about a variable.
|
|
|
|
Attempts to print various representations: str, repr, dict, dir, type,
|
|
to_json, and __slotnames__.
|
|
|
|
Args:
|
|
var: The variable to inspect.
|
|
name (str): A label for the output header. Defaults to "".
|
|
veryverbose (bool): Unused parameter. Defaults to False.
|
|
"""
|
|
print(f"---{name}---")
|
|
if var is not None:
|
|
try:
|
|
print("str:", str(var))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("repr", repr(var))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("dict", dict(var))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("dir", dir(var))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("type", type(var))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("to_json", var.to_json())
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print("__slotnames__", var.__slotnames__)
|
|
except Exception:
|
|
pass
|
|
|
|
print(f"---end {name}---")
|
|
|
|
def print_utxo(utxo, name=""):
|
|
"""Print detailed debug information about a UTXO.
|
|
|
|
Prints the UTXO itself, its prevout, script_sig, witness, and private
|
|
attributes (address, scriptpubkey, value_sats).
|
|
|
|
Args:
|
|
utxo: The UTXO object to inspect.
|
|
name (str): A label for the output header. Defaults to "".
|
|
"""
|
|
print(f"---utxo-{name}---")
|
|
Util.print_var(utxo, name)
|
|
Util.print_prevout(utxo.prevout, name)
|
|
Util.print_var(utxo.script_sig, f"{name}-script-sig")
|
|
Util.print_var(utxo.witness, f"{name}-witness")
|
|
print("_TxInput__address:", utxo._TxInput__address)
|
|
print("_TxInput__scriptpubkey:", utxo._TxInput__scriptpubkey)
|
|
print("_TxInput__value_sats:", utxo._TxInput__value_sats)
|
|
print(f"---utxo-end {name}---")
|
|
|
|
def print_prevout(prevout, name=""):
|
|
"""Print detailed debug information about a prevout object.
|
|
|
|
Args:
|
|
prevout: The prevout object to inspect.
|
|
name (str): A label for the output header. Defaults to "".
|
|
"""
|
|
print(f"---prevout-{name}---")
|
|
Util.print_var(prevout, f"{name}-prevout")
|
|
Util.print_var(prevout._asdict())
|
|
print(f"---prevout-end {name}---")
|
|
|
|
def export_meta_gui(electrum_window, title, exporter):
|
|
"""Export data to a file via the Electrum GUI save dialog.
|
|
|
|
Opens a file save dialog, calls the exporter function with the chosen
|
|
filename, and shows a success or error message.
|
|
|
|
Args:
|
|
electrum_window: The Electrum main window object.
|
|
title (str): Description of the data being exported.
|
|
exporter (callable): A function that takes a filename and writes the
|
|
data to it. Raises FileExportFailed on error.
|
|
"""
|
|
filter_ = "All files (*)"
|
|
filename = getSaveFileName(
|
|
parent=electrum_window,
|
|
title=_("Select file to save your {}".format(title)),
|
|
filename="BALplugin_{}".format(title),
|
|
filter=filter_,
|
|
config=electrum_window.config,
|
|
)
|
|
if not filename:
|
|
return
|
|
try:
|
|
exporter(filename)
|
|
except FileExportFailed as e:
|
|
electrum_window.show_critical(str(e))
|
|
else:
|
|
electrum_window.show_message(
|
|
_("Your {0} were exported to '{1}'".format(title, str(filename)))
|
|
)
|
|
|
|
def copy(dicto, dictfrom):
|
|
"""Copy all key-value pairs from one dictionary to another.
|
|
|
|
Args:
|
|
dicto (dict): The destination dictionary.
|
|
dictfrom (dict): The source dictionary.
|
|
"""
|
|
for k, v in dictfrom.items():
|
|
dicto[k] = v
|
|
|
|
def fix_will_settings_tx_fees(will_settings):
|
|
"""Migrate will settings from old 'tx_fees' key to 'baltx_fees'.
|
|
|
|
Renames the 'tx_fees' key to 'baltx_fees' in the will settings dictionary
|
|
for backward compatibility.
|
|
|
|
Args:
|
|
will_settings (dict): The will settings dictionary to update.
|
|
|
|
Returns:
|
|
bool: True if the key was renamed, False if no migration was needed.
|
|
"""
|
|
tx_fees = will_settings.get("tx_fees", False)
|
|
have_to_update = False
|
|
if tx_fees:
|
|
will_settings["baltx_fees"] = tx_fees
|
|
del will_settings["tx_fees"]
|
|
have_to_update = True
|
|
return have_to_update
|
|
|
|
def fix_will_tx_fees(will):
|
|
"""Migrate will transactions from old 'tx_fees' key to 'baltx_fees'.
|
|
|
|
Iterates through all will items and renames the 'tx_fees' key to
|
|
'baltx_fees' in each transaction for backward compatibility.
|
|
|
|
Args:
|
|
will (dict): Dictionary of will items keyed by transaction ID.
|
|
|
|
Returns:
|
|
bool: True if any key was renamed, False if no migration was needed.
|
|
"""
|
|
have_to_update = False
|
|
for txid, willitem in will.items():
|
|
tx_fees = willitem.get("tx_fees", False)
|
|
if tx_fees:
|
|
will[txid]["baltx_fees"] = tx_fees
|
|
del will[txid]["tx_fees"]
|
|
have_to_update = True
|
|
return have_to_update
|
|
|
|
def text_to_hex(text: str) -> str:
|
|
"""Convert text to a hexadecimal string.
|
|
|
|
Args:
|
|
text (str): The text to convert.
|
|
|
|
Returns:
|
|
str: The hexadecimal representation of the text.
|
|
"""
|
|
hex_string = text.encode('utf-8').hex()
|
|
return hex_string
|
|
|
|
|
|
def hex_to_text(hex_string: str) -> str:
|
|
"""Convert a hexadecimal string back to text.
|
|
|
|
Args:
|
|
hex_string (str): The hexadecimal string to convert.
|
|
|
|
Returns:
|
|
str: The decoded text, or an error message if the hex string is invalid.
|
|
"""
|
|
try:
|
|
return bytes.fromhex(hex_string).decode('utf-8')
|
|
except Exception:
|
|
return "Error: Invalid hex string"
|