"""
bal.core.util
=============

Small, stateless helper functions shared across the whole plugin.

This module is intentionally GUI-free: it only deals with locktimes, amount
encoding/decoding, and comparing transactions / inputs / outputs / heirs.

Only UNIX timestamps are used for locktimes; block-height locktimes have been
removed. A locktime is always an absolute UNIX timestamp (seconds since epoch)
or a relative string like "30d" (30 days) or "1y" (1 year).
"""

from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation


class Util:
    """Namespace of static helpers (kept as a class to preserve the original
    ``Util.method(...)`` call sites used throughout the plugin)."""

    # Marker found in the key of synthetic will-executor heirs.
    _WILLEXECUTOR_MARKER = 'w!ll3x3c"'

    # ------------------------------------------------------------------ #
    # Locktime helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def locktime_to_str(locktime):
        """Render a locktime for display as an ISO date string.

        Values that cannot be interpreted as a timestamp (for example the
        relative form ``"30d"``) are returned as ``str(locktime)``.
        """
        try:
            locktime = int(locktime)
            return datetime.fromtimestamp(locktime).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            return str(locktime)

    @staticmethod
    def str_to_locktime(locktime):
        """Parse a user-entered locktime string into its stored form.

        Relative values keep their suffix (``"30d"``, ``"1y"``); numeric
        strings become ``int``; absolute ISO dates are converted to an integer
        UNIX timestamp. An ``int`` argument is returned unchanged.

        Raises whatever ``datetime.fromisoformat`` raises when the value is
        none of the above.
        """
        if isinstance(locktime, int):
            # Already in stored form; previously this crashed in fromisoformat.
            return locktime
        try:
            if locktime[-1] in ("y", "d"):
                return locktime
            return int(locktime)
        except (TypeError, ValueError, LookupError):
            pass
        return int(datetime.fromisoformat(locktime).timestamp())

    @staticmethod
    def parse_locktime_string(locktime, now=None):
        """Resolve a (possibly relative) locktime string into a concrete int.

        Supported forms:

        * plain int / timestamp -> returned unchanged (as ``int``)
        * ``"<n>y"`` -> n * 365 days from now (as a timestamp)
        * ``"<n>d"`` -> n days from now (as a timestamp)

        Relative results are truncated to midnight (local, naive time) of the
        target day.

        When *now* is provided (a ``datetime``), relative strings are resolved
        relative to that instant instead of ``datetime.now()``. This is used
        when checking a signed will so that ``"30d"`` always resolves to the
        *original* creation-time + 30 days, preventing spurious postpone
        detections on every check.

        Returns ``0`` when the value cannot be parsed.
        """
        try:
            return int(locktime)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            unit = locktime[-1]
            if unit == "y":
                days = int(locktime[:-1]) * 365
            elif unit == "d":
                days = int(locktime[:-1])
            else:
                return 0
            if now is None:
                now = datetime.now()
            target = (now + timedelta(days=days)).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            return int(target.timestamp())
        except (TypeError, ValueError, LookupError, OverflowError, OSError):
            return 0

    @staticmethod
    def int_locktime(seconds=0, minutes=0, hours=0, days=0):
        """Convert a human duration into a whole number of seconds."""
        return int(seconds + minutes * 60 + hours * 3600 + days * 86400)

    # ------------------------------------------------------------------ #
    # Amount helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def encode_amount(amount, decimal_point):
        """Convert a displayed amount into integer base units.

        Percentage amounts (e.g. ``"50%"``) are passed through unchanged.
        The conversion uses ``Decimal`` so that values such as ``"0.29"`` are
        scaled exactly (binary floats would give 28999999 instead of 29000000
        for eight decimals). Fractions below one base unit are truncated
        toward zero.

        Returns ``0`` when *amount* cannot be parsed.
        """
        if Util.is_perc(amount):
            return amount
        try:
            if isinstance(amount, (int, str)):
                value = Decimal(amount)
            else:
                # str() of a float yields its shortest round-trip repr.
                value = Decimal(str(amount))
            return int(value * Decimal(10) ** int(decimal_point))
        except (InvalidOperation, TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def decode_amount(amount, decimal_point):
        """Inverse of :meth:`encode_amount`: base units -> displayed string.

        Percentages are passed through unchanged; values that cannot be
        converted are returned as ``str(amount)``.
        """
        if Util.is_perc(amount):
            return amount
        try:
            return "{:0.{}f}".format(
                float(amount) / pow(10, decimal_point), decimal_point
            )
        except (TypeError, ValueError, ArithmeticError):
            return str(amount)

    @staticmethod
    def is_perc(value):
        """True if ``value`` is a percentage string such as ``"25%"``."""
        try:
            return value[-1] == "%"
        except (TypeError, LookupError):
            return False

    # ------------------------------------------------------------------ #
    # Heir / will-executor comparison helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def cmp_array(heira, heirb):
        """Element-wise equality of two indexable sequences (length-safe).

        Returns ``False`` when either argument has no length or cannot be
        indexed positionally.
        """
        try:
            if len(heira) != len(heirb):
                return False
            return not any(heira[i] != heirb[i] for i in range(len(heira)))
        except (TypeError, ValueError, LookupError):
            return False

    @staticmethod
    def cmp_heir(heira, heirb):
        """Two heirs are "the same" when columns 0 and 1 both match."""
        return bool(heira[0] == heirb[0] and heira[1] == heirb[1])

    @staticmethod
    def cmp_willexecutor(willexecutora, willexecutorb):
        """Compare two will-executor dicts by url / address / base_fee."""
        if willexecutora == willexecutorb:
            return True
        try:
            return all(
                willexecutora[field] == willexecutorb[field]
                for field in ("url", "address", "base_fee")
            )
        except (TypeError, LookupError):
            return False

    @staticmethod
    def _has_column(record, column):
        """True when ``record[column]`` exists (works for lists and dicts)."""
        try:
            record[column]
        except (TypeError, LookupError):
            return False
        return True

    @staticmethod
    def search_heir_by_values(heirs, heir, values):
        """Return the key of the first heir in ``heirs`` matching ``heir`` on
        every column listed in ``values`` (or ``False`` if none).

        Columns that a candidate does not have are skipped. The previous
        implementation used ``val in v``, which on a list tests *membership*
        rather than index existence, so mismatches went undetected.
        """
        for key, candidate in heirs.items():
            if all(
                not Util._has_column(candidate, column)
                or candidate[column] == heir[column]
                for column in values
            ):
                return key
        return False

    @staticmethod
    def cmp_heir_by_values(heira, heirb, values):
        """True when two heirs agree on every column index in ``values``."""
        return not any(heira[column] != heirb[column] for column in values)

    @staticmethod
    def cmp_heirs_by_values(
        heirsa, heirsb, values, exclude_willexecutors=False, reverse=True
    ):
        """Set-equality of two heir collections, comparing only ``values``.

        When ``exclude_willexecutors`` is set, entries of the left-hand
        collection whose key contains the ``w!ll3x3c"`` marker are skipped.
        The ``reverse`` flag makes the comparison symmetric by running it
        both ways.
        """
        for key in heirsa:
            if exclude_willexecutors and Util._WILLEXECUTOR_MARKER in key:
                continue
            if not any(
                Util.cmp_heir_by_values(heirsa[key], heirsb[other], values)
                for other in heirsb
            ):
                return False
        if not reverse:
            return True
        return Util.cmp_heirs_by_values(
            heirsb,
            heirsa,
            values,
            exclude_willexecutors=exclude_willexecutors,
            reverse=False,
        )

    @staticmethod
    def cmp_heirs(
        heirsa,
        heirsb,
        cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3],
        reverse=True,
    ):
        """Compare two heir collections using a custom ``cmp_function``.

        Entries whose key contains the will-executor marker are ignored. An
        heir that is missing under its own key (or differs there) is still
        accepted when another entry matches it on columns 0 and 3. As with
        :meth:`cmp_heirs_by_values`, ``reverse`` makes the relation symmetric.
        """
        for key in heirsa:
            if Util._WILLEXECUTOR_MARKER in key:
                continue
            same_key_match = key in heirsb and cmp_function(
                heirsa[key], heirsb[key]
            )
            if same_key_match:
                continue
            # Fall back to a value-based search (the heir may be renamed).
            if not Util.search_heir_by_values(heirsb, heirsa[key], [0, 3]):
                return False
        if reverse:
            return Util.cmp_heirs(heirsb, heirsa, cmp_function, False)
        return True

    # ------------------------------------------------------------------ #
    # Transaction input/output comparison helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def cmp_inputs(inputsa, inputsb):
        """True when both input lists have the same length and every input of
        ``inputsa`` is present in ``inputsb``."""
        if len(inputsa) != len(inputsb):
            return False
        return all(Util.in_utxo(inputa, inputsb) for inputa in inputsa)

    @staticmethod
    def cmp_outputs(outputsa, outputsb, willexecutor_output=None):
        """True when both output lists have the same length and every output
        of ``outputsa`` is present (same address and value) in ``outputsb``.

        The optional ``willexecutor_output`` is treated as a wildcard match so
        that the will-executor's fee output does not break the comparison.
        """
        if len(outputsa) != len(outputsb):
            return False
        for outputa in outputsa:
            # Guard against the default None: cmp_output dereferences
            # ``.address`` on both arguments.
            if willexecutor_output is not None and Util.cmp_output(
                outputa, willexecutor_output
            ):
                continue
            if not Util.in_output(outputa, outputsb):
                return False
        return True

    @staticmethod
    def cmp_txs(txa, txb):
        """Two transactions are equivalent when their inputs and outputs
        match."""
        return Util.cmp_inputs(txa.inputs(), txb.inputs()) and Util.cmp_outputs(
            txa.outputs(), txb.outputs()
        )

    @staticmethod
    def get_value_amount(txa, txb):
        """Sum of the values of outputs that appear (same addr+value) in both
        transactions.

        Returns ``False`` as soon as an output of ``txa`` has no output of
        equal value in ``txb`` (see :meth:`din_output`).
        """
        outputsb = txb.outputs()
        value_amount = 0
        for outa in txa.outputs():
            same_amount, same_address = Util.din_output(outa, outputsb)
            if not (same_amount or same_address):
                return False
            if same_amount and same_address:
                value_amount += outa.value
        return value_amount

    # ------------------------------------------------------------------ #
    # Locktime arithmetic
    # ------------------------------------------------------------------ #
    @staticmethod
    def chk_locktime(timestamp_to_check, locktime):
        """Return True if ``locktime`` is later than ``timestamp_to_check``."""
        return int(locktime) > timestamp_to_check

    @staticmethod
    def anticipate_locktime(locktime, hours=0, days=0):
        """Move a locktime earlier by the given amount (only timestamp
        locktimes).

        Always returns an ``int``; never returns a value below 1.
        """
        locktime = int(locktime)
        try:
            dt = datetime.fromtimestamp(locktime)
        except (OverflowError, OSError, ValueError):
            # Out-of-range timestamp: clamp to the 32-bit signed maximum.
            dt = datetime.fromtimestamp(min(locktime, 2 ** 31 - 1))
        dt -= timedelta(seconds=hours * 3600 + days * 86400)
        return max(1, int(dt.timestamp()))

    @staticmethod
    def cmp_locktime(locktimea, locktimeb):
        """Three-way style comparison of two locktime specs.

        * equal values -> ``0``
        * two relative strings with the same unit -> difference of their
          counts (in that unit)
        * two relative strings with different units -> difference in days
          (one year counted as 365 days)
        * two absolute values -> difference of the integers
        * one relative and one absolute value -> ``None`` (not comparable)
        """
        if locktimea == locktimeb:
            return 0
        stra, strb = str(locktimea), str(locktimeb)
        unita, unitb = stra[-1:], strb[-1:]
        relative_a = unita in ("y", "d")
        relative_b = unitb in ("y", "d")
        if relative_a and relative_b:
            # Strip the unit suffix; the old code converted the suffix itself.
            counta, countb = int(stra[:-1]), int(strb[:-1])
            if unita == unitb:
                return counta - countb
            days_per_unit = {"y": 365, "d": 1}
            return counta * days_per_unit[unita] - countb * days_per_unit[unitb]
        if relative_a or relative_b:
            return None
        return int(locktimea) - int(locktimeb)

    @staticmethod
    def is_locktime_increased(old, new):
        """True when *new* locktime spec is longer/greater than *old*.

        Relative strings are converted to days (``"1y"`` == 365); anything
        else is compared as ``int``.
        """

        def _to_days(value):
            if isinstance(value, str) and value[-1:] in ("d", "y"):
                count = int(value[:-1])
                return count * 365 if value[-1] == "y" else count
            return int(value)

        return _to_days(new) > _to_days(old)

    @staticmethod
    def get_locktimes(will):
        """Return the distinct locktimes used by the transactions in ``will``
        (a dict keys view, in first-seen order)."""
        return dict.fromkeys(
            willitem["tx"].locktime for willitem in will.values()
        ).keys()

    @staticmethod
    def get_lowest_locktimes(locktimes):
        """Return the locktimes resolved to timestamps, sorted ascending.

        Values that :meth:`parse_locktime_string` cannot parse resolve to 0
        and are kept in the result.
        """
        resolved = []
        for locktime in locktimes:
            timestamp = Util.parse_locktime_string(locktime)
            if timestamp is not None:
                resolved.append(timestamp)
        return sorted(resolved)

    @staticmethod
    def search_willtx_per_io(will, tx):
        """Find a will entry whose tx has the same inputs/outputs as ``tx``.

        Returns ``(wid, entry)`` or ``(None, None)`` when nothing matches.
        """
        for wid, entry in will.items():
            if Util.cmp_txs(entry["tx"], tx["tx"]):
                return wid, entry
        return None, None

    @staticmethod
    def invalidate_will(will):
        """Placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError("not implemented")

    @staticmethod
    def get_will_spent_utxos(will):
        """Collect every input spent by any transaction in ``will``."""
        utxos = []
        for willitem in will.values():
            utxos.extend(willitem["tx"].inputs())
        return utxos

    # ------------------------------------------------------------------ #
    # UTXO helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def utxo_to_str(utxo):
        """Best-effort conversion of a UTXO / input object to a string.

        Tries ``utxo.to_str()``, then ``utxo.prevout.to_str()``, and finally
        falls back to ``str(utxo)``.
        """
        # Deliberately broad: the objects come from outside this module and
        # this helper must never raise.
        try:
            return utxo.to_str()
        except Exception:
            pass
        try:
            return utxo.prevout.to_str()
        except Exception:
            pass
        return str(utxo)

    @staticmethod
    def cmp_utxo(utxoa, utxob):
        """True when two UTXOs have the same string form (see
        :meth:`utxo_to_str`)."""
        return Util.utxo_to_str(utxoa) == Util.utxo_to_str(utxob)

    @staticmethod
    def in_utxo(utxo, utxos):
        """Membership test for a UTXO inside an iterable of UTXOs."""
        return any(Util.cmp_utxo(candidate, utxo) for candidate in utxos)

    @staticmethod
    def txid_in_utxo(txid, utxos):
        """True if any UTXO in ``utxos`` has ``prevout.txid == txid``."""
        return any(candidate.prevout.txid == txid for candidate in utxos)

    @staticmethod
    def cmp_output(outputa, outputb):
        """Two outputs are equal when both address and value match."""
        return bool(
            outputa.address == outputb.address and outputa.value == outputb.value
        )

    @staticmethod
    def in_output(output, outputs):
        """Membership test for an output inside an iterable of outputs."""
        return any(Util.cmp_output(candidate, output) for candidate in outputs)

    # Check all outputs with the same amount; if none of them has the same
    # address, the output may be a change output.
    @staticmethod
    def din_output(out, outputs):
        """Detailed output lookup used to tell a change output apart.

        Returns a ``(same_amount, same_address)`` tuple:

        * ``(True, True)`` -> an output with same amount *and* address
        * ``(True, False)`` -> same amount but different address (maybe change)
        * ``(False, False)`` -> no output with this amount
        """
        amount_seen = False
        for candidate in outputs:
            if int(out.value) == int(candidate.value):
                if out.address == candidate.address:
                    return True, True
                amount_seen = True
        return amount_seen, False

    @staticmethod
    def get_change_output(wallet, in_amount, out_amount, fee):
        """Build a change ``PartialTxOutput`` if the leftover exceeds dust.

        Returns ``None`` when the leftover does not exceed
        ``wallet.dust_threshold()``.
        """
        change_amount = int(in_amount - out_amount - fee)
        if change_amount <= wallet.dust_threshold():
            return None
        change_addresses = wallet.get_change_addresses_for_new_transaction()
        # NOTE(review): ``PartialTxOutput`` is not imported in the visible
        # part of this module; presumably it is Electrum's transaction class.
        # Confirm and add the import, otherwise this raises NameError.
        out = PartialTxOutput.from_address_and_value(
            change_addresses[0], change_amount
        )
        out.is_change = True
        return out

    @staticmethod
    def get_current_height(network):
        """Return the current UNIX timestamp for locktime purposes.

        The ``network`` argument is not used; the value comes from
        ``datetime.now().timestamp()`` truncated to an ``int``.
        """
        return int(datetime.now().timestamp())

    # ------------------------------------------------------------------ #
    # Misc helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def copy(dicto, dictfrom):
        """Shallow copy of ``dictfrom`` entries into ``dicto`` (in place)."""
        for key, value in dictfrom.items():
            dicto[key] = value

    @staticmethod
    def fix_will_settings_tx_fees(will_settings):
        """Migrate the legacy ``tx_fees`` key to ``baltx_fees`` in settings.

        A missing or falsy ``tx_fees`` is left untouched.
        Returns True when a migration was performed (caller should persist).
        """
        tx_fees = will_settings.get("tx_fees", False)
        if not tx_fees:
            return False
        will_settings["baltx_fees"] = tx_fees
        del will_settings["tx_fees"]
        return True

    @staticmethod
    def fix_will_tx_fees(will):
        """Same legacy migration as :meth:`fix_will_settings_tx_fees` but
        applied to every will entry. Returns True if any entry changed."""
        have_to_update = False
        for willitem in will.values():
            tx_fees = willitem.get("tx_fees", False)
            if tx_fees:
                willitem["baltx_fees"] = tx_fees
                del willitem["tx_fees"]
                have_to_update = True
        return have_to_update

    @staticmethod
    def text_to_hex(text: str) -> str:
        """Convert text to a hexadecimal string (UTF-8 encoded)."""
        return text.encode('utf-8').hex()

    @staticmethod
    def hex_to_text(hex_string: str) -> str:
        """Convert a hexadecimal string back to text (for verification).

        Returns the literal ``"Error: Invalid hex string"`` when the input is
        not valid hex or not valid UTF-8.
        """
        try:
            return bytes.fromhex(hex_string).decode('utf-8')
        except (TypeError, ValueError):
            return "Error: Invalid hex string"