55 Commits

Author SHA1 Message Date
kaibot
45d8173cf7 docs(qt.py): Add comprehensive documentation for BalWizardDialog class 2026-04-09 02:43:57 +00:00
kaibot
b739bdab40 docs(qt.py): Add comprehensive documentation for BalWizardDialog class 2026-04-09 02:43:06 +00:00
kaibot
d613438800 docs(qt.py): Add comprehensive documentation for BalWizardDialog class 2026-04-09 02:39:04 +00:00
kaibot
a27df11dfa docs(qt.py): Add detailed interval documentation for HeirsLockTimeEdit class behavior 2026-04-09 02:10:49 +00:00
kaibot
686c11080f Merge branch 'doc' into main with documentation for HeirsLockTimeEdit and BalBuildWillDialog 2026-04-09 01:36:34 +00:00
dff508c25b version 2026-04-08 11:17:54 -04:00
2056ffae7f check alive updated 2026-04-08 11:16:59 -04:00
c8ab85b735 invalidation and locktime 2026-04-05 11:39:17 -04:00
e2de4a3afa skip willexecutor with dust amount 2026-03-27 23:06:27 -04:00
3a44b492e4 __version__0.2.7 2026-03-21 11:31:46 -04:00
9737221914 fix send_request version message 2026-03-18 16:25:59 -04:00
a022c413cc willexecutor manager improved 2026-03-17 02:34:01 -04:00
716d4dd5c5 version 2026-03-05 10:50:03 -04:00
b012dd7a68 black reformatted 2026-03-05 10:47:59 -04:00
ef0ab56de4 fixed refresh and some minor bug about dust amounts and empty wallet 2026-03-05 10:46:38 -04:00
c5ad5a61bb bug delete heirs from wizard 2026-02-10 12:22:43 -04:00
f7bd09df91 dust bugfix 2026-02-09 12:10:31 -04:00
2a4eab81fd print 2026-02-05 17:51:17 -04:00
d86b941fcb refresh button. locktim is correctly saved, minor bugfix in checking confirmed transaction 2026-02-05 17:11:11 -04:00
1836cdd892 version 2026-02-03 22:12:30 -04:00
2416d0ce8d fix willexecutor list edit 2026-02-03 22:11:33 -04:00
8e4e401d1b fix plugin settings removed willexecutor ping 2026-02-03 16:14:12 -04:00
b8859ee5c1 qt thread panic
heirs import wizard
2026-02-03 13:56:47 -04:00
faeff1ff3c gui willexecutor list status moved after url to be more visible.
heirs tab start hidden
2026-02-03 11:25:21 -04:00
437105477d missing icons 2026-01-28 14:47:24 -04:00
4c12136470 release 2026-01-24 19:50:41 -04:00
a918c5564d close task invalidate tx 2026-01-09 16:46:22 -04:00
d2280969de read_file icons 2026-01-05 01:25:20 -04:00
6cf12eec80 black refactor + bugfix 2026-01-04 23:52:53 -04:00
936d4ef467 version 2026-01-04 16:54:27 -04:00
5512ee0e61 version 2026-01-04 16:49:43 -04:00
3e9a841e21 version 2026-01-04 16:41:53 -04:00
5d9636cda1 txfees fixer 2026-01-04 13:45:31 -04:00
b384ac562c wallet utils 2025-11-30 16:29:01 -04:00
9817546064 fix tx_fees wallet and other bug fixes to be compatible with electrum 0.6.2-master(0.6.3) 2025-11-29 23:42:54 -04:00
7c1fc04add a lot 2025-10-14 07:50:27 -04:00
fd7e849158 bugfixes 2025-08-31 14:48:10 -04:00
b1b0338bc7 bugfix 2025-08-30 08:39:59 -04:00
a9b50105a6 bugfix 2025-08-29 17:06:47 -04:00
2ec5d060d3 bugfixes 2025-08-29 11:02:07 -04:00
29c63fc5c8 version 2025-08-24 20:38:24 -04:00
461b0cb368 Wizard 2025-08-24 20:28:26 -04:00
d6b37005e8 balconfig 2025-08-11 16:44:55 -04:00
a7b778e0b2 version 0.2.0b 2025-07-23 09:26:27 -04:00
6fd160f2a1 winzozz suck 2025-07-19 18:00:06 -04:00
76e01c6f3a icons 2025-07-17 22:36:55 -04:00
bfb56d487a fixed transaction dialog import and removed unused payment_identifier 2025-07-11 01:21:42 -04:00
5a2143ce03 log 2025-07-04 17:53:05 -04:00
79c071d536 ,. 2025-07-04 17:48:30 -04:00
4d021e1d82 download welist 2025-07-04 15:54:46 -04:00
e542c82ee1 manifest 2025-07-04 15:54:40 -04:00
7b6aaceeaa upgraded to new plugin environment 2025-07-04 15:54:25 -04:00
8248198ed6 stauts fix'd 2025-07-04 15:54:14 -04:00
1530d3b3f2 willexecutor default selected 2025-07-04 15:53:50 -04:00
bitcoinafterlife
cea9579406 init 2025-03-23 13:53:10 -04:00
21 changed files with 3102 additions and 5 deletions

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 copronista
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

2
README.md Normal file
View File

@@ -0,0 +1,2 @@
# BalPlugin
Bitcoin After Life Electrum Plugin

1
VERSION Normal file
View File

@@ -0,0 +1 @@
0.2.8

1
__init__.py Normal file
View File

@@ -0,0 +1 @@

161
bal.py Normal file
View File

@@ -0,0 +1,161 @@
import os
# import random
# import zipfile as zipfile_lib
from electrum import json_db
from electrum.logging import get_logger
from electrum.plugin import BasePlugin
from electrum.transaction import tx_from_any
def get_will_settings(x):
# print(x)
pass
json_db.register_dict("heirs", tuple, None)
json_db.register_dict("will", dict, None)
json_db.register_dict("will_settings", lambda x: x, None)
def get_will(x):
try:
x["tx"] = tx_from_any(x["tx"])
except Exception as e:
raise e
return x
class BalConfig:
def __init__(self, config, name, default):
self.config = config
self.name = name
self.default = default
def get(self, default=None):
v = self.config.get(self.name, default)
if v is None:
if default is not None:
v = default
else:
v = self.default
return v
def set(self, value, save=True):
self.config.set_key(self.name, value, save=save)
class BalPlugin(BasePlugin):
_version=None
__version__ = "0.2.7" #AUTOMATICALLY GENERATED DO NOT EDIT
def version(self):
if not self._version:
try:
f = ""
with open("{}/VERSION".format(self.plugin_dir), "r") as fi:
f = str(fi.read())
self._version = f.strip()
except Exception as e:
_logger.error(f"failed to get version: {e}")
self._version="unknown"
return self._version
SIZE = (159, 97)
def __init__(self, parent, config, name):
self.logger = get_logger(__name__)
BasePlugin.__init__(self, parent, config, name)
self.base_dir = os.path.join(config.electrum_path(), "bal")
self.plugin_dir = os.path.split(os.path.realpath(__file__))[0]
zipfile = "/".join(self.plugin_dir.split("/")[:-1])
import sys
sys.path.insert(0, zipfile)
self.parent = parent
self.config = config
self.name = name
self.ASK_BROADCAST = BalConfig(config, "bal_ask_broadcast", True)
self.BROADCAST = BalConfig(config, "bal_broadcast", True)
self.LOCKTIME_TIME = BalConfig(config, "bal_locktime_time", 90)
self.LOCKTIME_BLOCKS = BalConfig(config, "bal_locktime_blocks", 144 * 90)
self.LOCKTIMEDELTA_TIME = BalConfig(config, "bal_locktimedelta_time", 7)
self.LOCKTIMEDELTA_BLOCKS = BalConfig(
config, "bal_locktimedelta_blocks", 144 * 7
)
self.ENABLE_MULTIVERSE = BalConfig(config, "bal_enable_multiverse", False)
self.TX_FEES = BalConfig(config, "bal_tx_fees", 100)
self.INVALIDATE = BalConfig(config, "bal_invalidate", True)
self.ASK_INVALIDATE = BalConfig(config, "bal_ask_invalidate", True)
self.PREVIEW = BalConfig(config, "bal_preview", True)
self.SAVE_TXS = BalConfig(config, "bal_save_txs", True)
self.WILLEXECUTORS = BalConfig(config, "bal_willexecutors", True)
# self.PING_WILLEXECUTORS = BalConfig(config, "bal_ping_willexecutors", True)
# self.ASK_PING_WILLEXECUTORS = BalConfig(
# config, "bal_ask_ping_willexecutors", True
# )
self.NO_WILLEXECUTOR = BalConfig(config, "bal_no_willexecutor", True)
self.HIDE_REPLACED = BalConfig(config, "bal_hide_replaced", True)
self.HIDE_INVALIDATED = BalConfig(config, "bal_hide_invalidated", True)
self.ALLOW_REPUSH = BalConfig(config, "bal_allow_repush", True)
self.FIRST_EXECUTION = BalConfig(config, "bal_first_execution", True)
self.WILLEXECUTORS = BalConfig(
config,
"bal_willexecutors",
{
"mainnet": {
"https://we.bitcoin-after.life": {
"base_fee": 100000,
"status": "New",
"info": "Bitcoin After Life Will Executor",
"address": "bc1qusymuetsz2psaqzqxv8qmzcy64d9meckj3lxxf",
"selected": True,
}
},
"testnet": {
"https://we.bitcoin-after.life": {
"base_fee": 100000,
"status": "New",
"info": "Bitcoin After Life Will Executor",
"address": "bcrt1qa5cntu4hgadw8zd3n6sq2nzjy34sxdtd9u0gp7",
"selected": True,
}
},
},
)
self.WILL_SETTINGS = BalConfig(
config,
"bal_will_settings",
{
"baltx_fees": 100,
"threshold": "180d",
"locktime": "1y",
},
)
self._hide_invalidated = self.HIDE_INVALIDATED.get()
self._hide_replaced = self.HIDE_REPLACED.get()
def resource_path(self, *parts):
return os.path.join(self.plugin_dir, *parts)
def hide_invalidated(self):
self._hide_invalidated = not self._hide_invalidated
self.HIDE_INVALIDATED.set(self._hide_invalidated)
def hide_replaced(self):
self._hide_replaced = not self._hide_replaced
self.HIDE_REPLACED.set(self._hide_replaced)
def validate_will_settings(self, will_settings):
if int(will_settings.get("baltx_fees", 1)) < 1:
will_settings["baltx_fees"] = 1
if not will_settings.get("threshold"):
will_settings["threshold"] = "180d"
if not will_settings.get("locktime"):
will_settings["locktime"] = "1y"
return will_settings
def default_will_settings(self):
return {"baltx_fees": 100, "threshold": "180d", "locktime": "1y"}

14
bal_resources.py Normal file
View File

@@ -0,0 +1,14 @@
import os
PLUGIN_DIR = os.path.split(os.path.realpath(__file__))[0]
DEFAULT_ICON = "bal32x32.png"
DEFAULT_ICON_PATH = "icons"
def icon_path(icon_basename: str = DEFAULT_ICON):
path = resource_path(DEFAULT_ICON_PATH, icon_basename)
return path
def resource_path(*parts):
return os.path.join(PLUGIN_DIR, *parts)

790
heirs.py Normal file
View File

