diff --git a/util.py b/util.py index af79d28..3213b03 100644 --- a/util.py +++ b/util.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- + +# Utility module for the BAL Electrum plugin. + import bisect from datetime import datetime, timedelta @@ -10,7 +14,26 @@ LOCKTIME_THRESHOLD = 500000000 class Util: + """Utility class providing static methods for the BAL Electrum Plugin. + + Contains helper functions for locktime handling, amount encoding/decoding, + heir and will-executor comparison, UTXO and transaction comparison, + debug printing, and data migration helpers. + """ def locktime_to_str(locktime): + """Convert a locktime value to a human-readable string. + + If the locktime is a Unix timestamp (greater than LOCKTIME_THRESHOLD), + it is converted to an ISO 8601 date string. Otherwise, it is returned + as a plain string (typically a block height). + + Args: + locktime: A locktime value, either a block height or a Unix timestamp. + + Returns: + str: An ISO 8601 date string for timestamps, or the numeric value + as a string for block heights. + """ try: locktime = int(locktime) if locktime > LOCKTIME_THRESHOLD: @@ -22,6 +45,20 @@ class Util: return str(locktime) def str_to_locktime(locktime): + """Convert a string representation to a locktime value. + + If the string ends with a suffix ('y', 'd', 'b'), it is preserved as-is + for later parsing. Numeric strings are converted to integers. ISO 8601 + date strings are converted to Unix timestamps. + + Args: + locktime (str): A locktime string, number, ISO date, or + suffixed interval ('y' for years, 'd' for days, 'b' for blocks). + + Returns: + int or str: An integer locktime value, or the original string if it + carries a suffix for deferred parsing. + """ try: if locktime[-1] in ("y", "d", "b"): return locktime @@ -34,6 +71,23 @@ class Util: return int(timestamp) def parse_locktime_string(locktime, w=None): + """Parse a locktime string into an integer locktime value. + + Supports multiple formats: + - Plain integer: returned directly. + - Suffix 'y': converted to days (N * 365) then to a timestamp at midnight. + - Suffix 'd': converted to a timestamp N days from now at midnight. + - Suffix 'b': added to the current block height (requires wallet network). + + Args: + locktime (str): The locktime string to parse. + w (optional): The Electrum wallet, used to resolve block-height-based + locktimes. Defaults to None. + + Returns: + int: The resolved locktime as an integer (timestamp or block height), + or 0 if parsing fails. + """ try: return int(locktime) @@ -61,6 +115,21 @@ class Util: return 0 def int_locktime(seconds=0, minutes=0, hours=0, days=0, blocks=0): + """Convert time intervals into a locktime integer value in seconds. + + Each unit is converted to seconds and summed. Block intervals are + estimated at 600 seconds (10 minutes) per block. + + Args: + seconds (int): Number of seconds. Defaults to 0. + minutes (int): Number of minutes. Defaults to 0. + hours (int): Number of hours. Defaults to 0. + days (int): Number of days. Defaults to 0. + blocks (int): Number of blocks (600 seconds each). Defaults to 0. + + Returns: + int: Total time in seconds. + """ return int( seconds + minutes * 60 @@ -70,6 +139,20 @@ class Util: ) def encode_amount(amount, decimal_point): + """Encode a human-readable amount string to an integer value. + + If the amount is a percentage (ends with '%'), it is returned as-is. + Otherwise, the float value is multiplied by 10^decimal_point to convert + to the smallest unit (e.g., satoshis for BTC). + + Args: + amount (str): The amount string, either a number or a percentage. + decimal_point (int): The number of decimal places for encoding. + + Returns: + str or int: The encoded integer amount, the original percentage string, + or 0 if encoding fails. + """ if Util.is_perc(amount): return amount else: @@ -79,6 +162,20 @@ class Util: return 0 def decode_amount(amount, decimal_point): + """Decode an integer amount to a human-readable string. + + If the amount is a percentage (ends with '%'), it is returned as-is. + Otherwise, the integer value is divided by 10^decimal_point and formatted + to the specified number of decimal places. + + Args: + amount: The amount to decode, either an integer or a percentage string. + decimal_point (int): The number of decimal places for formatting. + + Returns: + str: The decoded amount string formatted to decimal_point places, + the original percentage string, or a string representation on error. + """ if Util.is_perc(amount): return amount else: @@ -89,12 +186,30 @@ class Util: return str(amount) def is_perc(value): + """Check whether a value represents a percentage. + + Args: + value: The value to check. + + Returns: + bool: True if the value ends with '%', False otherwise or on error. + """ try: return value[-1] == "%" except Exception: return False def cmp_array(heira, heirb): + """Compare two heir arrays element by element. + + Args: + heira (list): First heir array. + heirb (list): Second heir array. + + Returns: + bool: True if both arrays have the same length and identical elements, + False otherwise. + """ try: if len(heira) != len(heirb): return False @@ -106,11 +221,32 @@ class Util: return False def cmp_heir(heira, heirb): + """Compare two heirs by their address and amount. + + Args: + heira: First heir (index 0=address, index 1=amount). + heirb: Second heir (same format). + + Returns: + bool: True if both heirs have the same address and amount. + """ if heira[0] == heirb[0] and heira[1] == heirb[1]: return True return False def cmp_willexecutor(willexecutora, willexecutorb): + """Compare two will executors for equality. + + Two executors are considered equal if they are the same object, or if they + share the same url, address, and base_fee. + + Args: + willexecutora (dict): First will executor dictionary. + willexecutorb (dict): Second will executor dictionary. + + Returns: + bool: True if the executors are equal, False otherwise. + """ if willexecutora == willexecutorb: return True try: @@ -125,6 +261,19 @@ class Util: return False def search_heir_by_values(heirs, heir, values): + """Search for an heir in a dict by matching specific field values. + + Iterates over all heirs and returns the key of the first heir whose + values for all specified fields match those of the given heir. + + Args: + heirs (dict): Dictionary of heirs keyed by ID. + heir (dict): The reference heir to match against. + values (list): List of field keys to compare. + + Returns: + str or bool: The key of the first matching heir, or False if no match. + """ for h, v in heirs.items(): found = False for val in values: @@ -136,6 +285,16 @@ class Util: return False def cmp_heir_by_values(heira, heirb, values): + """Compare two heirs by a set of specific field values. + + Args: + heira (dict): First heir data dictionary. + heirb (dict): Second heir data dictionary. + values (list): List of field keys to compare. + + Returns: + bool: True if all specified fields match between both heirs. + """ for v in values: if heira[v] != heirb[v]: return False @@ -144,6 +303,24 @@ class Util: def cmp_heirs_by_values( heirsa, heirsb, values, exclude_willexecutors=False, reverse=True ): + """Compare two heir dictionaries by specific field values. + + Performs a bidirectional comparison: every heir in heirsa must have a + matching heir in heirsb (and vice versa when reverse=True) for the + specified field values. + + Args: + heirsa (dict): First dictionary of heirs. + heirsb (dict): Second dictionary of heirs. + values (list): Field keys to compare. + exclude_willexecutors (bool): If True, exclude will-executor entries + (containing 'w!ll3x3c"') from comparison. Defaults to False. + reverse (bool): If True, also verify the reverse comparison. Defaults + to True. + + Returns: + bool: True if all matching heirs are found in both directions. + """ for heira in heirsa: if ( exclude_willexecutors and 'w!ll3x3c"' not in heira @@ -171,6 +348,26 @@ class Util: cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3], reverse=True, ): + """Compare two heir dictionaries using a custom comparison function. + + Performs a bidirectional comparison. Will-executor entries are excluded + from the comparison. If an exact key match fails, a fallback search by + values (indices 0 and 3) is attempted. + + Args: + heirsa (dict): First dictionary of heirs. + heirsb (dict): Second dictionary of heirs. + cmp_function (callable): A function taking two heir values and returning + a bool. Defaults to comparing index 0 (address) and index 3. + reverse (bool): If True, also verify the reverse comparison. Defaults + to True. + + Returns: + bool: True if all heirs match in both directions. + + Raises: + Exception: Re-raises any exception encountered during comparison. + """ try: for heir in heirsa: if 'w!ll3x3c"' not in heir: @@ -188,6 +385,16 @@ class Util: return False def cmp_inputs(inputsa, inputsb): + """Compare two lists of transaction inputs for equality. + + Args: + inputsa (list): First list of transaction inputs. + inputsb (list): Second list of transaction inputs. + + Returns: + bool: True if both lists have the same length and every input in + inputsa is found in inputsb. + """ if len(inputsa) != len(inputsb): return False for inputa in inputsa: @@ -196,6 +403,21 @@ class Util: return True def cmp_outputs(outputsa, outputsb, willexecutor_output=None): + """Compare two lists of transaction outputs for equality. + + Optionally excludes a specific will-executor output from the comparison, + as it may differ between transactions. + + Args: + outputsa (list): First list of transaction outputs. + outputsb (list): Second list of transaction outputs. + willexecutor_output: An output to exclude from comparison. Defaults + to None. + + Returns: + bool: True if both lists have the same length and all outputs match, + excluding the willexecutor_output if provided. + """ if len(outputsa) != len(outputsb): return False for outputa in outputsa: @@ -205,6 +427,15 @@ class Util: return True def cmp_txs(txa, txb): + """Compare two transactions for equality by inputs and outputs. + + Args: + txa: First transaction object. + txb: Second transaction object. + + Returns: + bool: True if both transactions have matching inputs and outputs. + """ if not Util.cmp_inputs(txa.inputs(), txb.inputs()): return False if not Util.cmp_outputs(txa.outputs(), txb.outputs()): @@ -212,6 +443,20 @@ class Util: return True def get_value_amount(txa, txb): + """Calculate the total value of outputs that appear in both transactions. + + For each output in txa, checks if a matching output (same amount and/or + same address) exists in txb. Returns the sum of matched output values, + or False if any output has neither a matching amount nor address. + + Args: + txa: Reference transaction (whose outputs are evaluated). + txb: Comparison transaction. + + Returns: + int or False: Total satoshi value of matched outputs, or False if + any output has no correspondence at all. + """ outputsa = txa.outputs() # outputsb = txb.outputs() value_amount = 0 @@ -230,6 +475,24 @@ class Util: return value_amount def chk_locktime(timestamp_to_check, block_height_to_check, locktime): + """Check whether a locktime has been reached. + + If the locktime is a Unix timestamp (above LOCKTIME_THRESHOLD), it is + compared against timestamp_to_check. If it is a block height (below the + threshold), it is compared against block_height_to_check. + + Note: + There is a known issue at the threshold boundary (LOCKTIME_THRESHOLD) + where the behavior is ambiguous. + + Args: + timestamp_to_check (int): Current Unix timestamp to compare against. + block_height_to_check (int): Current block height to compare against. + locktime (int): The locktime value to check. + + Returns: + bool: True if the locktime has been reached or exceeded. + """ # TODO BUG: WHAT HAPPEN AT THRESHOLD? locktime = int(locktime) if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check: @@ -240,6 +503,25 @@ class Util: return False def anticipate_locktime(locktime, blocks=0, hours=0, days=0): + """Anticipate a locktime by subtracting a time interval from it. + + Used to compute the checkalive time: the point at which the plugin + should verify whether the user is still alive before the locktime expires. + + For timestamp-based locktimes, the interval is subtracted as seconds. + For block-height-based locktimes, the interval is converted to block + equivalents (hours * 6 + days * 144) and subtracted. + + Args: + locktime (int): The original locktime value (timestamp or block height). + blocks (int): Number of blocks to subtract (each ~10 min). Defaults to 0. + hours (int): Number of hours to subtract. Defaults to 0. + days (int): Number of days to subtract. Defaults to 0. + + Returns: + int: The anticipated locktime, adjusted by the specified interval. + Returns at least 1 to avoid invalid values. + """ locktime = int(locktime) out = 0 if locktime > LOCKTIME_THRESHOLD: @@ -256,6 +538,18 @@ class Util: return out def cmp_locktime(locktimea, locktimeb): + """Compare two locktime values. + + If the locktimes are equal, returns 0. If both are suffixed strings + with the same unit suffix, compares their numeric parts. + + Args: + locktimea: First locktime value. + locktimeb: Second locktime value. + + Returns: + int: Negative if locktimea < locktimeb, 0 if equal, positive if greater. + """ if locktimea == locktimeb: return 0 strlocktimea = str(locktimea) @@ -269,17 +563,48 @@ class Util: return int(locktimea) - (locktimeb) def get_lowest_valid_tx(available_utxos, will): + """Find the lowest valid transaction in a will by locktime. + + Iterates through will items sorted by locktime to find the first + transaction whose inputs are still available. + + Args: + available_utxos: List of available UTXOs. + will (dict): Dictionary of will items keyed by transaction ID. + + Note: + This method is incomplete and currently does not return a value. + """ will = sorted(will.items(), key=lambda x: x[1]["tx"].locktime) for txid, willitem in will.items(): pass def get_locktimes(will): + """Extract all unique locktime values from a will. + + Args: + will (dict): Dictionary of will items keyed by transaction ID. + + Returns: + dict_keys: Collection of unique locktime values from the will. + """ locktimes = {} for txid, willitem in will.items(): locktimes[willitem["tx"].locktime] = True return locktimes.keys() def get_lowest_locktimes(locktimes): + """Separate and sort locktimes into timestamp and block-height groups. + + Locktimes below LOCKTIME_THRESHOLD are sorted as block heights; + those above are sorted as timestamps. + + Args: + locktimes: Iterable of locktime values. + + Returns: + tuple: (sorted_timestamp_list, sorted_block_list) + """ sorted_timestamp = [] sorted_block = [] for locktime in locktimes: @@ -292,18 +617,57 @@ class Util: return sorted(sorted_timestamp), sorted(sorted_block) def get_lowest_locktimes_from_will(will): + """Get the lowest locktimes from a will, separated by type. + + Convenience method that combines get_locktimes and get_lowest_locktimes. + + Args: + will (dict): Dictionary of will items keyed by transaction ID. + + Returns: + tuple: (sorted_timestamp_list, sorted_block_list) + """ return Util.get_lowest_locktimes(Util.get_locktimes(will)) def search_willtx_per_io(will, tx): + """Search for a will transaction matching a given transaction by inputs and outputs. + + Args: + will (dict): Dictionary of will items keyed by transaction ID. + tx (dict): Transaction dictionary with a 'tx' key containing the + transaction to match. + + Returns: + tuple: (will_id, will_item) if found, (None, None) otherwise. + """ for wid, w in will.items(): if Util.cmp_txs(w["tx"], tx["tx"]): return wid, w return None, None def invalidate_will(will): + """Invalidate a will. + + Note: + Not yet implemented. + + Args: + will (dict): Dictionary of will items to invalidate. + + Raises: + Exception: Always raises "not implemented". + """ raise Exception("not implemented") def get_will_spent_utxos(will): + """Collect all UTXOs spent by the transactions in a will. + + Args: + will (dict): Dictionary of will items keyed by transaction ID. + + Returns: + list: All input UTXOs from all transactions in the will. + """ utxos = [] for txid, willitem in will.items(): utxos += willitem["tx"].inputs() @@ -311,6 +675,17 @@ class Util: return utxos def utxo_to_str(utxo): + """Convert a UTXO to its string representation. + + Tries multiple methods: utxo.to_str(), utxo.prevout.to_str(), + or str() as a fallback. + + Args: + utxo: A UTXO object. + + Returns: + str: String representation of the UTXO. + """ try: return utxo.to_str() except Exception: @@ -322,6 +697,15 @@ class Util: return str(utxo) def cmp_utxo(utxoa, utxob): + """Compare two UTXOs for equality based on their string representation. + + Args: + utxoa: First UTXO object. + utxob: Second UTXO object. + + Returns: + bool: True if both UTXOs have the same string representation. + """ utxoa = Util.utxo_to_str(utxoa) utxob = Util.utxo_to_str(utxob) if utxoa == utxob: @@ -330,21 +714,57 @@ class Util: return False def in_utxo(utxo, utxos): + """Check if a UTXO exists in a list of UTXOs. + + Args: + utxo: The UTXO to search for. + utxos (list): List of UTXOs to search in. + + Returns: + bool: True if the UTXO is found in the list. + """ for s_u in utxos: if Util.cmp_utxo(s_u, utxo): return True return False def txid_in_utxo(txid, utxos): + """Check if a transaction ID exists in any UTXO's prevout. + + Args: + txid (str): The transaction ID to search for. + utxos (list): List of UTXOs to search in. + + Returns: + bool: True if any UTXO's prevout matches the given txid. + """ for s_u in utxos: if s_u.prevout.txid == txid: return True return False def cmp_output(outputa, outputb): + """Compare two transaction outputs for equality by address and value. + + Args: + outputa: First transaction output. + outputb: Second transaction output. + + Returns: + bool: True if both outputs have the same address and value. + """ return outputa.address == outputb.address and outputa.value == outputb.value def in_output(output, outputs): + """Check if an output exists in a list of outputs. + + Args: + output: The transaction output to search for. + outputs (list): List of transaction outputs to search in. + + Returns: + bool: True if a matching output is found in the list. + """ for s_o in outputs: if Util.cmp_output(s_o, output): return True @@ -356,6 +776,22 @@ class Util: # return false false different amount, different address not found def din_output(out, outputs): + """Differentiate an output against a list of outputs by amount and address. + + Checks whether the output's amount appears in the list, and whether + any output with the same amount also shares the same address. Used to + detect change outputs. + + Args: + out: The transaction output to check. + outputs (list): List of transaction outputs to compare against. + + Returns: + tuple: (same_amount, same_address) where: + - (True, True): same address and same amount found. + - (True, False): same amount but different address (potential change). + - (False, False): no matching amount found. + """ same_amount = [] for s_o in outputs: if int(out.value) == int(s_o.value): @@ -371,6 +807,22 @@ class Util: return False, False def get_change_output(wallet, in_amount, out_amount, fee): + """Create a change output for a transaction if the change exceeds the dust threshold. + + Calculates the change amount as inputs minus outputs minus fees, and + creates a PartialTxOutput directed to a new change address if the amount + is above the wallet's dust threshold. + + Args: + wallet: The Electrum wallet object. + in_amount (int): Total input amount in satoshis. + out_amount (int): Total output amount in satoshis. + fee (int): Transaction fee in satoshis. + + Returns: + PartialTxOutput or None: A change output if change exceeds dust threshold, + otherwise None. + """ change_amount = int(in_amount - out_amount - fee) if change_amount > wallet.dust_threshold(): change_addresses = wallet.get_change_addresses_for_new_transaction() @@ -381,6 +833,18 @@ class Util: return out def get_current_height(network): + """Get the current best block height from the Electrum network. + + Uses the minimum of the SPV-verified chain height and the server-reported + height to discourage fee sniping. Returns 0 if the network is unavailable, + stale, or suspiciously divergent. + + Args: + network: The Electrum network object. + + Returns: + int: The current block height, or 0 if unavailable/unreliable. + """ # if no network or not up to date, just set locktime to zero if not network: return 0 @@ -402,6 +866,16 @@ class Util: return height def print_var(var, name="", veryverbose=False): + """Print detailed debug information about a variable. + + Attempts to print various representations: str, repr, dict, dir, type, + to_json, and __slotnames__. + + Args: + var: The variable to inspect. + name (str): A label for the output header. Defaults to "". + veryverbose (bool): Unused parameter. Defaults to False. + """ print(f"---{name}---") if var is not None: try: @@ -436,6 +910,15 @@ class Util: print(f"---end {name}---") def print_utxo(utxo, name=""): + """Print detailed debug information about a UTXO. + + Prints the UTXO itself, its prevout, script_sig, witness, and private + attributes (address, scriptpubkey, value_sats). + + Args: + utxo: The UTXO object to inspect. + name (str): A label for the output header. Defaults to "". + """ print(f"---utxo-{name}---") Util.print_var(utxo, name) Util.print_prevout(utxo.prevout, name) @@ -447,12 +930,29 @@ class Util: print(f"---utxo-end {name}---") def print_prevout(prevout, name=""): + """Print detailed debug information about a prevout object. + + Args: + prevout: The prevout object to inspect. + name (str): A label for the output header. Defaults to "". + """ print(f"---prevout-{name}---") Util.print_var(prevout, f"{name}-prevout") Util.print_var(prevout._asdict()) print(f"---prevout-end {name}---") def export_meta_gui(electrum_window, title, exporter): + """Export data to a file via the Electrum GUI save dialog. + + Opens a file save dialog, calls the exporter function with the chosen + filename, and shows a success or error message. + + Args: + electrum_window: The Electrum main window object. + title (str): Description of the data being exported. + exporter (callable): A function that takes a filename and writes the + data to it. Raises FileExportFailed on error. + """ filter_ = "All files (*)" filename = getSaveFileName( parent=electrum_window, @@ -473,10 +973,27 @@ class Util: ) def copy(dicto, dictfrom): + """Copy all key-value pairs from one dictionary to another. + + Args: + dicto (dict): The destination dictionary. + dictfrom (dict): The source dictionary. + """ for k, v in dictfrom.items(): dicto[k] = v def fix_will_settings_tx_fees(will_settings): + """Migrate will settings from old 'tx_fees' key to 'baltx_fees'. + + Renames the 'tx_fees' key to 'baltx_fees' in the will settings dictionary + for backward compatibility. + + Args: + will_settings (dict): The will settings dictionary to update. + + Returns: + bool: True if the key was renamed, False if no migration was needed. + """ tx_fees = will_settings.get("tx_fees", False) have_to_update = False if tx_fees: @@ -486,6 +1003,17 @@ class Util: return have_to_update def fix_will_tx_fees(will): + """Migrate will transactions from old 'tx_fees' key to 'baltx_fees'. + + Iterates through all will items and renames the 'tx_fees' key to + 'baltx_fees' in each transaction for backward compatibility. + + Args: + will (dict): Dictionary of will items keyed by transaction ID. + + Returns: + bool: True if any key was renamed, False if no migration was needed. + """ have_to_update = False for txid, willitem in will.items(): tx_fees = willitem.get("tx_fees", False) @@ -496,13 +1024,27 @@ class Util: return have_to_update def text_to_hex(text: str) -> str: - """Convert text to hexadecimal string""" + """Convert text to a hexadecimal string. + + Args: + text (str): The text to convert. + + Returns: + str: The hexadecimal representation of the text. + """ hex_string = text.encode('utf-8').hex() return hex_string def hex_to_text(hex_string: str) -> str: - """Convert hexadecimal string back to text (for verification)""" + """Convert a hexadecimal string back to text. + + Args: + hex_string (str): The hexadecimal string to convert. + + Returns: + str: The decoded text, or an error message if the hex string is invalid. + """ try: return bytes.fromhex(hex_string).decode('utf-8') except Exception: