""" Tests for multiverse feature: multiple locktimes among heirs. Run: source electrum/env/bin/activate QT_QPA_PLATFORM=offscreen python3 tests/test_multiverse.py """ import sys import os import time import copy from unittest.mock import MagicMock, patch sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) from bal.core.heirs import ( Heirs, prepare_transactions, HEIR_ADDRESS, HEIR_AMOUNT, HEIR_LOCKTIME, HEIR_REAL_AMOUNT, ) from bal.core.util import Util from bal.core.plugin_base import BalConfig from bal.core.will import WillItem, Will from electrum.transaction import ( PartialTransaction, PartialTxInput, TxOutpoint, ) from electrum.util import bfh # ------------------------------------------------------------------ # # Helpers # ------------------------------------------------------------------ # def fake_wallet(change_addr="1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"): w = MagicMock() w.dust_threshold.return_value = 500 change_mock = MagicMock() change_mock.__getitem__.return_value = change_addr w.get_change_addresses_for_new_transaction.return_value = change_mock return w MAINNET_ADDR = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa" def make_heir(name, addr, amount, locktime): return {name: [addr, amount, locktime]} # ------------------------------------------------------------------ # # parse_locktime_string – various formats # ------------------------------------------------------------------ # def test_parse_d_suffix_is_timestamp(): result = Util.parse_locktime_string("30d") assert result > 500000000 def test_parse_y_suffix_is_timestamp(): result = Util.parse_locktime_string("1y") assert result > 500000000 def test_parse_plain_int_preserved(): ts = int(time.time()) + 86400 assert Util.parse_locktime_string(str(ts)) == ts # ------------------------------------------------------------------ # # get_lowest_locktimes – returns sorted list # ------------------------------------------------------------------ # def test_get_lowest_locktimes_returns_sorted_list(): result = Util.get_lowest_locktimes(["30d", "90d", "1y"]) assert isinstance(result, list) assert len(result) == 3 for lt in result: assert lt > 500000000 assert result == sorted(result) def test_get_lowest_locktimes_empty(): assert Util.get_lowest_locktimes([]) == [] def test_get_lowest_locktimes_min(): now = int(time.time()) locktimes_list = [now + 86400, now, now + 2 * 86400] result = Util.get_lowest_locktimes(locktimes_list) assert result[0] == now assert result == sorted(result) # ------------------------------------------------------------------ # # prepare_lists with multiple locktimes (multiverse) # ------------------------------------------------------------------ # def test_prepare_lists_single_locktime(): """All heirs share the same locktime -> single group.""" wallet = fake_wallet() h = Heirs.__new__(Heirs) h.update(make_heir("a", MAINNET_ADDR, 5000, "30d")) h.update(make_heir("b", MAINNET_ADDR, 3000, "30d")) result, onlyfixed = h.prepare_lists(100000, 100, wallet) assert len(result) == 1 locktime = list(result.keys())[0] assert locktime > 500000000 assert len(result[locktime]) == 2 def test_prepare_lists_multiple_locktimes(): """Heirs with different locktimes -> multiple groups.""" wallet = fake_wallet() h = Heirs.__new__(Heirs) far_future_1 = int(time.time()) + 365 * 86400 far_future_2 = int(time.time()) + 2 * 365 * 86400 h.update(make_heir("a", MAINNET_ADDR, 5000, str(far_future_1))) h.update(make_heir("b", MAINNET_ADDR, 3000, str(far_future_2))) result, onlyfixed = h.prepare_lists(100000, 100, wallet) assert len(result) == 2 locktimes = sorted(result.keys()) assert locktimes[0] == far_future_1 assert locktimes[1] == far_future_2 def test_prepare_lists_mixed_locktimes(): """Two heirs share a locktime, one has different -> 2 groups.""" wallet = fake_wallet() h = Heirs.__new__(Heirs) far_future_1 = int(time.time()) + 365 * 86400 far_future_2 = int(time.time()) + 730 * 86400 h.update(make_heir("a", MAINNET_ADDR, 5000, str(far_future_1))) h.update(make_heir("b", MAINNET_ADDR, 3000, str(far_future_1))) h.update(make_heir("c", MAINNET_ADDR, 2000, str(far_future_2))) result, onlyfixed = h.prepare_lists(100000, 100, wallet) assert len(result) == 2 two_heir_group = [g for g in result.values() if len(g) == 2] assert len(two_heir_group) == 1 one_heir_group = [g for g in result.values() if len(g) == 1] assert len(one_heir_group) == 1 # ------------------------------------------------------------------ # # get_locktimes from different locktime styles # ------------------------------------------------------------------ # def test_get_locktimes_various(): """Mix of relative and absolute locktimes produces sorted timestamps.""" now = int(time.time()) tomorrow = now + 86400 day_after = now + 2 * 86400 h = Heirs.__new__(Heirs) h.update(make_heir("a", "addr1", 5000, "1d")) h.update(make_heir("b", "addr2", 3000, str(day_after))) h.update(make_heir("c", "addr3", 2000, str(tomorrow))) locktimes = h.get_locktimes(0) assert len(locktimes) == 3 for lt in locktimes: assert lt > 500000000 # ------------------------------------------------------------------ # # Locktime propagation: tx.nLockTime matches lowest heir locktime # ------------------------------------------------------------------ # def make_utxo(txid="deadbeef" * 8, vout=0, value=1_000_000): """Build a ``PartialTxInput`` with the minimum fields required by the transaction builder. ``prepare_transactions`` calls ``tx.remove_signatures()``, so we patch it out here.""" prevout = TxOutpoint(txid=bfh(txid), out_idx=vout) txin = PartialTxInput(prevout=prevout) txin._trusted_value_sats = value return txin def test_transaction_locktime_matches_lowest_heir(): """The built tx's nLockTime equals the resolved lowest heir locktime. With multiple locktime groups only the lowest is processed.""" lt_low = int(time.time()) + 86400 lt_high = int(time.time()) + 365 * 86400 locktimes = { lt_high: {"b": [MAINNET_ADDR, 10000, str(lt_high)]}, lt_low: {"a": [MAINNET_ADDR, 10000, str(lt_low)]}, } utxos = [make_utxo("aa" * 32, 0, 500_000)] fees = {lt_low: 100, lt_high: 100} with patch("bal.core.heirs.PartialTransaction.remove_signatures"), \ patch("bal.core.heirs.PartialTransaction.txid", side_effect=["10" * 32, "10" * 32]): wallet = fake_wallet() result = prepare_transactions(locktimes, utxos, fees, wallet) assert len(result) == 1, f"expected 1 tx (lowest locktime), got {len(result)}" tx = list(result.values())[0] assert tx.locktime == lt_low # ------------------------------------------------------------------ # # Anticipate: build txs, modify heirs, rebuild, verify replacement # ------------------------------------------------------------------ # def _make_willitem(tx, txid): """Minimal ``WillItem`` without serialization round-trip.""" wi = WillItem.__new__(WillItem) wi.tx = tx wi._id = txid wi.heirs = getattr(tx, "heirs", None) wi.we = getattr(tx, "willexecutor", None) wi.status = "" wi.STATUS = copy.deepcopy(WillItem.STATUS_DEFAULT) return wi def test_anticipate_single_heir_locktime_reduction(): """Old tx is REPLACED when the heir's locktime moves earlier.""" now = int(time.time()) lt_old = now + 10 * 86400 lt_new = now + 3 * 86400 h = Heirs.__new__(Heirs) h.update(make_heir("a", MAINNET_ADDR, 10000, str(lt_old))) locktimes_old, _ = h.prepare_lists(100000, 100, fake_wallet()) utxos = [make_utxo("10" * 32, 0, 500_000)] fees_old = {lt_old: 100} with patch("bal.core.heirs.PartialTransaction.remove_signatures"), \ patch("bal.core.heirs.PartialTransaction.txid", side_effect=["66" * 32, "66" * 32]): txs_old = prepare_transactions(locktimes_old, utxos[:], fees_old, fake_wallet()) old_txid = list(txs_old.keys())[0] old_wi = _make_willitem(list(txs_old.values())[0], old_txid) old_wi.set_status("VALID", True) h2 = Heirs.__new__(Heirs) h2.update(make_heir("a", MAINNET_ADDR, 10000, str(lt_new))) locktimes_new, _ = h2.prepare_lists(100000, 100, fake_wallet()) fees_new = {lt_new: 100} with patch("bal.core.heirs.PartialTransaction.remove_signatures"), \ patch("bal.core.heirs.PartialTransaction.txid", side_effect=["88" * 32, "88" * 32]): txs_new = prepare_transactions(locktimes_new, utxos[:], fees_new, fake_wallet()) new_txid = list(txs_new.keys())[0] new_wi = _make_willitem(list(txs_new.values())[0], new_txid) new_wi.set_status("VALID", True) will = {old_txid: old_wi, new_txid: new_wi} all_inputs = Will.get_all_inputs(will) Will.search_rai(all_inputs, utxos, will, None) assert old_wi.get_status("REPLACED"), \ "old tx should be REPLACED (new locktime is earlier)" assert not old_wi.get_status("VALID"), \ "old tx should no longer be VALID" assert new_wi.get_status("VALID"), \ "new tx should remain VALID" assert not new_wi.get_status("REPLACED"), \ "new tx should not be REPLACED" # ------------------------------------------------------------------ # # Multiverse default (plugin_base) # ------------------------------------------------------------------ # def test_multiverse_default_true(): """ENABLE_MULTIVERSE should default to True.""" config = MagicMock() config.get.side_effect = lambda key, default: default mv = BalConfig(config, "bal_enable_multiverse", True) assert mv.get() is True # ------------------------------------------------------------------ # # Main # ------------------------------------------------------------------ # if __name__ == "__main__": import inspect for name, obj in sorted(globals().items()): if name.startswith("test_") and inspect.isfunction(obj): obj() print(f" [OK] {name}") print("[OK] All multiverse tests passed")