@@ -0,0 +1,790 @@
# import datetime
# import json
import math
import random
import re
import threading
# import urllib.parse
# import urllib.request
from typing import (
TYPE_CHECKING,
Any,
Dict,
# List,
Optional,
# Sequence,
Tuple,
)
import dns
from dns.exception import DNSException
from electrum import (
bitcoin,
constants,
# descriptor,
dnssec,
)
from electrum.logging import Logger, get_logger
from electrum.transaction import (
PartialTransaction,
PartialTxInput,
PartialTxOutput,
TxOutpoint,
# TxOutput,
)
from electrum.util import (
bfh,
read_json_file,
to_string,
trigger_callback,
write_json_file,
)
from .util import Util
from .willexecutors import Willexecutors
from electrum.util import BitcoinException
if TYPE_CHECKING:
from .simple_config import SimpleConfig
# from .wallet_db import WalletDB
_logger = get_logger(__name__)
HEIR_ADDRESS = 0
HEIR_AMOUNT = 1
HEIR_LOCKTIME = 2
HEIR_REAL_AMOUNT = 3
HEIR_DUST_AMOUNT = 4
TRANSACTION_LABEL = "inheritance transaction"
class AliasNotFoundException(Exception):
pass
def reduce_outputs(in_amount, out_amount, fee, outputs):
if in_amount < out_amount:
for output in outputs:
output.value = math.floor((in_amount - fee) / out_amount * output.value)
"""
#TODO: put this method inside wallet.db to replace or complete get_locktime_for_new_transaction
def get_current_height(network:'Network'):
#if no network or not up to date, just set locktime to zero
if not network:
return 0
chain = network.blockchain()
if chain.is_tip_stale():
return 0
# figure out current block height
chain_height = chain.height() # learnt from all connected servers, SPV-checked
server_height = network.get_server_height() # height claimed by main server, unverified
# note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork)
# - if it's lagging too much, it is the network's job to switch away
if server_height < chain_height - 10:
# the diff is suspiciously large... give up and use something non-fingerprintable
return 0
# discourage "fee sniping"
height = min(chain_height, server_height)
return height
"""
def prepare_transactions(locktimes, available_utxos, fees, wallet):
available_utxos = sorted(
available_utxos,
key=lambda x: "{}:{}:{}".format(
x.value_sats(), x.prevout.txid, x.prevout.out_idx
),
)
# total_used_utxos = []
txsout = {}
locktime, _ = Util.get_lowest_locktimes(locktimes)
if not locktime:
_logger.info("prepare transactions, no locktime")
return
locktime = locktime[0]
heirs = locktimes[locktime]
true = True
while true:
true = False
fee = fees.get(locktime, 0)
out_amount = fee
description = ""
outputs = []
paid_heirs = {}
for name, heir in heirs.items():
if len(heir) > HEIR_REAL_AMOUNT and "DUST" not in str(
heir[HEIR_REAL_AMOUNT]
):
try:
real_amount = heir[HEIR_REAL_AMOUNT]
outputs.append(
PartialTxOutput.from_address_and_value(
heir[HEIR_ADDRESS], real_amount
)
)
out_amount += real_amount
description += f"{name}\n"
except BitcoinException as e:
_logger.info("exception decoding output {} - {}".format(type(e), e))
heir[HEIR_REAL_AMOUNT] = e
except Exception as e:
heir[HEIR_REAL_AMOUNT] = e
_logger.error(f"error preparing transactions: {e}")
pass
paid_heirs[name] = heir
in_amount = 0.0
used_utxos = []
try:
while utxo := available_utxos.pop():
value = utxo.value_sats()
in_amount += value
used_utxos.append(utxo)
if in_amount >= out_amount:
break
except IndexError as e:
_logger.error(
f"error preparing transactions index error {e} {in_amount}, {out_amount}"
)
pass
if int(in_amount) < int(out_amount):
_logger.error(
"error preparing transactions in_amount < out_amount ({} < {}) "
)
continue
heirsvalue = out_amount
change = get_change_output(wallet, in_amount, out_amount, fee)
if change:
outputs.append(change)
for i in range(0, 100):
random.shuffle(outputs)
tx = PartialTransaction.from_io(
used_utxos,
outputs,
locktime=Util.parse_locktime_string(locktime, wallet),
version=2,
)
if len(description) > 0:
tx.description = description[:-1]
else:
tx.description = ""
tx.heirsvalue = heirsvalue
tx.set_rbf(True)
tx.remove_signatures()
txid = tx.txid()
if txid is None:
raise Exception("txid is none", tx)
tx.heirs = paid_heirs
tx.my_locktime = locktime
txsout[txid] = tx
if change:
change_idx = tx.get_output_idxs_from_address(change.address)
prevout = TxOutpoint(txid=bfh(tx.txid()), out_idx=change_idx.pop())
txin = PartialTxInput(prevout=prevout)
txin._trusted_value_sats = change.value
txin.script_descriptor = change.script_descriptor
txin.is_mine = True
txin._TxInput__address = change.address
txin._TxInput__scriptpubkey = change.scriptpubkey
txin._TxInput__value_sats = change.value
txin.utxo = tx
available_utxos.append(txin)
txsout[txid].available_utxos = available_utxos[:]
return txsout
def get_utxos_from_inputs(tx_inputs, tx, utxos):
for tx_input in tx_inputs:
prevoutstr = tx_input.prevout.to_str()
utxos[prevoutstr] = utxos.get(prevoutstr, {"input": tx_input, "txs": []})
utxos[prevoutstr]["txs"].append(tx)
return utxos
# TODO calculate de minimum inputs to be invalidated
def invalidate_inheritance_transactions(wallet):
# listids = []
utxos = {}
dtxs = {}
for k, v in wallet.get_all_labels().items():
tx = None
if TRANSACTION_LABEL == v:
tx = wallet.adb.get_transaction(k)
if tx:
dtxs[tx.txid()] = tx
get_utxos_from_inputs(tx.inputs(), tx, utxos)
for key, utxo in utxos.items():
txid = key.split(":")[0]
if txid in dtxs:
for tx in utxo["txs"]:
txid = tx.txid()
del dtxs[txid]
utxos = {}
for txid, tx in dtxs.items():
get_utxos_from_inputs(tx.inputs(), tx, utxos)
utxos = sorted(utxos.items(), key=lambda item: len(item[1]))
remaining = {}
invalidated = []
for key, value in utxos:
for tx in value["txs"]:
txid = tx.txid()
if txid not in invalidated:
invalidated.append(tx.txid())
remaining[key] = value
def print_transaction(heirs, tx, locktimes, tx_fees):
jtx = tx.to_json()
print(f"TX: {tx.txid()}\t-\tLocktime: {jtx['locktime']}")
print("---")
for inp in jtx["inputs"]:
print(f"{inp['address']}: {inp['value_sats']}")
print("---")
for out in jtx["outputs"]:
heirname = ""
for key in heirs.keys():
heir = heirs[key]
if heir[HEIR_ADDRESS] == out["address"] and str(heir[HEIR_LOCKTIME]) == str(
jtx["locktime"]
):
heirname = key
print(f"{heirname}\t{out['address']}: {out['value_sats']}")
print()
size = tx.estimated_size()
print(
"fee: {}\texpected: {}\tsize: {}".format(
tx.input_value() - tx.output_value(), size * tx_fees, size
)
)
print()
try:
print(tx.serialize_to_network())
except Exception:
print("impossible to serialize")
print()
def get_change_output(wallet, in_amount, out_amount, fee):
change_amount = int(in_amount - out_amount - fee)
if change_amount > wallet.dust_threshold():
change_addresses = wallet.get_change_addresses_for_new_transaction()
out = PartialTxOutput.from_address_and_value(change_addresses[0], change_amount)
out.is_change = True
return out
class Heirs(dict, Logger):
def __init__(self, wallet):
Logger.__init__(self)
self.db = wallet.db
self.wallet = wallet
d = self.db.get("heirs", {})
try:
self.update(d)
except Exception:
return
def invalidate_transactions(self, wallet):
invalidate_inheritance_transactions(wallet)
def save(self):
self.db.put("heirs", dict(self))
def import_file(self, path):
data = read_json_file(path)
data = Heirs._validate(data)
self.update(data)
self.save()
def export_file(self, path):
write_json_file(path, self)
def __setitem__(self, key, value):
dict.__setitem__(self, key, value)
self.save()
def pop(self, key):
if key in self.keys():
res = dict.pop(self, key)
self.save()
return res
def get_locktimes(self, from_locktime, a=False):
locktimes = {}
for key in self.keys():
locktime = Util.parse_locktime_string(self[key][HEIR_LOCKTIME])
if locktime > from_locktime and not a or locktime <= from_locktime and a:
locktimes[int(locktime)] = None
return list(locktimes.keys())
def check_locktime(self):
return False
def normalize_perc(
self, heir_list, total_balance, relative_balance, wallet, real=False
):
amount = 0
for key, v in heir_list.items():
try:
column = HEIR_AMOUNT
if real:
column = HEIR_REAL_AMOUNT
if "DUST" in str(v[column]):
column = HEIR_DUST_AMOUNT
value = int(
math.floor(
total_balance
/ relative_balance
* self.amount_to_float(v[column])
)
)
if value > wallet.dust_threshold():
heir_list[key].insert(HEIR_REAL_AMOUNT, value)
amount += value
else:
heir_list[key].insert(HEIR_REAL_AMOUNT, f"DUST: {value}")
heir_list[key].insert(HEIR_DUST_AMOUNT, value)
_logger.info(f"{key}, {value} is dust will be ignored")
except Exception as e:
raise e
return amount
def amount_to_float(self, amount):
try:
return float(amount)
except Exception:
try:
return float(amount[:-1])
except Exception:
return 0.0
def fixed_percent_lists_amount(self, from_locktime, dust_threshold, reverse=False):
fixed_heirs = {}
fixed_amount = 0.0
percent_heirs = {}
percent_amount = 0.0
fixed_amount_with_dust = 0.0
for key in self.keys():
try:
cmp = (
Util.parse_locktime_string(self[key][HEIR_LOCKTIME]) - from_locktime
)
if cmp <= 0:
_logger.debug(
"cmp < 0 {} {} {} {}".format(
cmp, key, self[key][HEIR_LOCKTIME], from_locktime
)
)
continue
if Util.is_perc(self[key][HEIR_AMOUNT]):
percent_amount += float(self[key][HEIR_AMOUNT][:-1])
percent_heirs[key] = list(self[key])
else:
heir_amount = int(math.floor(float(self[key][HEIR_AMOUNT])))
fixed_amount_with_dust += heir_amount
fixed_heirs[key] = list(self[key])
if heir_amount > dust_threshold:
fixed_amount += heir_amount
fixed_heirs[key].insert(HEIR_REAL_AMOUNT, heir_amount)
else:
fixed_heirs[key] = list(self[key])
fixed_heirs[key].insert(
HEIR_REAL_AMOUNT, f"DUST: {heir_amount}"
)
fixed_heirs[key].insert(HEIR_DUST_AMOUNT, heir_amount)
except Exception as e:
_logger.error(e)
return (
fixed_heirs,
fixed_amount,
percent_heirs,
percent_amount,
fixed_amount_with_dust,
)
def prepare_lists(
self, balance, total_fees, wallet, willexecutor=False, from_locktime=0
):
if balance<total_fees or balance < wallet.dust_threshold():
raise BalanceTooLowException(balance,wallet.dust_threshold(),total_fees)
willexecutors_amount = 0
willexecutors = {}
heir_list = {}
onlyfixed = False
newbalance = balance - total_fees
locktimes = self.get_locktimes(from_locktime)
if willexecutor:
for locktime in locktimes:
if int(Util.int_locktime(locktime)) > int(from_locktime):
try:
base_fee = int(willexecutor["base_fee"])
willexecutors_amount += base_fee
h = [None] * 4
h[HEIR_AMOUNT] = base_fee
h[HEIR_REAL_AMOUNT] = base_fee
h[HEIR_LOCKTIME] = locktime
h[HEIR_ADDRESS] = willexecutor["address"]
willexecutors[
'w!ll3x3c"' + willexecutor["url"] + '"' + str(locktime)
] = h
except Exception:
return [], False
else:
_logger.error(
f"heir excluded from will locktime({locktime}){Util.int_locktime(locktime)}<minimum{from_locktime}"
),
heir_list.update(willexecutors)
newbalance -= willexecutors_amount
if newbalance < 0:
raise WillExecutorFeeException(willexecutor)
(
fixed_heirs,
fixed_amount,
percent_heirs,
percent_amount,
fixed_amount_with_dust,
) = self.fixed_percent_lists_amount(from_locktime, wallet.dust_threshold())
if fixed_amount > newbalance:
fixed_amount = self.normalize_perc(
fixed_heirs, newbalance, fixed_amount, wallet
)
onlyfixed = True
heir_list.update(fixed_heirs)
newbalance -= fixed_amount
if newbalance > 0:
perc_amount = self.normalize_perc(
percent_heirs, newbalance, percent_amount, wallet
)
newbalance -= perc_amount
heir_list.update(percent_heirs)
if newbalance > 0:
newbalance += fixed_amount
fixed_amount = self.normalize_perc(
fixed_heirs, newbalance, fixed_amount_with_dust, wallet, real=True
)
newbalance -= fixed_amount
heir_list.update(fixed_heirs)
heir_list = sorted(
heir_list.items(),
key=lambda item: Util.parse_locktime_string(item[1][HEIR_LOCKTIME], wallet),
)
locktimes = {}
for key, value in heir_list:
locktime = Util.parse_locktime_string(value[HEIR_LOCKTIME])
if locktime not in locktimes:
locktimes[locktime] = {key: value}
else:
locktimes[locktime][key] = value
return locktimes, onlyfixed
def is_perc(self, key):
return Util.is_perc(self[key][HEIR_AMOUNT])
def buildTransactions(
self, bal_plugin, wallet, tx_fees=None, utxos=None, from_locktime=0
):
Heirs._validate(self)
if len(self) <= 0:
_logger.info("while building transactions there was no heirs")
return
balance = 0.0
len_utxo_set = 0
available_utxos = []
if not utxos:
utxos = wallet.get_utxos()
willexecutors = Willexecutors.get_willexecutors(bal_plugin) or {}
self.decimal_point = bal_plugin.get_decimal_point()
no_willexecutors = bal_plugin.NO_WILLEXECUTOR.get()
for utxo in utxos:
if utxo.value_sats() > 0 * tx_fees:
balance += utxo.value_sats()
len_utxo_set += 1
available_utxos.append(utxo)
if len_utxo_set == 0:
_logger.info("no usable utxos")
return
j = -2
willexecutorsitems = list(willexecutors.items())
willexecutorslen = len(willexecutorsitems)
alltxs = {}
while True:
j += 1
if j >= willexecutorslen:
break
elif 0 <= j:
url, willexecutor = willexecutorsitems[j]
if not Willexecutors.is_selected(willexecutor) or willexecutor["base_fee"] < wallet.dust_threshold():
continue
else:
willexecutor["url"] = url
elif j == -1:
if not no_willexecutors:
continue
url = willexecutor = False
else:
break
fees = {}
i = 0
while i < 10:
txs = {}
redo = False
i += 1
total_fees = 0
for fee in fees:
total_fees += int(fees[fee])
# newbalance = balance
try:
locktimes, onlyfixed = self.prepare_lists(
balance, total_fees, wallet, willexecutor, from_locktime
)
except WillExecutorFeeException:
i = 10
continue
if locktimes:
try:
txs = prepare_transactions(
locktimes, available_utxos[:], fees, wallet
)
if not txs:
return {}
except Exception as e:
_logger.error(
f"build transactions: error preparing transactions: {e}"
)
try:
if "w!ll3x3c" in e.heirname:
Willexecutors.is_selected(
e.heirname[len("w!ll3x3c") :], False
)
break
except Exception:
raise e
total_fees = 0
total_fees_real = 0
total_in = 0
for txid, tx in txs.items():
tx.willexecutor = willexecutor
fee = tx.estimated_size() * tx_fees
txs[txid].tx_fees = tx_fees
total_fees += fee
total_fees_real += tx.get_fee()
total_in += tx.input_value()
rfee = tx.input_value() - tx.output_value()
if rfee < fee or rfee > fee + wallet.dust_threshold():
redo = True
# oldfees = fees.get(tx.my_locktime, 0)
fees[tx.my_locktime] = fee
if balance - total_in > wallet.dust_threshold():
redo = True
if not redo:
break
if i >= 10:
break
else:
_logger.info(
f"no locktimes for willexecutor {willexecutor} skipped"
)
break
alltxs.update(txs)
return alltxs
def get_transactions(
self, bal_plugin, wallet, tx_fees, utxos=None, from_locktime=0
):
txs = self.buildTransactions(bal_plugin, wallet, tx_fees, utxos, from_locktime)
if txs:
temp_txs = {}
for txid in txs:
if txs[txid].available_utxos:
temp_txs.update(
self.get_transactions(
bal_plugin,
wallet,
tx_fees,
txs[txid].available_utxos,
txs[txid].locktime,
)
)
txs.update(temp_txs)
return txs
def resolve(self, k):
if bitcoin.is_address(k):
return {"address": k, "type": "address"}
if k in self.keys():
_type, addr = self[k]
if _type == "address":
return {"address": addr, "type": "heir"}
if openalias := self.resolve_openalias(k):
return openalias
raise AliasNotFoundException("Invalid Bitcoin address or alias", k)
@classmethod
def resolve_openalias(cls, url: str) -> Dict[str, Any]:
out = cls._resolve_openalias(url)
if out:
address, name, validated = out
return {
"address": address,
"name": name,
"type": "openalias",
"validated": validated,
}
return {}
def by_name(self, name):
for k in self.keys():
_type, addr = self[k]
if addr.casefold() == name.casefold():
return {"name": addr, "type": _type, "address": k}
return None
def fetch_openalias(self, config: "SimpleConfig"):
self.alias_info = None
alias = config.OPENALIAS_ID
if alias:
alias = str(alias)
def f():
self.alias_info = self._resolve_openalias(alias)
trigger_callback("alias_received")
t = threading.Thread(target=f)
t.daemon = True
t.start()
@classmethod
def _resolve_openalias(cls, url: str) -> Optional[Tuple[str, str, bool]]:
# support email-style addresses, per the OA standard
url = url.replace("@", ".")
try:
records, validated = dnssec.query(url, dns.rdatatype.TXT)
except DNSException as e:
_logger.info(f"Error resolving openalias: {repr(e)}")
return None
prefix = "btc"
for record in records:
string = to_string(record.strings[0], "utf8")
if string.startswith("oa1:" + prefix):
address = cls.find_regex(string, r"recipient_address=([A-Za-z0-9]+)")
name = cls.find_regex(string, r"recipient_name=([^;]+)")
if not name:
name = address
if not address:
continue
return address, name, validated
@staticmethod
def find_regex(haystack, needle):
regex = re.compile(needle)
try:
return regex.search(haystack).groups()[0]
except AttributeError:
return None
def validate_address(address):
if not bitcoin.is_address(address, net=constants.net):
raise NotAnAddress(f"not an address,{address}")
return address
def validate_amount(amount):
try:
famount = float(amount[:-1]) if Util.is_perc(amount) else float(amount)
if famount <= 0.00000001:
raise AmountNotValid(f"amount have to be positive {famount} < 0")
except Exception as e:
raise AmountNotValid(f"amount not properly formatted, {e}")
return amount
def validate_locktime(locktime, timestamp_to_check=False):
try:
if timestamp_to_check:
if Util.parse_locktime_string(locktime, None) < timestamp_to_check:
raise HeirExpiredException()
except Exception as e:
raise LocktimeNotValid(f"locktime string not properly formatted, {e}")
return locktime
def validate_heir(k, v, timestamp_to_check=False):
address = Heirs.validate_address(v[HEIR_ADDRESS])
amount = Heirs.validate_amount(v[HEIR_AMOUNT])
locktime = Heirs.validate_locktime(v[HEIR_LOCKTIME], timestamp_to_check)
return (address, amount, locktime)
def _validate(data, timestamp_to_check=False):
for k, v in list(data.items()):
if k == "heirs":
return Heirs._validate(v, timestamp_to_check)
try:
Heirs.validate_heir(k, v, timestamp_to_check)
except Exception as e:
_logger.info(f"exception heir removed {e}")
data.pop(k)
return data
class NotAnAddress(ValueError):
pass
class AmountNotValid(ValueError):
pass
class LocktimeNotValid(ValueError):
pass
class HeirExpiredException(LocktimeNotValid):
pass
class HeirAmountIsDustException(Exception):
pass
class NoHeirsException(Exception):
pass
class WillExecutorFeeException(Exception):
def __init__(self, willexecutor):
self.willexecutor = willexecutor
def __str__(self):
return "WillExecutorFeeException: {} fee:{}".format(
self.willexecutor["url"], self.willexecutor["base_fee"]
)
class BalanceTooLowException(Exception):
def __init__(self,balance, dust_threshold, fees):
self.balance=balance
self.dust_threshold = dust_threshold
self.fees = fees
def __str__(self):
return f"Balance too low, balance: {self.balance}, dust threshold: {self.dust_threshold}, fees: {self.fees}"

