""" bal.core.will ============= The "will": the set of time-locked inheritance transactions plus all the logic to keep it coherent over time. Two classes live here: * :class:`Will` - a namespace of static methods operating on a *will* dictionary (mapping ``txid -> WillItem``): building the parent/child tree, anticipating locktimes, detecting replaced/invalidated/confirmed entries, validating that the will still matches the heirs and will-executors, and building an "invalidation" transaction. * :class:`WillItem` - a single will transaction together with its status flags, heirs, will-executor and fee. Transaction states: * ANTICIPATED - Transaction was generated with a locktime 1-day earlier than a pre-existing transaction sharing the same heirs. Remains VALID. * REPLACED - Transaction was replaced because at least one of its inputs is spent by a new transaction with a lower locktime. Loses VALID status. Propagates to children. * INVALIDATED - Transaction can no longer be spent because at least one of its inputs has been spent by a mempool/confirmed transaction and the previous transaction no longer exists in the will. Loses VALID status. * UPDATED - Transaction is spendable and valid, but a new transaction replaces it with the same locktime and heirs. * PENDING - Transaction is pending in the mempool (unconfirmed). * CONFIRMED - Transaction is confirmed in the blockchain. Separation of concerns ----------------------- The original ``WillItem`` carried a ``get_color()`` method returning hard-coded hex colours for the GUI. That was pure presentation living inside the core logic, so it has been **moved** to ``bal.gui.qt.theme.status_color(will_item)``. The status flags themselves (the source of truth) stay here; only the mapping "status -> colour" now lives in the GUI layer. No behaviour changed. """ import copy import time from datetime import datetime from electrum.i18n import _ from electrum.logging import Logger, get_logger from electrum.transaction import ( PartialTransaction, PartialTxInput, PartialTxOutput, Transaction, TxOutpoint, tx_from_any, ) from electrum.util import ( bfh, ) from .util import Util from .willexecutors import Willexecutors MIN_LOCKTIME = 1 _logger = get_logger(__name__) class Will: @staticmethod def get_children(will, willid): out = [] for _id in will: inputs = will[_id].tx.inputs() for idi in range(0, len(inputs)): _input = inputs[idi] if _input.prevout.txid.hex() == willid: out.append([_id, idi, _input.prevout.out_idx]) return out # build a tree with parent transactions @staticmethod def add_willtree(will): for willid in will: will[willid].children = Will.get_children(will, willid) for child in will[willid].children: if not will[child[0]].father: will[child[0]].father = willid # return a list of will sorted by locktime @staticmethod def get_sorted_will(will): return sorted(will.items(), key=lambda x: x[1]["tx"].locktime) @staticmethod def only_valid(will): for k, v in will.items(): if v.get_status("VALID"): yield k @staticmethod def needs_server_check(w): """Return True if ``w`` should be queried on its will-executor server when the user presses Check (or on Electrum close). A will is queried only when it is VALID, has a will-executor assigned, was actually PUSHED (sent), and is not yet CHECKED. The ``PUSHED`` condition is essential: querying the server for a will that was *never* sent would make the server (correctly) answer "I don't have this tx", which ``WillItem.set_check_willexecutor`` then records as CHECK_FAIL. A freshly signed-but-not-sent will would therefore turn red, even though it is merely "signed, not sent" (which must stay blue / #2bc8ed, as in the original BAL behaviour). Restricting the check to PUSHED wills matches the original ``check()`` logic and avoids that false failure. """ return bool( w.get_status("VALID") and w.we and w.get_status("PUSHED") and not w.get_status("CHECKED") ) @staticmethod def search_equal_tx(will, tx, wid): for w in will: if w != wid and not tx.to_json() != will[w]["tx"].to_json(): if will[w]["tx"].txid() != tx.txid(): if Util.cmp_txs(will[w]["tx"], tx): return will[w]["tx"] return False @staticmethod def get_tx_from_any(x): try: a = str(x) return tx_from_any(a) except Exception as e: raise e return x @staticmethod def add_info_from_will(will, wid, wallet): if isinstance(will[wid].tx, str): will[wid].tx = Will.get_tx_from_any(will[wid].tx) if wallet: will[wid].tx.add_info_from_wallet(wallet) for txin in will[wid].tx.inputs(): txid = txin.prevout.txid.hex() if txid in will: change = will[txid].tx.outputs()[txin.prevout.out_idx] txin._trusted_value_sats = change.value try: txin.script_descriptor = change.script_descriptor except Exception: pass txin.is_mine = True txin._TxInput__address = change.address txin._TxInput__scriptpubkey = change.scriptpubkey txin._TxInput__value_sats = change.value txin._trusted_value_sats = change.value @staticmethod def normalize_will(will, wallet=None, others_inputs=None): others_input = others_inputs if others_inputs is not None else {} to_delete = [] to_add = {} # add info from wallet willitems = {} for wid in will: Will.add_info_from_will(will, wid, wallet) willitems[wid] = WillItem(will[wid]) will = willitems errors = {} for wid in will: txid = will[wid].tx.txid() if txid is None: _logger.error("##########") _logger.error(wid) _logger.error(will[wid]) _logger.error(will[wid].tx.to_json()) _logger.error("txid is none") will[wid].set_status("ERROR", True) errors[wid] = will[wid] continue if txid != wid: outputs = will[wid].tx.outputs() ow = will[wid] ow.normalize_locktime(others_input) will[wid] = WillItem(ow.to_dict()) for i in range(0, len(outputs)): Will.change_input( will, wid, i, outputs[i], others_input, to_delete, to_add ) to_delete.append(wid) to_add[ow.tx.txid()] = ow.to_dict() # for eid, err in errors.items(): # new_txid = err.tx.txid() for k, w in to_add.items(): will[k] = w for wid in to_delete: if wid in will: del will[wid] @staticmethod def new_input(txid, idx, change): prevout = TxOutpoint(txid=bfh(txid), out_idx=idx) inp = PartialTxInput(prevout=prevout) inp._trusted_value_sats = change.value inp.is_mine = True inp._TxInput__address = change.address inp._TxInput__scriptpubkey = change.scriptpubkey inp._TxInput__value_sats = change.value return inp # Sentinel returned by ``check_anticipate`` meaning "do not anticipate": # it is larger than any valid 32-bit locktime, so ``set_anticipate``'s # ``min(old_locktime, sentinel)`` keeps the old locktime untouched. NO_ANTICIPATE = 4294967295 + 1 @staticmethod def check_anticipate(ow: "WillItem", nw: "WillItem"): """Decide the locktime the *new* will item should take w.r.t. an old one. Both ``ow`` (old) and ``nw`` (new) spend (at least) one common input, so only one of them can ever be mined. When the new will differs in a way that affects the heirs or *increases* the amount they must wait for, the new transaction is given a locktime ONE DAY EARLIER than the old one (``anticipate``), so it would be mined first and supersede the old one. The decision tree: * Heirs (address + requested amount) differ -> anticipate * Same heirs, both have a will-executor: - same will-executor url: * old base_fee > new base_fee -> anticipate (heirs effectively receive more -> bring it forward) * the per-byte ``tx_fees`` changed -> anticipate * otherwise (e.g. base_fee merely *increased*, nothing else changed) -> keep the old locktime (this is the UPDATED case: a new tx with the SAME locktime and SAME heirs replaces the old one) - different will-executor url -> keep the old locktime * Same heirs, no will-executor change: - the resolved heir amounts (column 3) differ -> anticipate - otherwise -> keep the locktime Returns the chosen locktime, or :data:`NO_ANTICIPATE` when the new locktime is already earlier than ``anticipate`` (nothing to do). """ anticipate = Util.anticipate_locktime(ow.tx.locktime, days=1) if int(nw.tx.locktime) < int(anticipate): # The new will is already earlier than one day before the old one; # there is nothing to anticipate. return Will.NO_ANTICIPATE if not Util.cmp_heirs_by_values( ow.heirs, nw.heirs, [0, 1], exclude_willexecutors=True ): # Heirs (address / requested amount) changed -> bring it forward. return anticipate if nw.we and ow.we: if ow.we["url"] == nw.we["url"]: if int(ow.we["base_fee"]) > int(nw.we["base_fee"]): # Will-executor now takes a SMALLER fee -> heirs get more, # so anticipate. return anticipate if int(ow.tx_fees) != int(nw.tx_fees): # Mining fee rate changed -> anticipate. return anticipate # Same url, base_fee not lowered, same tx_fees: this is a plain # update (e.g. base_fee increased). Keep the same locktime. return ow.tx.locktime # Different will-executor URL: keep the same locktime. return ow.tx.locktime # No will-executor on at least one side. if nw.we == ow.we: if not Util.cmp_heirs_by_values(ow.heirs, nw.heirs, [0, 3]): # Resolved heir amounts differ -> anticipate. return anticipate return ow.tx.locktime # One has a will-executor, the other doesn't: keep the same locktime. return ow.tx.locktime @staticmethod def change_input(will, otxid, idx, change, others_inputs, to_delete, to_append): ow = will[otxid] ntxid = ow.tx.txid() if otxid != ntxid: for wid in will: w = will[wid] inputs = w.tx.inputs() outputs = w.tx.outputs() found = False old_txid = w.tx.txid() # ntx = None for i in range(0, len(inputs)): if ( inputs[i].prevout.txid.hex() == otxid and inputs[i].prevout.out_idx == idx ): if isinstance(w.tx, Transaction): will[wid].tx = PartialTransaction.from_tx(w.tx) will[wid].tx.set_rbf(True) will[wid].tx._inputs[i] = Will.new_input(wid, idx, change) found = True if found: pass new_txid = will[wid].tx.txid() if old_txid != new_txid: to_delete.append(old_txid) to_append[new_txid] = will[wid] outputs = will[wid].tx.outputs() for i in range(0, len(outputs)): Will.change_input( will, wid, i, outputs[i], others_inputs, to_delete, to_append, ) @staticmethod def get_all_inputs(will, only_valid=False): all_inputs = {} for w, wi in will.items(): if not only_valid or wi.get_status("VALID"): inputs = wi.tx.inputs() for i in inputs: prevout_str = i.prevout.to_str() inp = [w, will[w], i] if prevout_str not in all_inputs: all_inputs[prevout_str] = [inp] else: all_inputs[prevout_str].append(inp) return all_inputs @staticmethod def get_all_inputs_min_locktime(all_inputs): all_inputs_min_locktime = {} for i, values in all_inputs.items(): min_locktime = min(values, key=lambda x: x[1].tx.locktime)[1].tx.locktime for w in values: if w[1].tx.locktime == min_locktime: if i not in all_inputs_min_locktime: all_inputs_min_locktime[i] = [w] else: all_inputs_min_locktime[i].append(w) return all_inputs_min_locktime @staticmethod def search_anticipate_rec(will, old_inputs): redo = False to_delete = [] to_append = {} new_inputs = Will.get_all_inputs(will, only_valid=True) for nid, nwi in will.items(): if nwi.search_anticipate(new_inputs): if nid != nwi.tx.txid(): redo = True to_delete.append(nid) to_append[nwi.tx.txid()] = nwi outputs = nwi.tx.outputs() for i in range(0, len(outputs)): Will.change_input( will, nid, i, outputs[i], new_inputs, to_delete, to_append ) if nwi.search_anticipate(old_inputs): if nid != nwi.tx.txid(): redo = True to_delete.append(nid) to_append[nwi.tx.txid()] = nwi outputs = nwi.tx.outputs() for i in range(0, len(outputs)): Will.change_input( will, nid, i, outputs[i], new_inputs, to_delete, to_append ) for w in to_delete: try: del will[w] except Exception: pass for k, w in to_append.items(): will[k] = w if redo: Will.search_anticipate_rec(will, old_inputs) @staticmethod def update_will(old_will, new_will): all_old_inputs = Will.get_all_inputs(old_will, only_valid=True) # all_inputs_min_locktime = Will.get_all_inputs_min_locktime(all_old_inputs) # all_new_inputs = Will.get_all_inputs(new_will) # check if the new input is already spent by other transaction # if it is use the same locktime, or anticipate. Will.search_anticipate_rec(new_will, all_old_inputs) other_inputs = Will.get_all_inputs(old_will, {}) try: Will.normalize_will(new_will, others_inputs=other_inputs) except Exception as e: raise e for oid in Will.only_valid(old_will): if oid in new_will: new_heirs = new_will[oid].heirs new_we = new_will[oid].we new_will[oid] = old_will[oid] new_will[oid].heirs = new_heirs new_will[oid].we = new_we continue else: continue @staticmethod def get_higher_input_for_tx(will): out = {} for wid in will: wtx = will[wid].tx found = False for inp in wtx.inputs(): if inp.prevout.txid.hex() in will: found = True break if not found: out[inp.prevout.to_str()] = inp return out @staticmethod def invalidate_will(will, wallet, fees_per_byte): will_only_valid = Will.only_valid_list(will) inputs = Will.get_all_inputs(will_only_valid) utxos = wallet.get_utxos() filtered_inputs = [] prevout_to_spend = [] current_timestamp = int(time.time()) for prevout_str, ws in inputs.items(): for w in ws: if w[0] not in filtered_inputs: filtered_inputs.append(w[0]) if prevout_str not in prevout_to_spend: prevout_to_spend.append(prevout_str) balance = 0 utxo_to_spend = [] for utxo in utxos: utxo_str = utxo.prevout.to_str() if utxo_str in prevout_to_spend: balance += inputs[utxo_str][0][2].value_sats() utxo_to_spend.append(utxo) if len(utxo_to_spend) > 0: change_addresses = wallet.get_change_addresses_for_new_transaction() out = PartialTxOutput.from_address_and_value(change_addresses[0], balance) out.is_change = True locktime = current_timestamp tx = PartialTransaction.from_io( utxo_to_spend, [out], locktime=locktime, version=2 ) tx.set_rbf(True) fee = tx.estimated_size() * fees_per_byte if balance - fee > 0: out = PartialTxOutput.from_address_and_value( change_addresses[0], balance - fee ) tx = PartialTransaction.from_io( utxo_to_spend, [out], locktime=locktime, version=2 ) tx.set_rbf(True) _logger.debug(f"invalidation tx: {tx}") return tx else: _logger.debug(f"balance({balance}) - fee({fee}) <=0") pass else: _logger.debug("len utxo_to_spend <=0") pass @staticmethod def is_new(will): for wid, w in will.items(): if w.get_status("VALID") and not w.get_status("COMPLETE"): return True @staticmethod def search_rai(all_inputs, all_utxos, will, wallet): # will_only_valid = Will.only_valid_or_replaced_list(will) for inp, ws in all_inputs.items(): inutxo = Util.in_utxo(inp, all_utxos) for w in ws: wi = w[1] if ( wi.get_status("VALID") or wi.get_status("CONFIRMED") or wi.get_status("PENDING") ): prevout_id = w[2].prevout.txid.hex() if not inutxo: if prevout_id in will: wo = will[prevout_id] if wo.get_status("REPLACED"): wi.set_status("REPLACED", True) if wo.get_status("INVALIDATED"): wi.set_status("INVALIDATED", True) else: if wallet.db.get_transaction(wi._id): wi.set_status("CONFIRMED", True) else: wi.set_status("INVALIDATED", True) for child in wi.search(all_inputs): if child.tx.locktime < wi.tx.locktime: _logger.debug("a child was found") wi.set_status("REPLACED", True) else: pass Will.search_updated(all_inputs) @staticmethod def search_updated(all_inputs): """Mark superseded-but-still-valid transactions as ``UPDATED``. When the user changes something that does NOT move the deadline nor the money paid to the heirs -- the classic case being a will-executor's ``base_fee`` increase -- the plugin rebuilds the inheritance keeping the SAME locktime and the SAME heirs (same destination address and amount). The result is two transactions that: * spend the same wallet UTXO(s), * share the exact same ``tx.locktime``, * pay the same heirs the same amounts, but have different txids (because the will-executor fee output changed). Both remain technically spendable, so neither must lose ``VALID``. The newer transaction (the one created later, i.e. the larger ``time``) becomes the active one, and the older transaction it superseded is flagged ``UPDATED`` while keeping ``VALID`` (per the state spec). Detection is symmetric over every shared input: for each pair of valid will items sharing an input we compare locktime and heirs; the older of the two (by creation ``time``) is the one that was updated. """ seen_pairs = set() for inp, ws in all_inputs.items(): # Only valid transactions can be "updated"; replaced/invalidated # ones already lost their VALID status and are handled elsewhere. valid_ws = [w for w in ws if w[1].get_status("VALID")] for i in range(len(valid_ws)): for j in range(i + 1, len(valid_ws)): wa = valid_ws[i][1] wb = valid_ws[j][1] if wa._id == wb._id: continue # Avoid processing the same pair twice (it may share more # than one input). pair_key = tuple(sorted((wa._id, wb._id))) if pair_key in seen_pairs: continue seen_pairs.add(pair_key) if int(wa.tx.locktime) != int(wb.tx.locktime): # Different deadlines -> this is an anticipate/replace # case, not an update. continue if not Util.cmp_heirs_by_values( wa.heirs or {}, wb.heirs or {}, [0, 1], exclude_willexecutors=True, ): # Heirs (address + requested amount) differ -> not a # plain update. continue # Same locktime and same heirs: the older transaction was # superseded by the newer one. Pick the older by creation # time (fall back to a stable order when time is missing). ta = wa.time if wa.time is not None else 0 tb = wb.time if wb.time is not None else 0 if ta == tb: older = wa if wa._id <= wb._id else wb else: older = wa if ta < tb else wb older.set_status("UPDATED", True) @staticmethod def utxos_strs(utxos): return [Util.utxo_to_str(u) for u in utxos] @staticmethod def set_invalidate(wid, will=None): will = will if will is not None else {} will[wid].set_status("INVALIDATED", True) if will[wid].children: for c in will[wid].children.items(): Will.set_invalidate(c[0], will) @staticmethod def check_tx_height(tx, wallet): info = wallet.get_tx_info(tx) return info.tx_mined_status.height() # check if transactions are still technically valid @staticmethod def check_invalidated(willtree, utxos_list, wallet): """Reconcile each will transaction against the wallet's view of its inputs. For every will item whose parent is gone/confirmed/pending, we look at its inputs. If an input is no longer an unspent output of the wallet (it is not in ``utxos_list``), then *something* has spent it, and we ask Electrum what happened to OUR transaction via its mined-status height: * ``height > 0`` -> confirmed in a block -> CONFIRMED * ``height == 0`` (UNCONFIRMED) -> seen in the mempool -> PENDING * ``height == -1`` (UNCONF_PARENT)-> mempool (unconf parent)-> PENDING * anything else (LOCAL / FUTURE / unknown) -> the input was spent by a different transaction, so ours can no longer be broadcast -> INVALIDATED (cascades to children). Electrum's height sentinels (from ``electrum.address_synchronizer``): ``TX_HEIGHT_LOCAL = -2``, ``TX_HEIGHT_UNCONFIRMED = 0``, ``TX_HEIGHT_UNCONF_PARENT = -1``, ``TX_HEIGHT_FUTURE = -3``. """ for wid, w in willtree.items(): if ( not w.father or willtree[w.father].get_status("CONFIRMED") or willtree[w.father].get_status("PENDING") ): for inp in w.tx.inputs(): inp_str = Util.utxo_to_str(inp) if inp_str not in utxos_list: if wallet: height = Will.check_tx_height(w.tx, wallet) if height > 0: # Mined in a block. w.set_status("CONFIRMED", True) elif height in (0, -1): # Seen in the mempool (unconfirmed, possibly with # an unconfirmed parent). w.set_status("PENDING", True) else: # LOCAL / FUTURE / unknown: the spent input was # taken by some other transaction, so this will # can no longer be broadcast. Will.set_invalidate(wid, willtree) # def reflect_to_children(treeitem): # if not treeitem.get_status("VALID"): # _logger.debug(f"{tree:item._id} status not valid looking for children") # for child in treeitem.children: # wc = willtree[child] # if wc.get_status("VALID"): # if treeitem.get_status("INVALIDATED"): # wc.set_status("INVALIDATED", True) # if treeitem.get_status("REPLACED"): # wc.set_status("REPLACED", True) # if wc.children: # Will.reflect_to_children(wc) @staticmethod def check_amounts(heirs, willexecutors, all_utxos, timestamp_to_check, dust): fixed_heirs, fixed_amount, perc_heirs, perc_amount, fixed_amount_with_dust = ( heirs.fixed_percent_lists_amount(timestamp_to_check, dust, reverse=True) ) wallet_balance = 0 for utxo in all_utxos: wallet_balance += utxo.value_sats() if fixed_amount >= wallet_balance: raise FixedAmountException( f"Fixed amount({fixed_amount}) >= {wallet_balance}" ) if perc_amount != 100: raise PercAmountException(f"Perc amount({perc_amount}) =! 100%") for url, wex in willexecutors.items(): if Willexecutors.is_selected(wex): temp_balance = wallet_balance - int(wex["base_fee"]) if fixed_amount >= temp_balance: raise FixedAmountException( f"Willexecutor{url} excess base fee({wex['base_fee']}), {fixed_amount} >={temp_balance}" ) @staticmethod def check_will(will, all_utxos, wallet, timestamp_to_check): Will.add_willtree(will) utxos_list = Will.utxos_strs(all_utxos) Will.check_invalidated(will, utxos_list, wallet) all_inputs = Will.get_all_inputs(will, only_valid=True) all_inputs_min_locktime = Will.get_all_inputs_min_locktime(all_inputs) Will.check_will_expired( all_inputs_min_locktime, timestamp_to_check ) all_inputs = Will.get_all_inputs(will, only_valid=True) Will.search_rai(all_inputs, all_utxos, will, wallet) @staticmethod def get_min_locktime(will,default_value=None): return min((v.tx.locktime for v in will.values() if v.get_status('VALID')), default=default_value) @staticmethod def is_will_valid( will, timestamp_to_check, tx_fees, all_utxos, heirs=None, willexecutors=None, self_willexecutor=False, wallet=False, callback_not_valid_tx=None, ): heirs = heirs if heirs is not None else {} willexecutors= willexecutors if willexecutors is not None else {} Will.check_will(will, all_utxos, wallet, timestamp_to_check) if heirs: if not Will.check_willexecutors_and_heirs( will, heirs, willexecutors, self_willexecutor, timestamp_to_check, tx_fees, ): raise NotCompleteWillException() all_inputs = Will.get_all_inputs(will, only_valid=True) _logger.info("check all utxo in wallet are spent") if all_inputs: for utxo in all_utxos: if utxo.value_sats() > 68 * tx_fees: if not Util.in_utxo(utxo, all_inputs.keys()): _logger.info("utxo is not spent", utxo.to_json()) _logger.debug(all_inputs.keys()) raise NotCompleteWillException( "Some utxo in the wallet is not included" ) _logger.info("will ok") return True @staticmethod def check_will_expired(all_inputs_min_locktime, timestamp_to_check): _logger.info("check if some transaction is expired") for prevout_str, wid in all_inputs_min_locktime.items(): for w in wid: if w[1].get_status("VALID"): locktime = int(wid[0][1].tx.locktime) if locktime < int(timestamp_to_check): raise WillExpiredException( f"Will Expired {wid[0][0]}: {locktime}<{timestamp_to_check}" ) else: _logger.debug(f"Will Not Expired {wid[0][0]}: {datetime.fromtimestamp(locktime).isoformat()} > {datetime.fromtimestamp(timestamp_to_check).isoformat()}") # def check_all_input_spent_are_in_wallet(): # _logger.info("check all input spent are in wallet or valid txs") # for inp, ws in all_inputs.items(): # if not Util.in_utxo(inp, all_utxos): # for w in ws: # if w[1].get_status("VALID"): # prevout_id = w[2].prevout.txid.hex() # parentwill = will.get(prevout_id, False) # if not parentwill or not parentwill.get_status("VALID"): # w[1].set_status("INVALIDATED", True) @staticmethod def only_valid_list(will): out = {} for wid, w in will.items(): if w.get_status("VALID"): out[wid] = w return out @staticmethod def only_valid_or_replaced_list(will): out = [] for wid, w in will.items(): wi = w if wi.get_status("VALID") or wi.get_status("REPLACED"): out.append(wid) return out @staticmethod def check_willexecutors_and_heirs( will, heirs, willexecutors, self_willexecutor, check_date, tx_fees ): _logger.debug("check willexecutors heirs") no_willexecutor = 0 willexecutors_found = {} heirs_found = {} will_only_valid = Will.only_valid_list(will) if len(will_only_valid) < 1: return False for wid in Will.only_valid_list(will): w = will[wid] if w.tx_fees != tx_fees: raise TxFeesChangedException(f"{tx_fees}: {w.tx_fees}") for wheir in w.heirs: if not 'w!ll3x3c"' == wheir[:9]: their = will[wid].heirs[wheir] if heir := heirs.get(wheir, None): if heir[0] == their[0] and heir[1] == their[1]: # The requested (possibly new) locktime for this heir. _base = ( datetime.fromtimestamp(w.time) if w.time and isinstance(heir[2], str) and heir[2][-1:] in ("d", "y") else None ) new_locktime = Util.parse_locktime_string( heir[2], now=_base ) tx_locktime = int(w.tx.locktime) # Count the heir as found when the resolved locktime # matches OR when the raw locktime spec is unchanged # (the latter covers anticipation, which reduces # tx.locktime but leaves the stored spec intact). if ( new_locktime == tx_locktime or their[2] == heir[2] ): count = heirs_found.get(wheir, 0) heirs_found[wheir] = count + 1 elif new_locktime > tx_locktime and ( w.get_status("COMPLETE") or w.get_status("PUSHED") ): # POSTPONE: compare raw specs to avoid false # positives from anticipation. if Util.is_locktime_increased(their[2], heir[2]): raise WillPostponedException( f"{wheir}: locktime postponed " f"{their[2]}->{heir[2]} " f"on a signed/sent will" ) # new_locktime < tx_locktime (anticipate) is left to # check_will_expired -> WillExpiredException. else: # The will still carries this heir, but the heir is no # longer present in the current heirs set: the user # removed it. This must trigger a rebuild exactly like # "heir added" does, otherwise the removed heir would # silently stay in the inheritance transaction. Raising # HeirNotFoundException reuses the same rebuild path used # by the Check button and by on_close (Electrum quit). _logger.debug( f"heir removed, transaction is not valid:" f"{wheir} {wid}, {w}" ) raise HeirNotFoundException(wheir) if willexecutor := w.we: count = willexecutors_found.get(willexecutor["url"], 0) if Util.cmp_willexecutor( willexecutor, willexecutors.get(willexecutor["url"], None) ): willexecutors_found[willexecutor["url"]] = count + 1 else: no_willexecutor += 1 count_heirs = 0 for h in heirs: if Util.parse_locktime_string(heirs[h][2]) >= check_date: count_heirs += 1 if h not in heirs_found: _logger.debug(f"heir: {h} not found") raise HeirNotFoundException(h) if not count_heirs: raise NoHeirsException("there are not valid heirs") if self_willexecutor and no_willexecutor == 0: raise NoWillExecutorNotPresent("Backup tx") for url, we in willexecutors.items(): if Willexecutors.is_selected(we): if url not in willexecutors_found: _logger.debug(f"will-executor: {url} not fount") raise WillExecutorNotPresent(url) _logger.info("will is coherent with heirs and will-executors") return True class WillItem(Logger): STATUS_DEFAULT = { "ANTICIPATED": ["Anticipated", False], "BROADCASTED": ["Broadcasted", False], "CHECKED": ["Checked", False], "CHECK_FAIL": ["Check Failed", False], "COMPLETE": ["Signed", False], "CONFIRMED": ["Confirmed", False], "ERROR": ["Error", False], "EXPIRED": ["Expired", False], "EXPORTED": ["Exported", False], "IMPORTED": ["Imported", False], "INVALIDATED": ["Invalidated", False], "PENDING": ["Pending", False], "PUSH_FAIL": ["Push failed", False], "PUSHED": ["Pushed", False], "REPLACED": ["Replaced", False], "RESTORED": ["Restored", False], "UPDATED": ["Updated", False], "VALID": ["Valid", True], } def set_status(self, status, value=True): if self.STATUS[status][1] == bool(value): return None self.status += "." + (("NOT " if not value else "") + _(self.STATUS[status][0])) self.STATUS[status][1] = bool(value) if value: # ANITICIPATED: valid remains, no other changes # UPDATED: valid remains, no other changes if status in ["INVALIDATED", "REPLACED", "CONFIRMED", "PENDING"]: self.STATUS["VALID"][1] = False if status in ["CONFIRMED", "PENDING"]: self.STATUS["INVALIDATED"][1] = False if status in ["PUSHED"]: self.STATUS["PUSH_FAIL"][1] = False self.STATUS["CHECK_FAIL"][1] = False if status in ["CHECKED"]: self.STATUS["PUSHED"][1] = True self.STATUS["PUSH_FAIL"][1] = False return value def get_status(self, status): return self.STATUS[status][1] def __init__(self, w, _id=None, wallet=None): if isinstance( w, WillItem, ): self.__dict__ = w.__dict__.copy() else: self.tx = Will.get_tx_from_any(w["tx"]) self.heirs = w.get("heirs", None) self.we = w.get("willexecutor", None) self.status = w.get("status", None) self.description = w.get("description", None) self.time = w.get("time", None) self.change = w.get("change", None) self.tx_fees = w.get("baltx_fees", 0) self.father = w.get("Father", None) self.children = w.get("Children", None) self.STATUS = copy.deepcopy(WillItem.STATUS_DEFAULT) for s in self.STATUS: self.STATUS[s][1] = w.get(s, WillItem.STATUS_DEFAULT[s][1]) if not _id: self._id = self.tx.txid() else: self._id = _id if not self._id: self.status += "ERROR!!!" self.valid = False if wallet: self.tx.add_info_from_wallet(wallet) def to_dict(self): out = { "_id": self._id, "tx": self.tx, "heirs": self.heirs, "willexecutor": self.we, "status": self.status, "description": self.description, "time": self.time, "change": self.change, "baltx_fees": self.tx_fees, } for key in self.STATUS: try: out[key] = self.STATUS[key][1] except Exception as e: _logger.error(f"{key},{self.STATUS[key]} {e}") return out def __repr__(self): return str(self) def __str__(self): return str(self.to_dict()) def set_anticipate(self, ow: "WillItem"): nl = min(ow.tx.locktime, Will.check_anticipate(ow, self)) if int(nl) < self.tx.locktime: self.tx.locktime = int(nl) self.set_status("ANTICIPATED", True) return True else: return False def search_anticipate(self, all_inputs): anticipated = False for ow in self.search(all_inputs): if self.set_anticipate(ow): anticipated = True return anticipated def search(self, all_inputs): for inp in self.tx.inputs(): prevout_str = inp.prevout.to_str() oinps = all_inputs.get(prevout_str, []) for oinp in oinps: ow = oinp[1] if ow._id != self._id: yield ow def normalize_locktime(self, all_inputs): outputs = self.tx.outputs() for idx in range(0, len(outputs)): inps = all_inputs.get(f"{self._id}:{idx}", []) _logger.debug("****check locktime***") for inp in inps: if inp[0] != self._id: iw = inp[1] self.set_anticipate(iw) def set_check_willexecutor(self,resp): try: if resp : if "tx" in resp and resp["tx"] == str(self.tx): self.set_status("PUSHED") self.set_status("CHECKED") else: self.set_status("CHECK_FAIL") self.set_status("PUSHED", False) return True else: self.set_status("CHECK_FAIL") self.set_status("PUSHED", False) return False except Exception as e: _logger.error(f"exception checking transaction: {e}") self.set_status("CHECK_FAIL") # NOTE: the former ``get_color()`` method (which returned hard-coded hex # colours for the GUI) has been moved out of the core logic to # ``bal.gui.qt.theme.status_color``. The status flags above remain the # single source of truth; the GUI maps them to colours. class WillException(Exception): def __init__(self,msg="WillException"): self.msg=msg Exception.__init__(self) def __str__(self): return self.msg class WillExpiredException(WillException): pass class NotCompleteWillException(WillException): pass class HeirChangeException(NotCompleteWillException): pass class TxFeesChangedException(NotCompleteWillException): pass class HeirNotFoundException(NotCompleteWillException): pass class WillPostponedException(NotCompleteWillException): """An already signed/sent will is being postponed. When a will that has already been signed (``COMPLETE``) and/or pushed to will-executors (``PUSHED``) gets its locktime moved to a LATER date, the previously committed coins must be invalidated on-chain BEFORE rebuilding the new inheritance. Otherwise a will-executor could broadcast the old (earlier-locktime) transaction and execute the inheritance too early to collect the fees. Invalidating spends the same UTXOs now, permanently voiding the old pre-signed transaction. """ pass class WillexecutorChangeException(NotCompleteWillException): pass class NoWillExecutorNotPresent(NotCompleteWillException): pass class WillExecutorNotPresent(NotCompleteWillException): pass class NoHeirsException(WillException): pass class AmountException(WillException): pass class PercAmountException(AmountException): pass class FixedAmountException(AmountException): pass