forked from bitcoinafterlife/bal-electrum-plugin
remove bal with pycache
This commit is contained in:
551
bal/core/util.py
551
bal/core/util.py
@@ -1,551 +0,0 @@
|
||||
"""
|
||||
bal.core.util
|
||||
=============
|
||||
|
||||
Small, stateless helper functions shared across the whole plugin.
|
||||
|
||||
This module is intentionally GUI-free: it only deals with locktimes, amount
|
||||
encoding/decoding, and comparing transactions / inputs / outputs / heirs.
|
||||
|
||||
Only UNIX timestamps are used for locktimes; block-height locktimes have been
|
||||
removed. A locktime is always an absolute UNIX timestamp (seconds since epoch)
|
||||
or a relative string like "30d" (30 days) or "1y" (1 year).
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
class Util:
|
||||
"""Namespace of static helpers (kept as a class to preserve the original
|
||||
``Util.method(...)`` call sites used throughout the plugin)."""
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Locktime helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def locktime_to_str(locktime):
|
||||
"""Render a locktime for display as an ISO date string."""
|
||||
try:
|
||||
locktime = int(locktime)
|
||||
dt = datetime.fromtimestamp(locktime).isoformat()
|
||||
return dt
|
||||
except Exception:
|
||||
pass
|
||||
return str(locktime)
|
||||
|
||||
@staticmethod
|
||||
def str_to_locktime(locktime):
|
||||
"""Parse a user-entered locktime string into its stored form.
|
||||
|
||||
Relative values keep their suffix (``"30d"``, ``"1y"``);
|
||||
absolute ISO dates are converted to an integer UNIX timestamp.
|
||||
"""
|
||||
try:
|
||||
if locktime[-1] in ("y", "d"):
|
||||
return locktime
|
||||
else:
|
||||
return int(locktime)
|
||||
except Exception:
|
||||
pass
|
||||
dt_object = datetime.fromisoformat(locktime)
|
||||
timestamp = dt_object.timestamp()
|
||||
return int(timestamp)
|
||||
|
||||
@staticmethod
|
||||
def parse_locktime_string(locktime, now=None):
|
||||
"""Resolve a (possibly relative) locktime string into a concrete int.
|
||||
|
||||
Supported forms:
|
||||
* plain int / timestamp -> returned unchanged
|
||||
* ``"<n>y"`` -> n years from now (as a timestamp)
|
||||
* ``"<n>d"`` -> n days from now (as a timestamp)
|
||||
|
||||
When *now* is provided (a ``datetime``), relative strings are resolved
|
||||
relative to that instant instead of ``datetime.now()``. This is used
|
||||
when checking a signed will so that ``"30d"`` always resolves to the
|
||||
*original* creation-time + 30 days, preventing spurious postpone
|
||||
detections on every check.
|
||||
"""
|
||||
try:
|
||||
return int(locktime)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if now is None:
|
||||
now = datetime.now()
|
||||
if locktime[-1] == "y":
|
||||
locktime = str(int(locktime[:-1]) * 365) + "d"
|
||||
if locktime[-1] == "d":
|
||||
return int(
|
||||
(now + timedelta(days=int(locktime[:-1])))
|
||||
.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
.timestamp()
|
||||
)
|
||||
return int(locktime)
|
||||
except Exception:
|
||||
pass
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
def int_locktime(seconds=0, minutes=0, hours=0, days=0):
|
||||
"""Convert a human duration into seconds."""
|
||||
return int(
|
||||
seconds
|
||||
+ minutes * 60
|
||||
+ hours * 60 * 60
|
||||
+ days * 60 * 60 * 24
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Amount helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def encode_amount(amount, decimal_point):
|
||||
"""Convert a displayed BTC amount into integer satoshis.
|
||||
|
||||
Percentage amounts (e.g. ``"50%"``) are passed through unchanged, since
|
||||
they are resolved later against the wallet balance.
|
||||
"""
|
||||
if Util.is_perc(amount):
|
||||
return amount
|
||||
else:
|
||||
try:
|
||||
return int(float(amount) * pow(10, decimal_point))
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
def decode_amount(amount, decimal_point):
|
||||
"""Inverse of :meth:`encode_amount`: satoshis -> displayed string."""
|
||||
if Util.is_perc(amount):
|
||||
return amount
|
||||
else:
|
||||
basestr = "{{:0.{}f}}".format(decimal_point)
|
||||
try:
|
||||
return basestr.format(float(amount) / pow(10, decimal_point))
|
||||
except Exception:
|
||||
return str(amount)
|
||||
|
||||
@staticmethod
|
||||
def is_perc(value):
|
||||
"""True if ``value`` is a percentage string such as ``"25%"``."""
|
||||
try:
|
||||
return value[-1] == "%"
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Heir / will-executor comparison helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def cmp_array(heira, heirb):
|
||||
"""Element-wise equality of two sequences (length-safe)."""
|
||||
try:
|
||||
if len(heira) != len(heirb):
|
||||
return False
|
||||
for h in range(0, len(heira)):
|
||||
if heira[h] != heirb[h]:
|
||||
return False
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def cmp_heir(heira, heirb):
|
||||
"""Two heirs are "the same" when address (0) and amount (1) match."""
|
||||
if heira[0] == heirb[0] and heira[1] == heirb[1]:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def cmp_willexecutor(willexecutora, willexecutorb):
|
||||
"""Compare two will-executor dicts by url / address / base_fee."""
|
||||
if willexecutora == willexecutorb:
|
||||
return True
|
||||
try:
|
||||
if (
|
||||
willexecutora["url"] == willexecutorb["url"]
|
||||
and willexecutora["address"] == willexecutorb["address"]
|
||||
and willexecutora["base_fee"] == willexecutorb["base_fee"]
|
||||
):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def search_heir_by_values(heirs, heir, values):
|
||||
"""Return the key of the first heir in ``heirs`` matching ``heir`` on
|
||||
every column listed in ``values`` (or ``False`` if none)."""
|
||||
for h, v in heirs.items():
|
||||
found = False
|
||||
for val in values:
|
||||
if val in v and v[val] != heir[val]:
|
||||
found = True
|
||||
|
||||
if not found:
|
||||
return h
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def cmp_heir_by_values(heira, heirb, values):
|
||||
"""True when two heirs agree on every column index in ``values``."""
|
||||
for v in values:
|
||||
if heira[v] != heirb[v]:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def cmp_heirs_by_values(
|
||||
heirsa, heirsb, values, exclude_willexecutors=False, reverse=True
|
||||
):
|
||||
"""Set-equality of two heir collections, comparing only ``values``.
|
||||
|
||||
When ``exclude_willexecutors`` is set, synthetic will-executor heirs
|
||||
(those whose key contains the ``w!ll3x3c"`` marker) are skipped. The
|
||||
``reverse`` flag makes the comparison symmetric by running it both ways.
|
||||
"""
|
||||
for heira in heirsa:
|
||||
if (
|
||||
exclude_willexecutors and 'w!ll3x3c"' not in heira
|
||||
) or not exclude_willexecutors:
|
||||
found = False
|
||||
for heirb in heirsb:
|
||||
if Util.cmp_heir_by_values(heirsa[heira], heirsb[heirb], values):
|
||||
found = True
|
||||
if not found:
|
||||
return False
|
||||
if reverse:
|
||||
return Util.cmp_heirs_by_values(
|
||||
heirsb,
|
||||
heirsa,
|
||||
values,
|
||||
exclude_willexecutors=exclude_willexecutors,
|
||||
reverse=False,
|
||||
)
|
||||
else:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def cmp_heirs(
|
||||
heirsa,
|
||||
heirsb,
|
||||
cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3],
|
||||
reverse=True,
|
||||
):
|
||||
"""Compare two heir collections using a custom ``cmp_function``.
|
||||
|
||||
Will-executor entries are ignored. As with
|
||||
:meth:`cmp_heirs_by_values`, ``reverse`` makes the relation symmetric.
|
||||
"""
|
||||
try:
|
||||
for heir in heirsa:
|
||||
if 'w!ll3x3c"' not in heir:
|
||||
if heir not in heirsb or not cmp_function(
|
||||
heirsa[heir], heirsb[heir]
|
||||
):
|
||||
if not Util.search_heir_by_values(heirsb, heirsa[heir], [0, 3]):
|
||||
return False
|
||||
if reverse:
|
||||
return Util.cmp_heirs(heirsb, heirsa, cmp_function, False)
|
||||
else:
|
||||
return True
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Transaction input/output comparison helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def cmp_inputs(inputsa, inputsb):
|
||||
"""True when both input lists reference the same set of UTXOs."""
|
||||
if len(inputsa) != len(inputsb):
|
||||
return False
|
||||
for inputa in inputsa:
|
||||
if not Util.in_utxo(inputa, inputsb):
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def cmp_outputs(outputsa, outputsb, willexecutor_output=None):
|
||||
"""True when both output lists contain the same (address, value) pairs.
|
||||
|
||||
The optional ``willexecutor_output`` is treated as a wildcard match so
|
||||
that the will-executor's fee output does not break the comparison.
|
||||
"""
|
||||
if len(outputsa) != len(outputsb):
|
||||
return False
|
||||
for outputa in outputsa:
|
||||
if not Util.cmp_output(outputa, willexecutor_output):
|
||||
if not Util.in_output(outputa, outputsb):
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def cmp_txs(txa, txb):
|
||||
"""Two transactions are equivalent when their inputs and outputs match."""
|
||||
if not Util.cmp_inputs(txa.inputs(), txb.inputs()):
|
||||
return False
|
||||
if not Util.cmp_outputs(txa.outputs(), txb.outputs()):
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def get_value_amount(txa, txb):
|
||||
"""Sum of the values of outputs that appear (same addr+value) in both
|
||||
transactions. Returns ``False`` as soon as an output of ``txa`` shares
|
||||
neither amount nor address with any output of ``txb``."""
|
||||
outputsa = txa.outputs()
|
||||
value_amount = 0
|
||||
|
||||
for outa in outputsa:
|
||||
same_amount, same_address = Util.din_output(outa, txb.outputs())
|
||||
if not (same_amount or same_address):
|
||||
return False
|
||||
if same_amount and same_address:
|
||||
value_amount += outa.value
|
||||
if same_amount:
|
||||
pass
|
||||
if same_address:
|
||||
pass
|
||||
|
||||
return value_amount
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Locktime arithmetic
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def chk_locktime(timestamp_to_check, locktime):
|
||||
"""Return True if ``locktime`` is still in the future."""
|
||||
locktime = int(locktime)
|
||||
return locktime > timestamp_to_check
|
||||
|
||||
@staticmethod
|
||||
def anticipate_locktime(locktime, hours=0, days=0):
|
||||
"""Move a locktime earlier by the given amount (only timestamp locktimes).
|
||||
|
||||
Never returns a value below 1.
|
||||
"""
|
||||
locktime = int(locktime)
|
||||
try:
|
||||
dt = datetime.fromtimestamp(locktime)
|
||||
except (OverflowError, OSError, ValueError):
|
||||
dt = datetime.fromtimestamp(min(locktime, 2 ** 31 - 1))
|
||||
dt -= timedelta(seconds=hours * 3600 + days * 86400)
|
||||
out = dt.timestamp()
|
||||
if out < 1:
|
||||
out = 1
|
||||
return out
|
||||
|
||||
@staticmethod
|
||||
def cmp_locktime(locktimea, locktimeb):
|
||||
"""Compare two relative locktime strings sharing the same unit."""
|
||||
if locktimea == locktimeb:
|
||||
return 0
|
||||
strlocktimea = str(locktimea)
|
||||
strlocktimeb = str(locktimeb)
|
||||
if locktimea[-1] in "yd":
|
||||
if locktimeb[-1] == locktimea[-1]:
|
||||
return int(strlocktimea[-1]) - int(strlocktimeb[-1])
|
||||
else:
|
||||
return int(locktimea) - (locktimeb)
|
||||
|
||||
@staticmethod
|
||||
def is_locktime_increased(old, new):
|
||||
"""True when *new* locktime spec is longer/greater than *old*."""
|
||||
def _to_days(v):
|
||||
if isinstance(v, str) and v[-1:] in ("d", "y"):
|
||||
n = int(v[:-1])
|
||||
return n * 365 if v[-1] == "y" else n
|
||||
return int(v)
|
||||
return _to_days(new) > _to_days(old)
|
||||
|
||||
@staticmethod
|
||||
def get_locktimes(will):
|
||||
"""Return the distinct locktimes used by the transactions in ``will``."""
|
||||
locktimes = {}
|
||||
for txid, willitem in will.items():
|
||||
locktimes[willitem["tx"].locktime] = True
|
||||
return locktimes.keys()
|
||||
|
||||
@staticmethod
|
||||
def get_lowest_locktimes(locktimes):
|
||||
"""Return sorted list of timestamp locktimes."""
|
||||
sorted_timestamps = []
|
||||
for locktime in locktimes:
|
||||
locktime = Util.parse_locktime_string(locktime)
|
||||
if locktime is not None:
|
||||
sorted_timestamps.append(locktime)
|
||||
return sorted(sorted_timestamps)
|
||||
|
||||
@staticmethod
|
||||
def search_willtx_per_io(will, tx):
|
||||
"""Find a will entry whose tx has the same inputs/outputs as ``tx``."""
|
||||
for wid, w in will.items():
|
||||
if Util.cmp_txs(w["tx"], tx["tx"]):
|
||||
return wid, w
|
||||
return None, None
|
||||
|
||||
@staticmethod
|
||||
def invalidate_will(will):
|
||||
raise Exception("not implemented")
|
||||
|
||||
@staticmethod
|
||||
def get_will_spent_utxos(will):
|
||||
"""Collect every input spent by any transaction in ``will``."""
|
||||
utxos = []
|
||||
for txid, willitem in will.items():
|
||||
utxos += willitem["tx"].inputs()
|
||||
|
||||
return utxos
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# UTXO helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def utxo_to_str(utxo):
|
||||
"""Best-effort conversion of a UTXO / input object to its ``txid:n`` str."""
|
||||
try:
|
||||
return utxo.to_str()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
return utxo.prevout.to_str()
|
||||
except Exception:
|
||||
pass
|
||||
return str(utxo)
|
||||
|
||||
@staticmethod
|
||||
def cmp_utxo(utxoa, utxob):
|
||||
"""True when two UTXOs refer to the same outpoint."""
|
||||
utxoa = Util.utxo_to_str(utxoa)
|
||||
utxob = Util.utxo_to_str(utxob)
|
||||
if utxoa == utxob:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def in_utxo(utxo, utxos):
|
||||
"""Membership test for a UTXO inside an iterable of UTXOs."""
|
||||
for s_u in utxos:
|
||||
if Util.cmp_utxo(s_u, utxo):
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def txid_in_utxo(txid, utxos):
|
||||
"""True if any UTXO in ``utxos`` is spent from transaction ``txid``."""
|
||||
for s_u in utxos:
|
||||
if s_u.prevout.txid == txid:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def cmp_output(outputa, outputb):
|
||||
"""Two outputs are equal when both address and value match."""
|
||||
return outputa.address == outputb.address and outputa.value == outputb.value
|
||||
|
||||
@staticmethod
|
||||
def in_output(output, outputs):
|
||||
"""Membership test for an output inside an iterable of outputs."""
|
||||
for s_o in outputs:
|
||||
if Util.cmp_output(s_o, output):
|
||||
return True
|
||||
return False
|
||||
|
||||
# check all output with the same amount if none have the same address it can be a change
|
||||
# return true true same address same amount
|
||||
# return true false same amount different address
|
||||
# return false false different amount, different address not found
|
||||
@staticmethod
|
||||
def din_output(out, outputs):
|
||||
"""Detailed output lookup used to tell a change output apart.
|
||||
|
||||
Returns a ``(same_amount, same_address)`` tuple:
|
||||
* ``(True, True)`` -> an output with same amount *and* address
|
||||
* ``(True, False)`` -> same amount but different address (maybe change)
|
||||
* ``(False, False)``-> no output with this amount
|
||||
"""
|
||||
same_amount = []
|
||||
for s_o in outputs:
|
||||
if int(out.value) == int(s_o.value):
|
||||
same_amount.append(s_o)
|
||||
if out.address == s_o.address:
|
||||
return True, True
|
||||
else:
|
||||
pass
|
||||
|
||||
if len(same_amount) > 0:
|
||||
return True, False
|
||||
else:
|
||||
return False, False
|
||||
|
||||
@staticmethod
|
||||
def get_change_output(wallet, in_amount, out_amount, fee):
|
||||
"""Build a change ``PartialTxOutput`` if the leftover exceeds dust."""
|
||||
change_amount = int(in_amount - out_amount - fee)
|
||||
if change_amount > wallet.dust_threshold():
|
||||
change_addresses = wallet.get_change_addresses_for_new_transaction()
|
||||
out = PartialTxOutput.from_address_and_value(
|
||||
change_addresses[0], change_amount
|
||||
)
|
||||
out.is_change = True
|
||||
return out
|
||||
|
||||
@staticmethod
|
||||
def get_current_height(network):
|
||||
"""Return the current UNIX timestamp for locktime purposes.
|
||||
|
||||
Returns time.time() as the reference timestamp.
|
||||
"""
|
||||
return int(datetime.now().timestamp())
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Misc helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
@staticmethod
|
||||
def copy(dicto, dictfrom):
|
||||
"""Shallow copy of ``dictfrom`` entries into ``dicto`` (in place)."""
|
||||
for k, v in dictfrom.items():
|
||||
dicto[k] = v
|
||||
|
||||
@staticmethod
|
||||
def fix_will_settings_tx_fees(will_settings):
|
||||
"""Migrate the legacy ``tx_fees`` key to ``baltx_fees`` in settings.
|
||||
|
||||
Returns True when a migration was performed (caller should persist).
|
||||
"""
|
||||
tx_fees = will_settings.get("tx_fees", False)
|
||||
have_to_update = False
|
||||
if tx_fees:
|
||||
will_settings["baltx_fees"] = tx_fees
|
||||
del will_settings["tx_fees"]
|
||||
have_to_update = True
|
||||
return have_to_update
|
||||
|
||||
@staticmethod
|
||||
def fix_will_tx_fees(will):
|
||||
"""Same legacy migration as above but applied to every will entry."""
|
||||
have_to_update = False
|
||||
for txid, willitem in will.items():
|
||||
tx_fees = willitem.get("tx_fees", False)
|
||||
if tx_fees:
|
||||
will[txid]["baltx_fees"] = tx_fees
|
||||
del will[txid]["tx_fees"]
|
||||
have_to_update = True
|
||||
return have_to_update
|
||||
|
||||
@staticmethod
|
||||
def text_to_hex(text: str) -> str:
|
||||
"""Convert text to a hexadecimal string (used for OP_RETURN payloads)."""
|
||||
hex_string = text.encode('utf-8').hex()
|
||||
return hex_string
|
||||
|
||||
@staticmethod
|
||||
def hex_to_text(hex_string: str) -> str:
|
||||
"""Convert a hexadecimal string back to text (for verification)."""
|
||||
try:
|
||||
return bytes.fromhex(hex_string).decode('utf-8')
|
||||
except Exception:
|
||||
return "Error: Invalid hex string"
|
||||
Reference in New Issue
Block a user