BIN
icons/bal16x16.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 538 B

BIN
icons/bal32x32.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

BIN
icons/confirmed.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

BIN
icons/heir.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 871 B

BIN
icons/status_connected.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

BIN
icons/unconfirmed.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

BIN
icons/will.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 831 B

9
manifest.json Normal file
View File

@@ -0,0 +1,9 @@
{
"name": "BAL",
"fullname": "Bitcoin After Life",
"description": "Provides free and decentralized inheritance support<br> Version: 0.2.7",
"author":"Svatantrya",
"available_for": ["qt"],
"icon":"icons/bal32x32.png"
}

77
qt.py
View File

@@ -1271,6 +1271,8 @@ class _LockTimeEditor:
return cls.min_allowed_value <= x <= cls.max_allowed_value return cls.min_allowed_value <= x <= cls.max_allowed_value
<<<<<<< HEAD
=======
""" """
HeirsLockTimeEdit - A custom QWidget for editing locktime values in the context of heirs distribution. HeirsLockTimeEdit - A custom QWidget for editing locktime values in the context of heirs distribution.
@@ -1279,10 +1281,22 @@ class _LockTimeEditor:
Features: Features:
- Supports both raw locktime values and human-readable date formats - Supports both raw locktime values and human-readable date formats
- Emits valueEdited signal when the locktime value changes - Emits valueEdited signal when the locktime value is edited
- Provides threshold-based validation for locktime values - Provides threshold-based validation for locktime values
- Integrates with heir distribution workflows - Integrates with heir distribution workflows
Behavior:
The class handles three types of locktime values:
1. **Timestamps**: Raw integer values representing Unix timestamps
2. **Day intervals**: Values ending with 'd' (e.g., '3d' = 3 days from now)
3. **Year intervals**: Values ending with 'y' (e.g., '2y' = 2 years from now)
Only these formats are valid:
- Examples: '1609459200', '3d', '2y'
- Invalid: 'invalid', '5m', '10x'
The widget automatically converts day/year intervals to appropriate timestamps.
Attributes: Attributes:
valueEdited (pyqtSignal): Signal emitted when the locktime value is edited valueEdited (pyqtSignal): Signal emitted when the locktime value is edited
locktime_threshold (int): Minimum threshold value for locktime (default: 50000000) locktime_threshold (int): Minimum threshold value for locktime (default: 50000000)
@@ -1578,6 +1592,38 @@ class BalDialog(WindowModalDialog):
self.setWindowIcon(read_QIcon_from_bytes(bal_plugin.read_file(icon))) self.setWindowIcon(read_QIcon_from_bytes(bal_plugin.read_file(icon)))
"""
BalWizardDialog - A custom QDialog that implements a multi-step wizard interface.
This dialog provides a structured, step-by-step workflow for complex operations
in the Bal Electrum plugin, guiding users through a sequence of pages with
forward/backward navigation and validation.
Features:
- Multi-page navigation with Previous/Next buttons
- Automatic validation before proceeding to next page
- Progress tracking with visual indicators
- Customizable page flow and validation rules
- Integration with BalDialog base class for consistent styling
Usage:
The wizard follows a standard pattern:
1. Initialize with a list of page constructors
2. Each page is responsible for its own setup and validation
3. The dialog manages navigation and state between pages
4. Finalize action is triggered when all pages are completed
Attributes:
pages (list): List of page constructors for the wizard
current_page (int): Index of the currently displayed page
page_widgets (list): List of instantiated page widgets
Args:
parent: Optional parent QWidget
title (str): Title to display in the dialog header
pages (list): List of page constructors (callables) for each step
"""
class BalWizardDialog(BalDialog): class BalWizardDialog(BalDialog):
def __init__(self, bal_window: "BalWindow"): def __init__(self, bal_window: "BalWindow"):
assert bal_window assert bal_window
@@ -2090,6 +2136,8 @@ class BalBuildWillDialog(BalDialog):
COLOR_OK = "#05ad05" COLOR_OK = "#05ad05"
def __init__(self, bal_window, parent=None): def __init__(self, bal_window, parent=None):
<<<<<<< HEAD
=======
"""Initialize the Build Will dialog. """Initialize the Build Will dialog.
Args: Args:
@@ -2102,6 +2150,7 @@ class BalBuildWillDialog(BalDialog):
- Layout management - Layout management
- Network connection - Network connection
""" """
>>>>>>> origin/doc
if not parent: if not parent:
parent = bal_window.window parent = bal_window.window
BalDialog.__init__(self, parent, bal_window.bal_plugin, _("Building Will")) BalDialog.__init__(self, parent, bal_window.bal_plugin, _("Building Will"))
@@ -2109,6 +2158,18 @@ class BalBuildWillDialog(BalDialog):
self.updatemessage.connect(self.msg_update) self.updatemessage.connect(self.msg_update)
self.bal_window = bal_window self.bal_window = bal_window
self.bal_plugin = bal_window.bal_plugin self.bal_plugin = bal_window.bal_plugin
<<<<<<< HEAD
self.message_label = QLabel(_("Building Will:"))
self.vbox = QVBoxLayout(self)
self.vbox.addWidget(self.message_label,0)
self.qwidget = QWidget(self)
self.vbox.addWidget(self.qwidget,1)
self.labelsbox=QVBoxLayout(self.qwidget)
self.setMinimumWidth(600)
self.setMinimumHeight(100)
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
self.labels = []
=======
# Main message label # Main message label
self.message_label = QLabel(_("Building Will:")) self.message_label = QLabel(_("Building Will:"))
@@ -2146,6 +2207,7 @@ class BalBuildWillDialog(BalDialog):
self.network = Network.get_instance() self.network = Network.get_instance()
self._stopping = False self._stopping = False
>>>>>>> origin/doc
self.check_row = None self.check_row = None
self.inval_row = None self.inval_row = None
self.build_row = None self.build_row = None
@@ -2595,6 +2657,15 @@ class BalBuildWillDialog(BalDialog):
w.deleteLater() w.deleteLater()
def msg_update(self): def msg_update(self):
<<<<<<< HEAD
self.clear_layout(self.labelsbox)
for label in self.labels:
label=label.replace("\n","<br>")
qlabel=QLabel(label)
self.labelsbox.addWidget(QLabel(label),1)
self.setMinimumHeight(30*(len(self.labels)+2))
=======
"""Updates the UI with new messages using a debounced queue system. """Updates the UI with new messages using a debounced queue system.
This method implements the following logic: This method implements the following logic:
@@ -2662,6 +2733,7 @@ class BalBuildWillDialog(BalDialog):
self._message_queue = [] self._message_queue = []
self.setMinimumHeight(min(30 * (len(self.labels) + 2), 400)) # Max height limit self.setMinimumHeight(min(30 * (len(self.labels) + 2), 400)) # Max height limit
>>>>>>> origin/doc
def get_text(self): def get_text(self):
return self.message_label.text() return self.message_label.text()
@@ -3879,6 +3951,8 @@ class CheckAliveException(Exception):
def __init__(self,timestamp_to_check): def __init__(self,timestamp_to_check):
self.timestamp_to_check = timestamp_to_check self.timestamp_to_check = timestamp_to_check
def __str__(self): def __str__(self):
<<<<<<< HEAD
=======
def __del__(self): def __del__(self):
"""Explicit cleanup to prevent memory leaks. """Explicit cleanup to prevent memory leaks.
@@ -3895,4 +3969,5 @@ class CheckAliveException(Exception):
self._message_timer.deleteLater() self._message_timer.deleteLater()
self.clear_layout(self.labelsbox) self.clear_layout(self.labelsbox)
>>>>>>> origin/doc
return "Check alive expired please update it: {}".format(datetime.fromtimestamp(self.timestamp_to_check).isoformat()) return "Check alive expired please update it: {}".format(datetime.fromtimestamp(self.timestamp_to_check).isoformat())

