forked from bitcoinafterlife/bal-electrum-plugin
552 lines
19 KiB
Python
552 lines
19 KiB
Python
"""
|
|
bal.core.util
|
|
=============
|
|
|
|
Small, stateless helper functions shared across the whole plugin.
|
|
|
|
This module is intentionally GUI-free: it only deals with locktimes, amount
|
|
encoding/decoding, and comparing transactions / inputs / outputs / heirs.
|
|
|
|
Only UNIX timestamps are used for locktimes; block-height locktimes have been
|
|
removed. A locktime is always an absolute UNIX timestamp (seconds since epoch)
|
|
or a relative string like "30d" (30 days) or "1y" (1 year).
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
class Util:
|
|
"""Namespace of static helpers (kept as a class to preserve the original
|
|
``Util.method(...)`` call sites used throughout the plugin)."""
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Locktime helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def locktime_to_str(locktime):
|
|
"""Render a locktime for display as an ISO date string."""
|
|
try:
|
|
locktime = int(locktime)
|
|
dt = datetime.fromtimestamp(locktime).isoformat()
|
|
return dt
|
|
except Exception:
|
|
pass
|
|
return str(locktime)
|
|
|
|
@staticmethod
|
|
def str_to_locktime(locktime):
|
|
"""Parse a user-entered locktime string into its stored form.
|
|
|
|
Relative values keep their suffix (``"30d"``, ``"1y"``);
|
|
absolute ISO dates are converted to an integer UNIX timestamp.
|
|
"""
|
|
try:
|
|
if locktime[-1] in ("y", "d"):
|
|
return locktime
|
|
else:
|
|
return int(locktime)
|
|
except Exception:
|
|
pass
|
|
dt_object = datetime.fromisoformat(locktime)
|
|
timestamp = dt_object.timestamp()
|
|
return int(timestamp)
|
|
|
|
@staticmethod
|
|
def parse_locktime_string(locktime, now=None):
|
|
"""Resolve a (possibly relative) locktime string into a concrete int.
|
|
|
|
Supported forms:
|
|
* plain int / timestamp -> returned unchanged
|
|
* ``"<n>y"`` -> n years from now (as a timestamp)
|
|
* ``"<n>d"`` -> n days from now (as a timestamp)
|
|
|
|
When *now* is provided (a ``datetime``), relative strings are resolved
|
|
relative to that instant instead of ``datetime.now()``. This is used
|
|
when checking a signed will so that ``"30d"`` always resolves to the
|
|
*original* creation-time + 30 days, preventing spurious postpone
|
|
detections on every check.
|
|
"""
|
|
try:
|
|
return int(locktime)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
if now is None:
|
|
now = datetime.now()
|
|
if locktime[-1] == "y":
|
|
locktime = str(int(locktime[:-1]) * 365) + "d"
|
|
if locktime[-1] == "d":
|
|
return int(
|
|
(now + timedelta(days=int(locktime[:-1])))
|
|
.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
.timestamp()
|
|
)
|
|
return int(locktime)
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
@staticmethod
|
|
def int_locktime(seconds=0, minutes=0, hours=0, days=0):
|
|
"""Convert a human duration into seconds."""
|
|
return int(
|
|
seconds
|
|
+ minutes * 60
|
|
+ hours * 60 * 60
|
|
+ days * 60 * 60 * 24
|
|
)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Amount helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def encode_amount(amount, decimal_point):
|
|
"""Convert a displayed BTC amount into integer satoshis.
|
|
|
|
Percentage amounts (e.g. ``"50%"``) are passed through unchanged, since
|
|
they are resolved later against the wallet balance.
|
|
"""
|
|
if Util.is_perc(amount):
|
|
return amount
|
|
else:
|
|
try:
|
|
return int(float(amount) * pow(10, decimal_point))
|
|
except Exception:
|
|
return 0
|
|
|
|
@staticmethod
|
|
def decode_amount(amount, decimal_point):
|
|
"""Inverse of :meth:`encode_amount`: satoshis -> displayed string."""
|
|
if Util.is_perc(amount):
|
|
return amount
|
|
else:
|
|
basestr = "{{:0.{}f}}".format(decimal_point)
|
|
try:
|
|
return basestr.format(float(amount) / pow(10, decimal_point))
|
|
except Exception:
|
|
return str(amount)
|
|
|
|
@staticmethod
|
|
def is_perc(value):
|
|
"""True if ``value`` is a percentage string such as ``"25%"``."""
|
|
try:
|
|
return value[-1] == "%"
|
|
except Exception:
|
|
return False
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Heir / will-executor comparison helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def cmp_array(heira, heirb):
|
|
"""Element-wise equality of two sequences (length-safe)."""
|
|
try:
|
|
if len(heira) != len(heirb):
|
|
return False
|
|
for h in range(0, len(heira)):
|
|
if heira[h] != heirb[h]:
|
|
return False
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def cmp_heir(heira, heirb):
|
|
"""Two heirs are "the same" when address (0) and amount (1) match."""
|
|
if heira[0] == heirb[0] and heira[1] == heirb[1]:
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def cmp_willexecutor(willexecutora, willexecutorb):
|
|
"""Compare two will-executor dicts by url / address / base_fee."""
|
|
if willexecutora == willexecutorb:
|
|
return True
|
|
try:
|
|
if (
|
|
willexecutora["url"] == willexecutorb["url"]
|
|
and willexecutora["address"] == willexecutorb["address"]
|
|
and willexecutora["base_fee"] == willexecutorb["base_fee"]
|
|
):
|
|
return True
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
@staticmethod
|
|
def search_heir_by_values(heirs, heir, values):
|
|
"""Return the key of the first heir in ``heirs`` matching ``heir`` on
|
|
every column listed in ``values`` (or ``False`` if none)."""
|
|
for h, v in heirs.items():
|
|
found = False
|
|
for val in values:
|
|
if val in v and v[val] != heir[val]:
|
|
found = True
|
|
|
|
if not found:
|
|
return h
|
|
return False
|
|
|
|
@staticmethod
|
|
def cmp_heir_by_values(heira, heirb, values):
|
|
"""True when two heirs agree on every column index in ``values``."""
|
|
for v in values:
|
|
if heira[v] != heirb[v]:
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def cmp_heirs_by_values(
|
|
heirsa, heirsb, values, exclude_willexecutors=False, reverse=True
|
|
):
|
|
"""Set-equality of two heir collections, comparing only ``values``.
|
|
|
|
When ``exclude_willexecutors`` is set, synthetic will-executor heirs
|
|
(those whose key contains the ``w!ll3x3c"`` marker) are skipped. The
|
|
``reverse`` flag makes the comparison symmetric by running it both ways.
|
|
"""
|
|
for heira in heirsa:
|
|
if (
|
|
exclude_willexecutors and 'w!ll3x3c"' not in heira
|
|
) or not exclude_willexecutors:
|
|
found = False
|
|
for heirb in heirsb:
|
|
if Util.cmp_heir_by_values(heirsa[heira], heirsb[heirb], values):
|
|
found = True
|
|
if not found:
|
|
return False
|
|
if reverse:
|
|
return Util.cmp_heirs_by_values(
|
|
heirsb,
|
|
heirsa,
|
|
values,
|
|
exclude_willexecutors=exclude_willexecutors,
|
|
reverse=False,
|
|
)
|
|
else:
|
|
return True
|
|
|
|
@staticmethod
|
|
def cmp_heirs(
|
|
heirsa,
|
|
heirsb,
|
|
cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3],
|
|
reverse=True,
|
|
):
|
|
"""Compare two heir collections using a custom ``cmp_function``.
|
|
|
|
Will-executor entries are ignored. As with
|
|
:meth:`cmp_heirs_by_values`, ``reverse`` makes the relation symmetric.
|
|
"""
|
|
try:
|
|
for heir in heirsa:
|
|
if 'w!ll3x3c"' not in heir:
|
|
if heir not in heirsb or not cmp_function(
|
|
heirsa[heir], heirsb[heir]
|
|
):
|
|
if not Util.search_heir_by_values(heirsb, heirsa[heir], [0, 3]):
|
|
return False
|
|
if reverse:
|
|
return Util.cmp_heirs(heirsb, heirsa, cmp_function, False)
|
|
else:
|
|
return True
|
|
except Exception as e:
|
|
raise e
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Transaction input/output comparison helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def cmp_inputs(inputsa, inputsb):
|
|
"""True when both input lists reference the same set of UTXOs."""
|
|
if len(inputsa) != len(inputsb):
|
|
return False
|
|
for inputa in inputsa:
|
|
if not Util.in_utxo(inputa, inputsb):
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def cmp_outputs(outputsa, outputsb, willexecutor_output=None):
|
|
"""True when both output lists contain the same (address, value) pairs.
|
|
|
|
The optional ``willexecutor_output`` is treated as a wildcard match so
|
|
that the will-executor's fee output does not break the comparison.
|
|
"""
|
|
if len(outputsa) != len(outputsb):
|
|
return False
|
|
for outputa in outputsa:
|
|
if not Util.cmp_output(outputa, willexecutor_output):
|
|
if not Util.in_output(outputa, outputsb):
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def cmp_txs(txa, txb):
|
|
"""Two transactions are equivalent when their inputs and outputs match."""
|
|
if not Util.cmp_inputs(txa.inputs(), txb.inputs()):
|
|
return False
|
|
if not Util.cmp_outputs(txa.outputs(), txb.outputs()):
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def get_value_amount(txa, txb):
|
|
"""Sum of the values of outputs that appear (same addr+value) in both
|
|
transactions. Returns ``False`` as soon as an output of ``txa`` shares
|
|
neither amount nor address with any output of ``txb``."""
|
|
outputsa = txa.outputs()
|
|
value_amount = 0
|
|
|
|
for outa in outputsa:
|
|
same_amount, same_address = Util.din_output(outa, txb.outputs())
|
|
if not (same_amount or same_address):
|
|
return False
|
|
if same_amount and same_address:
|
|
value_amount += outa.value
|
|
if same_amount:
|
|
pass
|
|
if same_address:
|
|
pass
|
|
|
|
return value_amount
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Locktime arithmetic
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def chk_locktime(timestamp_to_check, locktime):
|
|
"""Return True if ``locktime`` is still in the future."""
|
|
locktime = int(locktime)
|
|
return locktime > timestamp_to_check
|
|
|
|
@staticmethod
|
|
def anticipate_locktime(locktime, hours=0, days=0):
|
|
"""Move a locktime earlier by the given amount (only timestamp locktimes).
|
|
|
|
Never returns a value below 1.
|
|
"""
|
|
locktime = int(locktime)
|
|
try:
|
|
dt = datetime.fromtimestamp(locktime)
|
|
except (OverflowError, OSError, ValueError):
|
|
dt = datetime.fromtimestamp(min(locktime, 2 ** 31 - 1))
|
|
dt -= timedelta(seconds=hours * 3600 + days * 86400)
|
|
out = dt.timestamp()
|
|
if out < 1:
|
|
out = 1
|
|
return out
|
|
|
|
@staticmethod
|
|
def cmp_locktime(locktimea, locktimeb):
|
|
"""Compare two relative locktime strings sharing the same unit."""
|
|
if locktimea == locktimeb:
|
|
return 0
|
|
strlocktimea = str(locktimea)
|
|
strlocktimeb = str(locktimeb)
|
|
if locktimea[-1] in "yd":
|
|
if locktimeb[-1] == locktimea[-1]:
|
|
return int(strlocktimea[-1]) - int(strlocktimeb[-1])
|
|
else:
|
|
return int(locktimea) - (locktimeb)
|
|
|
|
@staticmethod
|
|
def is_locktime_increased(old, new):
|
|
"""True when *new* locktime spec is longer/greater than *old*."""
|
|
def _to_days(v):
|
|
if isinstance(v, str) and v[-1:] in ("d", "y"):
|
|
n = int(v[:-1])
|
|
return n * 365 if v[-1] == "y" else n
|
|
return int(v)
|
|
return _to_days(new) > _to_days(old)
|
|
|
|
@staticmethod
|
|
def get_locktimes(will):
|
|
"""Return the distinct locktimes used by the transactions in ``will``."""
|
|
locktimes = {}
|
|
for txid, willitem in will.items():
|
|
locktimes[willitem["tx"].locktime] = True
|
|
return locktimes.keys()
|
|
|
|
@staticmethod
|
|
def get_lowest_locktimes(locktimes):
|
|
"""Return sorted list of timestamp locktimes."""
|
|
sorted_timestamps = []
|
|
for locktime in locktimes:
|
|
locktime = Util.parse_locktime_string(locktime)
|
|
if locktime is not None:
|
|
sorted_timestamps.append(locktime)
|
|
return sorted(sorted_timestamps)
|
|
|
|
@staticmethod
|
|
def search_willtx_per_io(will, tx):
|
|
"""Find a will entry whose tx has the same inputs/outputs as ``tx``."""
|
|
for wid, w in will.items():
|
|
if Util.cmp_txs(w["tx"], tx["tx"]):
|
|
return wid, w
|
|
return None, None
|
|
|
|
@staticmethod
|
|
def invalidate_will(will):
|
|
raise Exception("not implemented")
|
|
|
|
@staticmethod
|
|
def get_will_spent_utxos(will):
|
|
"""Collect every input spent by any transaction in ``will``."""
|
|
utxos = []
|
|
for txid, willitem in will.items():
|
|
utxos += willitem["tx"].inputs()
|
|
|
|
return utxos
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# UTXO helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def utxo_to_str(utxo):
|
|
"""Best-effort conversion of a UTXO / input object to its ``txid:n`` str."""
|
|
try:
|
|
return utxo.to_str()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
return utxo.prevout.to_str()
|
|
except Exception:
|
|
pass
|
|
return str(utxo)
|
|
|
|
@staticmethod
|
|
def cmp_utxo(utxoa, utxob):
|
|
"""True when two UTXOs refer to the same outpoint."""
|
|
utxoa = Util.utxo_to_str(utxoa)
|
|
utxob = Util.utxo_to_str(utxob)
|
|
if utxoa == utxob:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def in_utxo(utxo, utxos):
|
|
"""Membership test for a UTXO inside an iterable of UTXOs."""
|
|
for s_u in utxos:
|
|
if Util.cmp_utxo(s_u, utxo):
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def txid_in_utxo(txid, utxos):
|
|
"""True if any UTXO in ``utxos`` is spent from transaction ``txid``."""
|
|
for s_u in utxos:
|
|
if s_u.prevout.txid == txid:
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def cmp_output(outputa, outputb):
|
|
"""Two outputs are equal when both address and value match."""
|
|
return outputa.address == outputb.address and outputa.value == outputb.value
|
|
|
|
@staticmethod
|
|
def in_output(output, outputs):
|
|
"""Membership test for an output inside an iterable of outputs."""
|
|
for s_o in outputs:
|
|
if Util.cmp_output(s_o, output):
|
|
return True
|
|
return False
|
|
|
|
# check all output with the same amount if none have the same address it can be a change
|
|
# return true true same address same amount
|
|
# return true false same amount different address
|
|
# return false false different amount, different address not found
|
|
@staticmethod
|
|
def din_output(out, outputs):
|
|
"""Detailed output lookup used to tell a change output apart.
|
|
|
|
Returns a ``(same_amount, same_address)`` tuple:
|
|
* ``(True, True)`` -> an output with same amount *and* address
|
|
* ``(True, False)`` -> same amount but different address (maybe change)
|
|
* ``(False, False)``-> no output with this amount
|
|
"""
|
|
same_amount = []
|
|
for s_o in outputs:
|
|
if int(out.value) == int(s_o.value):
|
|
same_amount.append(s_o)
|
|
if out.address == s_o.address:
|
|
return True, True
|
|
else:
|
|
pass
|
|
|
|
if len(same_amount) > 0:
|
|
return True, False
|
|
else:
|
|
return False, False
|
|
|
|
@staticmethod
|
|
def get_change_output(wallet, in_amount, out_amount, fee):
|
|
"""Build a change ``PartialTxOutput`` if the leftover exceeds dust."""
|
|
change_amount = int(in_amount - out_amount - fee)
|
|
if change_amount > wallet.dust_threshold():
|
|
change_addresses = wallet.get_change_addresses_for_new_transaction()
|
|
out = PartialTxOutput.from_address_and_value(
|
|
change_addresses[0], change_amount
|
|
)
|
|
out.is_change = True
|
|
return out
|
|
|
|
@staticmethod
|
|
def get_current_height(network):
|
|
"""Return the current UNIX timestamp for locktime purposes.
|
|
|
|
Returns time.time() as the reference timestamp.
|
|
"""
|
|
return int(datetime.now().timestamp())
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Misc helpers
|
|
# ------------------------------------------------------------------ #
|
|
@staticmethod
|
|
def copy(dicto, dictfrom):
|
|
"""Shallow copy of ``dictfrom`` entries into ``dicto`` (in place)."""
|
|
for k, v in dictfrom.items():
|
|
dicto[k] = v
|
|
|
|
@staticmethod
|
|
def fix_will_settings_tx_fees(will_settings):
|
|
"""Migrate the legacy ``tx_fees`` key to ``baltx_fees`` in settings.
|
|
|
|
Returns True when a migration was performed (caller should persist).
|
|
"""
|
|
tx_fees = will_settings.get("tx_fees", False)
|
|
have_to_update = False
|
|
if tx_fees:
|
|
will_settings["baltx_fees"] = tx_fees
|
|
del will_settings["tx_fees"]
|
|
have_to_update = True
|
|
return have_to_update
|
|
|
|
@staticmethod
|
|
def fix_will_tx_fees(will):
|
|
"""Same legacy migration as above but applied to every will entry."""
|
|
have_to_update = False
|
|
for txid, willitem in will.items():
|
|
tx_fees = willitem.get("tx_fees", False)
|
|
if tx_fees:
|
|
will[txid]["baltx_fees"] = tx_fees
|
|
del will[txid]["tx_fees"]
|
|
have_to_update = True
|
|
return have_to_update
|
|
|
|
@staticmethod
|
|
def text_to_hex(text: str) -> str:
|
|
"""Convert text to a hexadecimal string (used for OP_RETURN payloads)."""
|
|
hex_string = text.encode('utf-8').hex()
|
|
return hex_string
|
|
|
|
@staticmethod
|
|
def hex_to_text(hex_string: str) -> str:
|
|
"""Convert a hexadecimal string back to text (for verification)."""
|
|
try:
|
|
return bytes.fromhex(hex_string).decode('utf-8')
|
|
except Exception:
|
|
return "Error: Invalid hex string"
|