496
util.py Normal file
View File

@@ -0,0 +1,496 @@
import bisect
from datetime import datetime, timedelta
from electrum.gui.qt.util import getSaveFileName
from electrum.i18n import _
from electrum.transaction import PartialTxOutput
from electrum.util import FileExportFailed
LOCKTIME_THRESHOLD = 500000000
class Util:
def locktime_to_str(locktime):
try:
locktime = int(locktime)
if locktime > LOCKTIME_THRESHOLD:
dt = datetime.fromtimestamp(locktime).isoformat()
return dt
except Exception:
pass
return str(locktime)
def str_to_locktime(locktime):
try:
if locktime[-1] in ("y", "d", "b"):
return locktime
else:
return int(locktime)
except Exception:
pass
dt_object = datetime.fromisoformat(locktime)
timestamp = dt_object.timestamp()
return int(timestamp)
def parse_locktime_string(locktime, w=None):
try:
return int(locktime)
except Exception:
pass
try:
now = datetime.now()
if locktime[-1] == "y":
locktime = str(int(locktime[:-1]) * 365) + "d"
if locktime[-1] == "d":
return int(
(now + timedelta(days=int(locktime[:-1])))
.replace(hour=0, minute=0, second=0, microsecond=0)
.timestamp()
)
if locktime[-1] == "b":
locktime = int(locktime[:-1])
height = 0
if w:
height = Util.get_current_height(w.network)
locktime += int(height)
return int(locktime)
except Exception:
pass
return 0
def int_locktime(seconds=0, minutes=0, hours=0, days=0, blocks=0):
return int(
seconds
+ minutes * 60
+ hours * 60 * 60
+ days * 60 * 60 * 24
+ blocks * 600
)
def encode_amount(amount, decimal_point):
if Util.is_perc(amount):
return amount
else:
try:
return int(float(amount) * pow(10, decimal_point))
except Exception:
return 0
def decode_amount(amount, decimal_point):
if Util.is_perc(amount):
return amount
else:
basestr = "{{:0.{}f}}".format(decimal_point)
try:
return basestr.format(float(amount) / pow(10, decimal_point))
except Exception:
return str(amount)
def is_perc(value):
try:
return value[-1] == "%"
except Exception:
return False
def cmp_array(heira, heirb):
try:
if len(heira) != len(heirb):
return False
for h in range(0, len(heira)):
if heira[h] != heirb[h]:
return False
return True
except Exception:
return False
def cmp_heir(heira, heirb):
if heira[0] == heirb[0] and heira[1] == heirb[1]:
return True
return False
def cmp_willexecutor(willexecutora, willexecutorb):
if willexecutora == willexecutorb:
return True
try:
if (
willexecutora["url"] == willexecutorb["url"]
and willexecutora["address"] == willexecutorb["address"]
and willexecutora["base_fee"] == willexecutorb["base_fee"]
):
return True
except Exception:
return False
return False
def search_heir_by_values(heirs, heir, values):
for h, v in heirs.items():
found = False
for val in values:
if val in v and v[val] != heir[val]:
found = True
if not found:
return h
return False
def cmp_heir_by_values(heira, heirb, values):
for v in values:
if heira[v] != heirb[v]:
return False
return True
def cmp_heirs_by_values(
heirsa, heirsb, values, exclude_willexecutors=False, reverse=True
):
for heira in heirsa:
if (
exclude_willexecutors and 'w!ll3x3c"' not in heira
) or not exclude_willexecutors:
found = False
for heirb in heirsb:
if Util.cmp_heir_by_values(heirsa[heira], heirsb[heirb], values):
found = True
if not found:
return False
if reverse:
return Util.cmp_heirs_by_values(
heirsb,
heirsa,
values,
exclude_willexecutors=exclude_willexecutors,
reverse=False,
)
else:
return True
def cmp_heirs(
heirsa,
heirsb,
cmp_function=lambda x, y: x[0] == y[0] and x[3] == y[3],
reverse=True,
):
try:
for heir in heirsa:
if 'w!ll3x3c"' not in heir:
if heir not in heirsb or not cmp_function(
heirsa[heir], heirsb[heir]
):
if not Util.search_heir_by_values(heirsb, heirsa[heir], [0, 3]):
return False
if reverse:
return Util.cmp_heirs(heirsb, heirsa, cmp_function, False)
else:
return True
except Exception as e:
raise e
return False
def cmp_inputs(inputsa, inputsb):
if len(inputsa) != len(inputsb):
return False
for inputa in inputsa:
if not Util.in_utxo(inputa, inputsb):
return False
return True
def cmp_outputs(outputsa, outputsb, willexecutor_output=None):
if len(outputsa) != len(outputsb):
return False
for outputa in outputsa:
if not Util.cmp_output(outputa, willexecutor_output):
if not Util.in_output(outputa, outputsb):
return False
return True
def cmp_txs(txa, txb):
if not Util.cmp_inputs(txa.inputs(), txb.inputs()):
return False
if not Util.cmp_outputs(txa.outputs(), txb.outputs()):
return False
return True
def get_value_amount(txa, txb):
outputsa = txa.outputs()
# outputsb = txb.outputs()
value_amount = 0
for outa in outputsa:
same_amount, same_address = Util.in_output(outa, txb.outputs())
if not (same_amount or same_address):
return False
if same_amount and same_address:
value_amount += outa.value
if same_amount:
pass
if same_address:
pass
return value_amount
def chk_locktime(timestamp_to_check, block_height_to_check, locktime):
# TODO BUG: WHAT HAPPEN AT THRESHOLD?
locktime = int(locktime)
if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check:
return True
elif locktime < LOCKTIME_THRESHOLD and locktime > block_height_to_check:
return True
else:
return False
def anticipate_locktime(locktime, blocks=0, hours=0, days=0):
locktime = int(locktime)
out = 0
if locktime > LOCKTIME_THRESHOLD:
seconds = blocks * 600 + hours * 3600 + days * 86400
dt = datetime.fromtimestamp(locktime)
dt -= timedelta(seconds=seconds)
out = dt.timestamp()
else:
blocks -= hours * 6 + days * 144
out = locktime + blocks
if out < 1:
out = 1
return out
def cmp_locktime(locktimea, locktimeb):
if locktimea == locktimeb:
return 0
strlocktimea = str(locktimea)
strlocktimeb = str(locktimeb)
# intlocktimea = Util.str_to_locktime(strlocktimea)
# intlocktimeb = Util.str_to_locktime(strlocktimeb)
if locktimea[-1] in "ydb":
if locktimeb[-1] == locktimea[-1]:
return int(strlocktimea[-1]) - int(strlocktimeb[-1])
else:
return int(locktimea) - (locktimeb)
def get_lowest_valid_tx(available_utxos, will):
will = sorted(will.items(), key=lambda x: x[1]["tx"].locktime)
for txid, willitem in will.items():
pass
def get_locktimes(will):
locktimes = {}
for txid, willitem in will.items():
locktimes[willitem["tx"].locktime] = True
return locktimes.keys()
def get_lowest_locktimes(locktimes):
sorted_timestamp = []
sorted_block = []
for locktime in locktimes:
locktime = Util.parse_locktime_string(locktime)
if locktime < LOCKTIME_THRESHOLD:
bisect.insort(sorted_block, locktime)
else:
bisect.insort(sorted_timestamp, locktime)
return sorted(sorted_timestamp), sorted(sorted_block)
def get_lowest_locktimes_from_will(will):
return Util.get_lowest_locktimes(Util.get_locktimes(will))
def search_willtx_per_io(will, tx):
for wid, w in will.items():
if Util.cmp_txs(w["tx"], tx["tx"]):
return wid, w
return None, None
def invalidate_will(will):
raise Exception("not implemented")
def get_will_spent_utxos(will):
utxos = []
for txid, willitem in will.items():
utxos += willitem["tx"].inputs()
return utxos
def utxo_to_str(utxo):
try:
return utxo.to_str()
except Exception:
pass
try:
return utxo.prevout.to_str()
except Exception:
pass
return str(utxo)
def cmp_utxo(utxoa, utxob):
utxoa = Util.utxo_to_str(utxoa)
utxob = Util.utxo_to_str(utxob)
if utxoa == utxob:
return True
else:
return False
def in_utxo(utxo, utxos):
for s_u in utxos:
if Util.cmp_utxo(s_u, utxo):
return True
return False
def txid_in_utxo(txid, utxos):
for s_u in utxos:
if s_u.prevout.txid == txid:
return True
return False
def cmp_output(outputa, outputb):
return outputa.address == outputb.address and outputa.value == outputb.value
def in_output(output, outputs):
for s_o in outputs:
if Util.cmp_output(s_o, output):
return True
return False
# check all output with the same amount if none have the same address it can be a change
# return true true same address same amount
# return true false same amount different address
# return false false different amount, different address not found
def din_output(out, outputs):
same_amount = []
for s_o in outputs:
if int(out.value) == int(s_o.value):
same_amount.append(s_o)
if out.address == s_o.address:
return True, True
else:
pass
if len(same_amount) > 0:
return True, False
else:
return False, False
def get_change_output(wallet, in_amount, out_amount, fee):
change_amount = int(in_amount - out_amount - fee)
if change_amount > wallet.dust_threshold():
change_addresses = wallet.get_change_addresses_for_new_transaction()
out = PartialTxOutput.from_address_and_value(
change_addresses[0], change_amount
)
out.is_change = True
return out
def get_current_height(network):
# if no network or not up to date, just set locktime to zero
if not network:
return 0
chain = network.blockchain()
if chain.is_tip_stale():
return 0
# figure out current block height
chain_height = chain.height() # learnt from all connected servers, SPV-checked
server_height = (
network.get_server_height()
) # height claimed by main server, unverified
# note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork)
# - if it's lagging too much, it is the network's job to switch away
if server_height < chain_height - 10:
# the diff is suspiciously large... give up and use something non-fingerprintable
return 0
# discourage "fee sniping"
height = min(chain_height, server_height)
return height
def print_var(var, name="", veryverbose=False):
print(f"---{name}---")
if var is not None:
try:
print("str:", str(var))
except Exception:
pass
try:
print("repr", repr(var))
except Exception:
pass
try:
print("dict", dict(var))
except Exception:
pass
try:
print("dir", dir(var))
except Exception:
pass
try:
print("type", type(var))
except Exception:
pass
try:
print("to_json", var.to_json())
except Exception:
pass
try:
print("__slotnames__", var.__slotnames__)
except Exception:
pass
print(f"---end {name}---")
def print_utxo(utxo, name=""):
print(f"---utxo-{name}---")
Util.print_var(utxo, name)
Util.print_prevout(utxo.prevout, name)
Util.print_var(utxo.script_sig, f"{name}-script-sig")
Util.print_var(utxo.witness, f"{name}-witness")
print("_TxInput__address:", utxo._TxInput__address)
print("_TxInput__scriptpubkey:", utxo._TxInput__scriptpubkey)
print("_TxInput__value_sats:", utxo._TxInput__value_sats)
print(f"---utxo-end {name}---")
def print_prevout(prevout, name=""):
print(f"---prevout-{name}---")
Util.print_var(prevout, f"{name}-prevout")
Util.print_var(prevout._asdict())
print(f"---prevout-end {name}---")
def export_meta_gui(electrum_window, title, exporter):
filter_ = "All files (*)"
filename = getSaveFileName(
parent=electrum_window,
title=_("Select file to save your {}".format(title)),
filename="BALplugin_{}".format(title),
filter=filter_,
config=electrum_window.config,
)
if not filename:
return
try:
exporter(filename)
except FileExportFailed as e:
electrum_window.show_critical(str(e))
else:
electrum_window.show_message(
_("Your {0} were exported to '{1}'".format(title, str(filename)))
)
def copy(dicto, dictfrom):
for k, v in dictfrom.items():
dicto[k] = v
def fix_will_settings_tx_fees(will_settings):
tx_fees = will_settings.get("tx_fees", False)
have_to_update = False
if tx_fees:
will_settings["baltx_fees"] = tx_fees
del will_settings["tx_fees"]
have_to_update = True
return have_to_update
def fix_will_tx_fees(will):
have_to_update = False
for txid, willitem in will.items():
tx_fees = willitem.get("tx_fees", False)
if tx_fees:
will[txid]["baltx_fees"] = tx_fees
del will[txid]["tx_fees"]
have_to_update = True
return have_to_update

77
wallet_util/bal_wallet_utils.py Executable file
View File

@@ -0,0 +1,77 @@
#!env/bin/python3
from electrum.storage import WalletStorage
import json
from electrum.util import MyEncoder
import sys
import getpass
import os
default_fees = 100
def fix_will_settings_tx_fees(json_wallet):
tx_fees = json_wallet.get("will_settings", {}).get("tx_fees", False)
have_to_update = False
if tx_fees:
json_wallet["will_settings"]["baltx_fees"] = tx_fees
del json_wallet["will_settings"]["tx_fees"]
have_to_update = True
for txid, willitem in json_wallet["will"].items():
tx_fees = willitem.get("tx_fees", False)
if tx_fees:
json_wallet["will"][txid]["baltx_fees"] = tx_fees
del json_wallet["will"][txid]["tx_fees"]
have_to_update = True
return have_to_update
def uninstall_bal(json_wallet):
del json_wallet["will_settings"]
del json_wallet["will"]
del json_wallet["heirs"]
return True
def save(json_wallet, storage):
human_readable = not storage.is_encrypted()
storage.write(
json.dumps(
json_wallet,
indent=4 if human_readable else None,
sort_keys=bool(human_readable),
cls=MyEncoder,
)
)
def read_wallet(path, password=False):
storage = WalletStorage(path)
if storage.is_encrypted():
if not password:
password = getpass.getpass("Enter wallet password: ", stream=None)
storage.decrypt(password)
data = storage.read()
json_wallet = json.loads("[" + data + "]")[0]
return json_wallet, storage
if __name__ == "__main__":
if len(sys.argv) < 3:
print("usage: ./bal_wallet_utils <command> <wallet path>")
print("available commands: uninstall, fix")
exit(1)
if not os.path.exists(sys.argv[2]):
print("Error: wallet not found")
exit(1)
command = sys.argv[1]
path = sys.argv[2]
json_wallet, storage = read_wallet(path)
have_to_save = False
if command == "fix":
have_to_save = fix_will_settings_tx_fees(json_wallet)
if command == "uninstall":
have_to_save = uninstall_bal(json_wallet)
if have_to_save:
save(json_wallet, storage)
else:
print("nothing to do")

View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
import sys
import os
import json
from PyQt6.QtWidgets import (
QApplication,
QMainWindow,
QVBoxLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QWidget,
QFileDialog,
QGroupBox,
QTextEdit,
)
from electrum.storage import WalletStorage
from bal_wallet_utils import fix_will_settings_tx_fees, uninstall_bal, save
class WalletUtilityGUI(QMainWindow):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
self.setWindowTitle("BAL Wallet Utility")
self.setFixedSize(500, 400)
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Main layout
layout = QVBoxLayout(central_widget)
# Wallet input group
wallet_group = QGroupBox("Wallet Settings")
wallet_layout = QVBoxLayout(wallet_group)
# Wallet path
wallet_path_layout = QHBoxLayout()
wallet_path_layout.addWidget(QLabel("Wallet Path:"))
self.wallet_path_edit = QLineEdit()
self.wallet_path_edit.setPlaceholderText("Select wallet path...")
wallet_path_layout.addWidget(self.wallet_path_edit)
self.browse_btn = QPushButton("Browse...")
self.browse_btn.clicked.connect(self.browse_wallet)
wallet_path_layout.addWidget(self.browse_btn)
wallet_layout.addLayout(wallet_path_layout)
# Password
password_layout = QHBoxLayout()
password_layout.addWidget(QLabel("Password:"))
self.password_edit = QLineEdit()
self.password_edit.setEchoMode(QLineEdit.EchoMode.Password)
self.password_edit.setPlaceholderText("Enter password (if encrypted)")
password_layout.addWidget(self.password_edit)
wallet_layout.addLayout(password_layout)
layout.addWidget(wallet_group)
# Output area
output_group = QGroupBox("Output")
output_layout = QVBoxLayout(output_group)
self.output_text = QTextEdit()
self.output_text.setReadOnly(True)
output_layout.addWidget(self.output_text)
layout.addWidget(output_group)
# Action buttons
buttons_layout = QHBoxLayout()
self.fix_btn = QPushButton("Fix")
self.fix_btn.clicked.connect(self.fix_wallet)
self.fix_btn.setEnabled(False)
buttons_layout.addWidget(self.fix_btn)
self.uninstall_btn = QPushButton("Uninstall")
self.uninstall_btn.clicked.connect(self.uninstall_wallet)
self.uninstall_btn.setEnabled(False)
buttons_layout.addWidget(self.uninstall_btn)
layout.addLayout(buttons_layout)
# Connections to enable buttons when path is entered
self.wallet_path_edit.textChanged.connect(self.check_inputs)
def browse_wallet(self):
file_path, _ = QFileDialog.getOpenFileName(
self, "Select Wallet", "*", "Electrum Wallet (*)"
)
if file_path:
self.wallet_path_edit.setText(file_path)
def check_inputs(self):
wallet_path = self.wallet_path_edit.text().strip()
has_path = bool(wallet_path) and os.path.exists(wallet_path)
self.fix_btn.setEnabled(has_path)
self.uninstall_btn.setEnabled(has_path)
def log_message(self, message):
self.output_text.append(message)
def fix_wallet(self):
self.process_wallet("fix")
def uninstall_wallet(self):
self.log_message(
"WARNING: This will remove all BAL settings. This operation cannot be undone."
)
self.process_wallet("uninstall")
def process_wallet(self, command):
wallet_path = self.wallet_path_edit.text().strip()
password = self.password_edit.text()
if not wallet_path:
self.log_message("ERROR: Please enter wallet path")
return
if not os.path.exists(wallet_path):
self.log_message("ERROR: Wallet not found")
return
try:
self.log_message(f"Processing wallet: {wallet_path}")
storage = WalletStorage(wallet_path)
# Decrypt if necessary
if storage.is_encrypted():
if not password:
self.log_message(
"ERROR: Wallet is encrypted, please enter password"
)
return
try:
storage.decrypt(password)
self.log_message("Wallet decrypted successfully")
except Exception as e:
self.log_message(f"ERROR: Wrong password: {str(e)}")
return
# Read wallet
data = storage.read()
json_wallet = json.loads("[" + data + "]")[0]
have_to_save = False
message = ""
if command == "fix":
have_to_save = fix_will_settings_tx_fees(json_wallet)
message = (
"Fix applied successfully" if have_to_save else "No fix needed"
)
elif command == "uninstall":
have_to_save = uninstall_bal(json_wallet)
message = (
"BAL uninstalled successfully"
if have_to_save
else "No BAL settings found to uninstall"
)
if have_to_save:
try:
save(json_wallet, storage)
self.log_message(f"SUCCESS: {message}")
except Exception as e:
self.log_message(f"Save error: {str(e)}")
else:
self.log_message(f"INFO: {message}")
except Exception as e:
error_msg = f"ERROR: Processing failed: {str(e)}"
self.log_message(error_msg)
def main():
app = QApplication(sys.argv)
window = WalletUtilityGUI()
window.show()
return app.exec()
if __name__ == "__main__":
sys.exit(main())

895
will.py Normal file
View File

@@ -0,0 +1,895 @@
import copy
from electrum.bitcoin import NLOCKTIME_BLOCKHEIGHT_MAX
from electrum.i18n import _
from electrum.logging import Logger, get_logger
from electrum.transaction import (
PartialTransaction,
PartialTxInput,
PartialTxOutput,
Transaction,
TxOutpoint,
tx_from_any,
)
from electrum.util import (
bfh,
)
from .util import Util
from .willexecutors import Willexecutors
MIN_LOCKTIME = 1
MIN_BLOCK = 1
_logger = get_logger(__name__)
class Will:
def get_children(will, willid):
out = []
for _id in will:
inputs = will[_id].tx.inputs()
for idi in range(0, len(inputs)):
_input = inputs[idi]
if _input.prevout.txid.hex() == willid:
out.append([_id, idi, _input.prevout.out_idx])
return out
# build a tree with parent transactions
def add_willtree(will):
for willid in will:
will[willid].children = Will.get_children(will, willid)
for child in will[willid].children:
if not will[child[0]].father:
will[child[0]].father = willid
# return a list of will sorted by locktime
def get_sorted_will(will):
return sorted(will.items(), key=lambda x: x[1]["tx"].locktime)
def only_valid(will):
for k, v in will.items():
if v.get_status("VALID"):
yield k
def search_equal_tx(will, tx, wid):
for w in will:
if w != wid and not tx.to_json() != will[w]["tx"].to_json():
if will[w]["tx"].txid() != tx.txid():
if Util.cmp_txs(will[w]["tx"], tx):
return will[w]["tx"]
return False
def get_tx_from_any(x):
try:
a = str(x)
return tx_from_any(a)
except Exception as e:
raise e
return x
def add_info_from_will(will, wid, wallet):
if isinstance(will[wid].tx, str):
will[wid].tx = Will.get_tx_from_any(will[wid].tx)
if wallet:
will[wid].tx.add_info_from_wallet(wallet)
for txin in will[wid].tx.inputs():
txid = txin.prevout.txid.hex()
if txid in will:
change = will[txid].tx.outputs()[txin.prevout.out_idx]
txin._trusted_value_sats = change.value
try:
txin.script_descriptor = change.script_descriptor
except Exception:
pass
txin.is_mine = True
txin._TxInput__address = change.address
txin._TxInput__scriptpubkey = change.scriptpubkey
txin._TxInput__value_sats = change.value
txin._trusted_value_sats = change.value
def normalize_will(will, wallet=None, others_inputs={}):
to_delete = []
to_add = {}
# add info from wallet
willitems = {}
for wid in will:
Will.add_info_from_will(will, wid, wallet)
willitems[wid] = WillItem(will[wid])
will = willitems
errors = {}
for wid in will:
txid = will[wid].tx.txid()
if txid is None:
_logger.error("##########")
_logger.error(wid)
_logger.error(will[wid])
_logger.error(will[wid].tx.to_json())
_logger.error("txid is none")
will[wid].set_status("ERROR", True)
errors[wid] = will[wid]
continue
if txid != wid:
outputs = will[wid].tx.outputs()
ow = will[wid]
ow.normalize_locktime(others_inputs)
will[wid] = WillItem(ow.to_dict())
for i in range(0, len(outputs)):
Will.change_input(
will, wid, i, outputs[i], others_inputs, to_delete, to_add
)
to_delete.append(wid)
to_add[ow.tx.txid()] = ow.to_dict()
# for eid, err in errors.items():
# new_txid = err.tx.txid()
for k, w in to_add.items():
will[k] = w
for wid in to_delete:
if wid in will:
del will[wid]
def new_input(txid, idx, change):
prevout = TxOutpoint(txid=bfh(txid), out_idx=idx)
inp = PartialTxInput(prevout=prevout)
inp._trusted_value_sats = change.value
inp.is_mine = True
inp._TxInput__address = change.address
inp._TxInput__scriptpubkey = change.scriptpubkey
inp._TxInput__value_sats = change.value
return inp
"""
in questa situazione sono presenti due transazioni con id differente(quindi transazioni differenti)
per prima cosa controllo il locktime
se il locktime della nuova transazione e' maggiore del locktime della vecchia transazione, allora
confronto gli eredi, per locktime se corrispondono controllo i willexecutor
se hanno la stessa url ma le fee vecchie sono superiori alle fee nuove, allora anticipare.
"""
def check_anticipate(ow: "WillItem", nw: "WillItem"):
anticipate = Util.anticipate_locktime(ow.tx.locktime, days=1)
if int(nw.tx.locktime) >= int(anticipate):
if Util.cmp_heirs_by_values(
ow.heirs, nw.heirs, [0, 1], exclude_willexecutors=True
):
if nw.we and ow.we:
if ow.we["url"] == nw.we["url"]:
if int(ow.we["base_fee"]) > int(nw.we["base_fee"]):
return anticipate
else:
if int(ow.tx_fees) != int(nw.tx_fees):
return anticipate
else:
ow.tx.locktime
else:
ow.tx.locktime
else:
if nw.we == ow.we:
if not Util.cmp_heirs_by_values(ow.heirs, nw.heirs, [0, 3]):
return anticipate
else:
return ow.tx.locktime
else:
return ow.tx.locktime
else:
return anticipate
return 4294967295 + 1
def change_input(will, otxid, idx, change, others_inputs, to_delete, to_append):
ow = will[otxid]
ntxid = ow.tx.txid()
if otxid != ntxid:
for wid in will:
w = will[wid]
inputs = w.tx.inputs()
outputs = w.tx.outputs()
found = False
old_txid = w.tx.txid()
# ntx = None
for i in range(0, len(inputs)):
if (
inputs[i].prevout.txid.hex() == otxid
and inputs[i].prevout.out_idx == idx
):
if isinstance(w.tx, Transaction):
will[wid].tx = PartialTransaction.from_tx(w.tx)
will[wid].tx.set_rbf(True)
will[wid].tx._inputs[i] = Will.new_input(wid, idx, change)
found = True
if found:
pass
new_txid = will[wid].tx.txid()
if old_txid != new_txid:
to_delete.append(old_txid)
to_append[new_txid] = will[wid]
outputs = will[wid].tx.outputs()
for i in range(0, len(outputs)):
Will.change_input(
will,
wid,
i,
outputs[i],
others_inputs,
to_delete,
to_append,
)
def get_all_inputs(will, only_valid=False):
all_inputs = {}
for w, wi in will.items():
if not only_valid or wi.get_status("VALID"):
inputs = wi.tx.inputs()
for i in inputs:
prevout_str = i.prevout.to_str()
inp = [w, will[w], i]
if prevout_str not in all_inputs:
all_inputs[prevout_str] = [inp]
else:
all_inputs[prevout_str].append(inp)
return all_inputs
def get_all_inputs_min_locktime(all_inputs):
all_inputs_min_locktime = {}
for i, values in all_inputs.items():
min_locktime = min(values, key=lambda x: x[1].tx.locktime)[1].tx.locktime
for w in values:
if w[1].tx.locktime == min_locktime:
if i not in all_inputs_min_locktime:
all_inputs_min_locktime[i] = [w]
else:
all_inputs_min_locktime[i].append(w)
return all_inputs_min_locktime
def search_anticipate_rec(will, old_inputs):
redo = False
to_delete = []
to_append = {}
new_inputs = Will.get_all_inputs(will, only_valid=True)
for nid, nwi in will.items():
if nwi.search_anticipate(new_inputs):
if nid != nwi.tx.txid():
redo = True
to_delete.append(nid)
to_append[nwi.tx.txid()] = nwi
outputs = nwi.tx.outputs()
for i in range(0, len(outputs)):
Will.change_input(
will, nid, i, outputs[i], new_inputs, to_delete, to_append
)
if nwi.search_anticipate(old_inputs):
if nid != nwi.tx.txid():
redo = True
to_delete.append(nid)
to_append[nwi.tx.txid()] = nwi
outputs = nwi.tx.outputs()
for i in range(0, len(outputs)):
Will.change_input(
will, nid, i, outputs[i], new_inputs, to_delete, to_append
)
for w in to_delete:
try:
del will[w]
except Exception:
pass
for k, w in to_append.items():
will[k] = w
if redo:
Will.search_anticipate_rec(will, old_inputs)
def update_will(old_will, new_will):
all_old_inputs = Will.get_all_inputs(old_will, only_valid=True)
# all_inputs_min_locktime = Will.get_all_inputs_min_locktime(all_old_inputs)
# all_new_inputs = Will.get_all_inputs(new_will)
# check if the new input is already spent by other transaction
# if it is use the same locktime, or anticipate.
Will.search_anticipate_rec(new_will, all_old_inputs)
other_inputs = Will.get_all_inputs(old_will, {})
try:
Will.normalize_will(new_will, others_inputs=other_inputs)
except Exception as e:
raise e
for oid in Will.only_valid(old_will):
if oid in new_will:
new_heirs = new_will[oid].heirs
new_we = new_will[oid].we
new_will[oid] = old_will[oid]
new_will[oid].heirs = new_heirs
new_will[oid].we = new_we
continue
else:
continue
def get_higher_input_for_tx(will):
out = {}
for wid in will:
wtx = will[wid].tx
found = False
for inp in wtx.inputs():
if inp.prevout.txid.hex() in will:
found = True
break
if not found:
out[inp.prevout.to_str()] = inp
return out
def invalidate_will(will, wallet, fees_per_byte):
will_only_valid = Will.only_valid_list(will)
inputs = Will.get_all_inputs(will_only_valid)
utxos = wallet.get_utxos()
filtered_inputs = []
prevout_to_spend = []
current_height = Util.get_current_height(wallet.network)
for prevout_str, ws in inputs.items():
for w in ws:
if w[0] not in filtered_inputs:
filtered_inputs.append(w[0])
if prevout_str not in prevout_to_spend:
prevout_to_spend.append(prevout_str)
balance = 0
utxo_to_spend = []
for utxo in utxos:
if utxo.is_coinbase_output() and utxo.block_height < current_height+100:
continue
utxo_str = utxo.prevout.to_str()
if utxo_str in prevout_to_spend:
balance += inputs[utxo_str][0][2].value_sats()
utxo_to_spend.append(utxo)
if len(utxo_to_spend) > 0:
change_addresses = wallet.get_change_addresses_for_new_transaction()
out = PartialTxOutput.from_address_and_value(change_addresses[0], balance)
out.is_change = True
locktime = current_height
tx = PartialTransaction.from_io(
utxo_to_spend, [out], locktime=locktime, version=2
)
tx.set_rbf(True)
fee = tx.estimated_size() * fees_per_byte
if balance - fee > 0:
out = PartialTxOutput.from_address_and_value(
change_addresses[0], balance - fee
)
tx = PartialTransaction.from_io(
utxo_to_spend, [out], locktime=locktime, version=2
)
tx.set_rbf(True)
_logger.debug(f"invalidation tx: {tx}")
return tx
else:
_logger.debug(f"balance({balance}) - fee({fee}) <=0")
pass
else:
_logger.debug("len utxo_to_spend <=0")
pass
def is_new(will):
for wid, w in will.items():
if w.get_status("VALID") and not w.get_status("COMPLETE"):
return True
def search_rai(all_inputs, all_utxos, will, wallet):
# will_only_valid = Will.only_valid_or_replaced_list(will)
for inp, ws in all_inputs.items():
inutxo = Util.in_utxo(inp, all_utxos)
for w in ws:
wi = w[1]
if (
wi.get_status("VALID")
or wi.get_status("CONFIRMED")
or wi.get_status("PENDING")
):
prevout_id = w[2].prevout.txid.hex()
if not inutxo:
if prevout_id in will:
wo = will[prevout_id]
if wo.get_status("REPLACED"):
wi.set_status("REPLACED", True)
if wo.get_status("INVALIDATED"):
wi.set_status("INVALIDATED", True)
else:
if wallet.db.get_transaction(wi._id):
wi.set_status("CONFIRMED", True)
else:
wi.set_status("INVALIDATED", True)
for child in wi.search(all_inputs):
if child.tx.locktime < wi.tx.locktime:
_logger.debug("a child was found")
wi.set_status("REPLACED", True)
else:
pass
def utxos_strs(utxos):
return [Util.utxo_to_str(u) for u in utxos]
def set_invalidate(wid, will=[]):
will[wid].set_status("INVALIDATED", True)
if will[wid].children:
for c in will[wid].children.items():
Will.set_invalidate(c[0], will)
def check_tx_height(tx, wallet):
info = wallet.get_tx_info(tx)
return info.tx_mined_status.height()
# check if transactions are stil valid tecnically valid
def check_invalidated(willtree, utxos_list, wallet):
for wid, w in willtree.items():
if (
not w.father
or willtree[w.father].get_status("CONFIRMED")
or willtree[w.father].get_status("PENDING")
):
for inp in w.tx.inputs():
inp_str = Util.utxo_to_str(inp)
if inp_str not in utxos_list:
if wallet:
height = Will.check_tx_height(w.tx, wallet)
if height < 0:
Will.set_invalidate(wid, willtree)
elif height == 0:
w.set_status("PENDING", True)
else:
w.set_status("CONFIRMED", True)
# def reflect_to_children(treeitem):
# if not treeitem.get_status("VALID"):
# _logger.debug(f"{tree:item._id} status not valid looking for children")
# for child in treeitem.children:
# wc = willtree[child]
# if wc.get_status("VALID"):
# if treeitem.get_status("INVALIDATED"):
# wc.set_status("INVALIDATED", True)
# if treeitem.get_status("REPLACED"):
# wc.set_status("REPLACED", True)
# if wc.children:
# Will.reflect_to_children(wc)
def check_amounts(heirs, willexecutors, all_utxos, timestamp_to_check, dust):
fixed_heirs, fixed_amount, perc_heirs, perc_amount, fixed_amount_with_dust = (
heirs.fixed_percent_lists_amount(timestamp_to_check, dust, reverse=True)
)
wallet_balance = 0
for utxo in all_utxos:
wallet_balance += utxo.value_sats()
if fixed_amount >= wallet_balance:
raise FixedAmountException(
f"Fixed amount({fixed_amount}) >= {wallet_balance}"
)
if perc_amount != 100:
raise PercAmountException(f"Perc amount({perc_amount}) =! 100%")
for url, wex in willexecutors.items():
if Willexecutors.is_selected(wex):
temp_balance = wallet_balance - int(wex["base_fee"])
if fixed_amount >= temp_balance:
raise FixedAmountException(
f"Willexecutor{url} excess base fee({wex['base_fee']}), {fixed_amount} >={temp_balance}"
)
def check_will(will, all_utxos, wallet, block_to_check, timestamp_to_check):
Will.add_willtree(will)
utxos_list = Will.utxos_strs(all_utxos)
Will.check_invalidated(will, utxos_list, wallet)
all_inputs = Will.get_all_inputs(will, only_valid=True)
all_inputs_min_locktime = Will.get_all_inputs_min_locktime(all_inputs)
Will.check_will_expired(
all_inputs_min_locktime, block_to_check, timestamp_to_check
)
all_inputs = Will.get_all_inputs(will, only_valid=True)
Will.search_rai(all_inputs, all_utxos, will, wallet)
def is_will_valid(
will,
block_to_check,
timestamp_to_check,
tx_fees,
all_utxos,
heirs={},
willexecutors={},
self_willexecutor=False,
wallet=False,
callback_not_valid_tx=None,
):
Will.check_will(will, all_utxos, wallet, block_to_check, timestamp_to_check)
if heirs:
if not Will.check_willexecutors_and_heirs(
will,
heirs,
willexecutors,
self_willexecutor,
timestamp_to_check,
tx_fees,
):
raise NotCompleteWillException()
all_inputs = Will.get_all_inputs(will, only_valid=True)
_logger.info("check all utxo in wallet are spent")
if all_inputs:
for utxo in all_utxos:
if utxo.value_sats() > 68 * tx_fees:
if not Util.in_utxo(utxo, all_inputs.keys()):
_logger.info("utxo is not spent", utxo.to_json())
_logger.debug(all_inputs.keys())
raise NotCompleteWillException(
"Some utxo in the wallet is not included"
)
_logger.info("will ok")
return True
def check_will_expired(all_inputs_min_locktime, block_to_check, timestamp_to_check):
_logger.info("check if some transaction is expired")
for prevout_str, wid in all_inputs_min_locktime.items():
for w in wid:
if w[1].get_status("VALID"):
locktime = int(wid[0][1].tx.locktime)
if locktime <= NLOCKTIME_BLOCKHEIGHT_MAX:
if locktime < int(block_to_check):
raise WillExpiredException(
f"Will Expired {wid[0][0]}: {locktime}<{block_to_check}"
)
else:
if locktime < int(timestamp_to_check):
raise WillExpiredException(
f"Will Expired {wid[0][0]}: {locktime}<{timestamp_to_check}"
)
else:
from datetime import datetime
_logger.debug(f"Will Not Expired {wid[0][0]}: {datetime.fromtimestamp(locktime).isoformat()} > {datetime.fromtimestamp(timestamp_to_check).isoformat()}")
# def check_all_input_spent_are_in_wallet():
# _logger.info("check all input spent are in wallet or valid txs")
# for inp, ws in all_inputs.items():
# if not Util.in_utxo(inp, all_utxos):
# for w in ws:
# if w[1].get_status("VALID"):
# prevout_id = w[2].prevout.txid.hex()
# parentwill = will.get(prevout_id, False)
# if not parentwill or not parentwill.get_status("VALID"):
# w[1].set_status("INVALIDATED", True)
def only_valid_list(will):
out = {}
for wid, w in will.items():
if w.get_status("VALID"):
out[wid] = w
return out
def only_valid_or_replaced_list(will):
out = []
for wid, w in will.items():
wi = w
if wi.get_status("VALID") or wi.get_status("REPLACED"):
out.append(wid)
return out
def check_willexecutors_and_heirs(
will, heirs, willexecutors, self_willexecutor, check_date, tx_fees
):
_logger.debug("check willexecutors heirs")
no_willexecutor = 0
willexecutors_found = {}
heirs_found = {}
will_only_valid = Will.only_valid_list(will)
if len(will_only_valid) < 1:
return False
for wid in Will.only_valid_list(will):
w = will[wid]
if w.tx_fees != tx_fees:
raise TxFeesChangedException(f"{tx_fees}:", w.tx_fees)
for wheir in w.heirs:
if not 'w!ll3x3c"' == wheir[:9]:
their = will[wid].heirs[wheir]
if heir := heirs.get(wheir, None):
if (
heir[0] == their[0]
and heir[1] == their[1]
and Util.parse_locktime_string(heir[2])
>= Util.parse_locktime_string(their[2])
):
count = heirs_found.get(wheir, 0)
heirs_found[wheir] = count + 1
else:
_logger.debug(
f"heir not present transaction is not valid:{wheir} {wid}, {w}"
)
if willexecutor := w.we:
count = willexecutors_found.get(willexecutor["url"], 0)
if Util.cmp_willexecutor(
willexecutor, willexecutors.get(willexecutor["url"], None)
):
willexecutors_found[willexecutor["url"]] = count + 1
else:
no_willexecutor += 1
count_heirs = 0
for h in heirs:
if Util.parse_locktime_string(heirs[h][2]) >= check_date:
count_heirs += 1
if h not in heirs_found:
_logger.debug(f"heir: {h} not found")
raise HeirNotFoundException(h)
if not count_heirs:
raise NoHeirsException("there are not valid heirs")
if self_willexecutor and no_willexecutor == 0:
raise NoWillExecutorNotPresent("Backup tx")
for url, we in willexecutors.items():
if Willexecutors.is_selected(we):
if url not in willexecutors_found:
_logger.debug(f"will-executor: {url} not fount")
raise WillExecutorNotPresent(url)
_logger.info("will is coherent with heirs and will-executors")
return True
class WillItem(Logger):
STATUS_DEFAULT = {
"ANTICIPATED": ["Anticipated", False],
"BROADCASTED": ["Broadcasted", False],
"CHECKED": ["Checked", False],
"CHECK_FAIL": ["Check Failed", False],
"COMPLETE": ["Signed", False],
"CONFIRMED": ["Confirmed", False],
"ERROR": ["Error", False],
"EXPIRED": ["Expired", False],
"EXPORTED": ["Exported", False],
"IMPORTED": ["Imported", False],
"INVALIDATED": ["Invalidated", False],
"PENDING": ["Pending", False],
"PUSH_FAIL": ["Push failed", False],
"PUSHED": ["Pushed", False],
"REPLACED": ["Replaced", False],
"RESTORED": ["Restored", False],
"VALID": ["Valid", True],
}
def set_status(self, status, value=True):
# _logger.trace(
# "set status {} - {} {} -> {}".format(
# self._id, status, self.STATUS[status][1], value
# )
# )
if self.STATUS[status][1] == bool(value):
return None
self.status += "." + (("NOT " if not value else "") + _(self.STATUS[status][0]))
self.STATUS[status][1] = bool(value)
if value:
if status in ["INVALIDATED", "REPLACED", "CONFIRMED", "PENDING"]:
self.STATUS["VALID"][1] = False
if status in ["CONFIRMED", "PENDING"]:
self.STATUS["INVALIDATED"][1] = False
if status in ["PUSHED"]:
self.STATUS["PUSH_FAIL"][1] = False
self.STATUS["CHECK_FAIL"][1] = False
if status in ["CHECKED"]:
self.STATUS["PUSHED"][1] = True
self.STATUS["PUSH_FAIL"][1] = False
return value
def get_status(self, status):
return self.STATUS[status][1]
def __init__(self, w, _id=None, wallet=None):
if isinstance(
w,
WillItem,
):
self.__dict__ = w.__dict__.copy()
else:
self.tx = Will.get_tx_from_any(w["tx"])
self.heirs = w.get("heirs", None)
self.we = w.get("willexecutor", None)
self.status = w.get("status", None)
self.description = w.get("description", None)
self.time = w.get("time", None)
self.change = w.get("change", None)
self.tx_fees = w.get("baltx_fees", 0)
self.father = w.get("Father", None)
self.children = w.get("Children", None)
self.STATUS = copy.deepcopy(WillItem.STATUS_DEFAULT)
for s in self.STATUS:
self.STATUS[s][1] = w.get(s, WillItem.STATUS_DEFAULT[s][1])
if not _id:
self._id = self.tx.txid()
else:
self._id = _id
if not self._id:
self.status += "ERROR!!!"
self.valid = False
if wallet:
self.tx.add_info_from_wallet(wallet)
def to_dict(self):
out = {
"_id": self._id,
"tx": self.tx,
"heirs": self.heirs,
"willexecutor": self.we,
"status": self.status,
"description": self.description,
"time": self.time,
"change": self.change,
"baltx_fees": self.tx_fees,
}
for key in self.STATUS:
try:
out[key] = self.STATUS[key][1]
except Exception as e:
_logger.error(f"{key},{self.STATUS[key]} {e}")
return out
def __repr__(self):
return str(self)
def __str__(self):
return str(self.to_dict())
def set_anticipate(self, ow: "WillItem"):
nl = min(ow.tx.locktime, Will.check_anticipate(ow, self))
if int(nl) < self.tx.locktime:
self.tx.locktime = int(nl)
return True
else:
return False
def search_anticipate(self, all_inputs):
anticipated = False
for ow in self.search(all_inputs):
if self.set_anticipate(ow):
anticipated = True
return anticipated
def search(self, all_inputs):
for inp in self.tx.inputs():
prevout_str = inp.prevout.to_str()
oinps = all_inputs.get(prevout_str, [])
for oinp in oinps:
ow = oinp[1]
if ow._id != self._id:
yield ow
def normalize_locktime(self, all_inputs):
outputs = self.tx.outputs()
for idx in range(0, len(outputs)):
inps = all_inputs.get(f"{self._id}:{idx}", [])
_logger.debug("****check locktime***")
for inp in inps:
if inp[0] != self._id:
iw = inp[1]
self.set_anticipate(iw)
def set_check_willexecutor(self,resp):
try:
if resp :
if "tx" in resp and resp["tx"] == str(self.tx):
self.set_status("PUSHED")
self.set_status("CHECKED")
else:
self.set_status("CHECK_FAIL")
self.set_status("PUSHED", False)
return True
else:
self.set_status("CHECK_FAIL")
self.set_status("PUSHED", False)
return False
except Exception as e:
_logger.error(f"exception checking transaction: {e}")
self.set_status("CHECK_FAIL")
def get_color(self):
if self.get_status("INVALIDATED"):
return "#f87838"
elif self.get_status("REPLACED"):
return "#ff97e9"
elif self.get_status("CONFIRMED"):
return "#bfbfbf"
elif self.get_status("PENDING"):
return "#ffce30"
elif self.get_status("CHECK_FAIL") and not self.get_status("CHECKED"):
return "#e83845"
elif self.get_status("CHECKED"):
return "#8afa6c"
elif self.get_status("PUSH_FAIL"):
return "#e83845"
elif self.get_status("PUSHED"):
return "#73f3c8"
elif self.get_status("COMPLETE"):
return "#2bc8ed"
else:
return "#ffffff"
class WillException(Exception):
def __init__(self,msg="WillException"):
self.msg=msg
Exception.__init__(self)
def __str__(self):
return self.msg
class WillExpiredException(WillException):
pass
class NotCompleteWillException(WillException):
pass
class HeirChangeException(NotCompleteWillException):
pass
class TxFeesChangedException(NotCompleteWillException):
pass
class HeirNotFoundException(NotCompleteWillException):
pass
class WillexecutorChangeException(NotCompleteWillException):
pass
class NoWillExecutorNotPresent(NotCompleteWillException):
pass
class WillExecutorNotPresent(NotCompleteWillException):
pass
class NoHeirsException(WillException):
pass
class AmountException(WillException):
pass
class PercAmountException(AmountException):
pass
class FixedAmountException(AmountException):
pass

357
willexecutors.py Normal file
View File

@@ -0,0 +1,357 @@
import json
from datetime import datetime
import time
from aiohttp import ClientResponse
from electrum import constants
from electrum.i18n import _
from electrum.logging import get_logger
from electrum.network import Network
from .bal import BalPlugin
DEFAULT_TIMEOUT = 5
_logger = get_logger(__name__)
chainname = constants.net.NET_NAME if constants.net.NET_NAME != "mainnet" else "bitcoin"
class Willexecutors:
def save(bal_plugin, willexecutors):
_logger.debug(f"save {willexecutors},{chainname}")
aw = bal_plugin.WILLEXECUTORS.get()
aw[chainname] = willexecutors
bal_plugin.WILLEXECUTORS.set(aw)
_logger.debug(f"saved: {aw}")
# bal_plugin.WILLEXECUTORS.set(willexecutors)
def get_willexecutors(
bal_plugin, update=False, bal_window=False, force=False, task=True
):
willexecutors = bal_plugin.WILLEXECUTORS.get()
willexecutors = willexecutors.get(chainname, {})
to_del = []
for w in willexecutors:
if not isinstance(willexecutors[w], dict):
to_del.append(w)
continue
Willexecutors.initialize_willexecutor(willexecutors[w], w)
for w in to_del:
_logger.error(
"error Willexecutor to delete type:{} {}".format(
type(willexecutors[w]), w
)
)
del willexecutors[w]
bal = bal_plugin.WILLEXECUTORS.default.get(chainname, {})
for bal_url, bal_executor in bal.items():
if bal_url not in willexecutors:
_logger.debug(f"force add {bal_url} willexecutor")
willexecutors[bal_url] = bal_executor
# if update:
# found = False
# for url, we in willexecutors.items():
# if Willexecutors.is_selected(we):
# found = True
# if found or force:
# if bal_plugin.PING_WILLEXECUTORS.get() or force:
# ping_willexecutors = True
# if bal_plugin.ASK_PING_WILLEXECUTORS.get() and not force:
# if bal_window:
# ping_willexecutors = bal_window.window.question(
# _(
# "Contact willexecutors servers to update payment informations?"
# )
# )
# if ping_willexecutors:
# if task:
# bal_window.ping_willexecutors(willexecutors, task)
# else:
# bal_window.ping_willexecutors_task(willexecutors)
w_sorted = dict(
sorted(
willexecutors.items(), key=lambda w: w[1].get("sort", 0), reverse=True
)
)
return w_sorted
def is_selected(willexecutor, value=None):
if not willexecutor:
return False
if value is not None:
willexecutor["selected"] = value
try:
return willexecutor["selected"]
except Exception:
willexecutor["selected"] = False
return False
def get_willexecutor_transactions(will, force=False):
willexecutors = {}
for wid, willitem in will.items():
if willitem.get_status("VALID"):
if willitem.get_status("COMPLETE"):
if not willitem.get_status("PUSHED") or force:
if willexecutor := willitem.we:
url = willexecutor["url"]
if willexecutor and Willexecutors.is_selected(willexecutor):
if url not in willexecutors:
willexecutor["txs"] = ""
willexecutor["txsids"] = []
willexecutor["broadcast_status"] = _("Waiting...")
willexecutors[url] = willexecutor
willexecutors[url]["txs"] += str(willitem.tx) + "\n"
willexecutors[url]["txsids"].append(wid)
return willexecutors
# def only_selected_list(willexecutors):
# out = {}
# for url, v in willexecutors.items():
# if Willexecutors.is_selected(url):
# out[url] = v
# def push_transactions_to_willexecutors(will):
# willexecutors = Willexecutors.get_transactions_to_be_pushed()
# for url in willexecutors:
# willexecutor = willexecutors[url]
# if Willexecutors.is_selected(willexecutor):
# if "txs" in willexecutor:
# Willexecutors.push_transactions_to_willexecutor(
# willexecutors[url]["txs"], url
# )
def send_request(
method, url, data=None, *, timeout=10, handle_response=None, count_reply=0
):
network = Network.get_instance()
if not network:
raise Exception("You are offline.")
_logger.debug(f"<-- {method} {url} {data}")
headers = {}
headers["user-agent"] = f"BalPlugin v:{BalPlugin.__version__}"
headers["Content-Type"] = "text/plain"
if not handle_response:
handle_response = Willexecutors.handle_response
try:
if method == "get":
response = Network.send_http_on_proxy(
method,
url,
params=data,
headers=headers,
on_finish=handle_response,
timeout=timeout,
)
elif method == "post":
response = Network.send_http_on_proxy(
method,
url,
body=data,
headers=headers,
on_finish=handle_response,
timeout=timeout,
)
else:
raise Exception(f"unexpected {method=!r}")
except TimeoutError:
if count_reply < 10:
_logger.debug(f"timeout({count_reply}) error: retry in 3 sec...")
time.sleep(3)
return Willexecutors.send_request(
method,
url,
data,
timeout=timeout,
handle_response=handle_response,
count_reply=count_reply + 1,
)
else:
_logger.debug(f"Too many timeouts: {count_reply}")
except Exception as e:
raise e
else:
_logger.debug(f"--> {response}")
return response
def get_we_url_from_response(resp):
url_slices = str(resp.url).split("/")
if len(url_slices) > 2:
url_slices = url_slices[:-2]
return "/".join(url_slices)
async def handle_response(resp: ClientResponse):
r = await resp.text()
try:
r = json.loads(r)
# url = Willexecutors.get_we_url_from_response(resp)
# r["url"]= url
# r["status"]=resp.status
except Exception as e:
_logger.debug(f"error handling response:{e}")
pass
return r
class AlreadyPresentException(Exception):
pass
def push_transactions_to_willexecutor(willexecutor):
out = True
try:
_logger.debug(f"{willexecutor['url']}: {willexecutor['txs']}")
if w := Willexecutors.send_request(
"post",
willexecutor["url"] + "/" + chainname + "/pushtxs",
data=willexecutor["txs"].encode("ascii"),
):
willexecutor["broadcast_status"] = _("Success")
_logger.debug(f"pushed: {w}")
if w != "thx":
_logger.debug(f"error: {w}")
raise Exception(w)
else:
raise Exception("empty reply from:{willexecutor['url']}")
except Exception as e:
_logger.debug(f"error:{e}")
if str(e) == "already present":
raise Willexecutors.AlreadyPresentException()
out = False
willexecutor["broadcast_status"] = _("Failed")
return out
def ping_servers(willexecutors):
for url, we in willexecutors.items():
Willexecutors.get_info_task(url, we)
def get_info_task(url, willexecutor):
w = None
try:
_logger.info("GETINFO_WILLEXECUTOR")
_logger.debug(url)
w = Willexecutors.send_request("get", url + "/" + chainname + "/info")
if isinstance(w, dict):
willexecutor["url"] = url
willexecutor["status"] = 200
willexecutor["base_fee"] = w["base_fee"]
willexecutor["address"] = w["address"]
willexecutor["info"] = w["info"]
_logger.debug(f"response_data {w}")
except Exception as e:
_logger.error(f"error {e} contacting {url}: {w}")
willexecutor["status"] = "KO"
willexecutor["last_update"] = datetime.now().timestamp()
return willexecutor
def initialize_willexecutor(willexecutor, url, status=None, old_willexecutor={}):
willexecutor["url"] = url
if status is not None:
willexecutor["status"] = status
else:
willexecutor["status"] = old_willexecutor.get("status",willexecutor.get("status","Ko"))
willexecutor["selected"]=Willexecutors.is_selected(old_willexecutor) or willexecutor.get("selected",False)
willexecutor["address"]=old_willexecutor.get("address",willexecutor.get("address",""))
willexecutor["promo_code"]=old_willexecutor.get("promo_code",willexecutor.get("promo_code"))
def download_list(old_willexecutors):
try:
willexecutors = Willexecutors.send_request(
"get",
f"https://welist.bitcoin-after.life/data/{chainname}?page=0&limit=100",
)
# del willexecutors["status"]
for w in willexecutors:
if w not in ("status", "url"):
Willexecutors.initialize_willexecutor(
willexecutors[w], w, None, old_willexecutors.get(w,{})
)
# bal_plugin.WILLEXECUTORS.set(l)
# bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,l,save=True)
return willexecutors
except Exception as e:
_logger.error(f"Failed to download willexecutors list: {e}")
return {}
def get_willexecutors_list_from_json():
try:
with open("willexecutors.json") as f:
willexecutors = json.load(f)
for w in willexecutors:
willexecutor = willexecutors[w]
Willexecutors.initialize_willexecutor(willexecutor, w, "New", False)
# bal_plugin.WILLEXECUTORS.set(willexecutors)
return willexecutors
except Exception as e:
_logger.error(f"error opening willexecutors json: {e}")
return {}
def check_transaction(txid, url):
_logger.debug(f"{url}:{txid}")
try:
w = Willexecutors.send_request(
"post", url + "/searchtx", data=txid.encode("ascii")
)
return w
except Exception as e:
_logger.error(f"error contacting {url} for checking txs {e}")
raise e
def compute_id(willexecutor):
return "{}-{}".format(willexecutor.get("url"), willexecutor.get("chain"))
class WillExecutor:
def __init__(
self,
url,
base_fee,
chain,
info,
version,
status,
is_selected=False,
promo_code="",
):
self.url = url
self.base_fee = base_fee
self.chain = chain
self.info = info
self.version = version
self.status = status
self.promo_code = promo_code
self.is_selected = is_selected
self.id = self.compute_id()
def from_dict(d):
return WillExecutor(
url=d.get("url", "http://localhost:8000"),
base_fee=d.get("base_fee", 1000),
chain=d.get("chain", chainname),
info=d.get("info", ""),
version=d.get("version", 0),
status=d.get("status", "Ko"),
is_selected=d.get("is_selected", "False"),
promo_code=d.get("promo_code", ""),
)
def to_dict(self):
return {
"url": self.url,
"base_fee": self.base_fee,
"chain": self.chain,
"info": self.info,
"version": self.version,
"promo_code": self.promo_code,
}
def compute_id(self):
return f"{self.url}-{self.chain}"