download welist

This commit is contained in:
bitcoinafterlife 2025-07-04 13:22:05 -04:00
parent e542c82ee1
commit 4d021e1d82
7 changed files with 3095 additions and 1321 deletions

14
bal.py
View File

@ -1,15 +1,10 @@
import random import random
import os import os
from hashlib import sha256
from typing import NamedTuple, Optional, Dict, Tuple
from electrum.plugin import BasePlugin from electrum.plugin import BasePlugin
from electrum.util import to_bytes, bfh
from electrum import json_db from electrum import json_db
from electrum.transaction import tx_from_any from electrum.transaction import tx_from_any
from . import util as Util
from . import willexecutors as Willexecutors
import os import os
json_db.register_dict('heirs', tuple, None) json_db.register_dict('heirs', tuple, None)
json_db.register_dict('will', lambda x: get_will(x), None) json_db.register_dict('will', lambda x: get_will(x), None)
@ -17,11 +12,10 @@ json_db.register_dict('will_settings', lambda x:x, None)
from electrum.logging import get_logger from electrum.logging import get_logger
def get_will(x): def get_will(x):
try: try:
#print("______________________________________________________________________________________________________")
#print(x)
x['tx']=tx_from_any(x['tx']) x['tx']=tx_from_any(x['tx'])
except Exception as e: except Exception as e:
#Util.print_var(x)
raise e raise e
return x return x
@ -69,7 +63,7 @@ class BalPlugin(BasePlugin):
HIDE_INVALIDATED:True, HIDE_INVALIDATED:True,
ALLOW_REPUSH: False, ALLOW_REPUSH: False,
WILLEXECUTORS: { WILLEXECUTORS: {
'http://bitcoin-after.life:9137': { 'https://bitcoin-after.life:9137': {
"base_fee": 100000, "base_fee": 100000,
"status": "New", "status": "New",
"info":"Bitcoin After Life Will Executor", "info":"Bitcoin After Life Will Executor",
@ -102,10 +96,8 @@ class BalPlugin(BasePlugin):
def config_get(self,key): def config_get(self,key):
v = self.config.get(key,None) v = self.config.get(key,None)
print("config get",key,v)
if v is None: if v is None:
self.config.set_key(key,self.DEFAULT_SETTINGS[key]) self.config.set_key(key,self.DEFAULT_SETTINGS[key])
print("config setkey",key)
v = self.DEFAULT_SETTINGS[key] v = self.DEFAULT_SETTINGS[key]
return v return v

View File

@ -15,8 +15,8 @@ import datetime
import urllib.request import urllib.request
import urllib.parse import urllib.parse
from .bal import BalPlugin from .bal import BalPlugin
from . import util as Util from .util import Util
from . import willexecutors as Willexecutors from .willexecutors import Willexecutors
if TYPE_CHECKING: if TYPE_CHECKING:
from .wallet_db import WalletDB from .wallet_db import WalletDB
from .simple_config import SimpleConfig from .simple_config import SimpleConfig
@ -38,6 +38,7 @@ def reduce_outputs(in_amount, out_amount, fee, outputs):
for output in outputs: for output in outputs:
output.value = math.floor((in_amount-fee)/out_amount * output.value) output.value = math.floor((in_amount-fee)/out_amount * output.value)
"""
#TODO: put this method inside wallet.db to replace or complete get_locktime_for_new_transaction #TODO: put this method inside wallet.db to replace or complete get_locktime_for_new_transaction
def get_current_height(network:'Network'): def get_current_height(network:'Network'):
#if no network or not up to date, just set locktime to zero #if no network or not up to date, just set locktime to zero
@ -57,7 +58,7 @@ def get_current_height(network:'Network'):
# discourage "fee sniping" # discourage "fee sniping"
height = min(chain_height, server_height) height = min(chain_height, server_height)
return height return height
"""

View File

@ -3,5 +3,6 @@
"fullname": "Bitcoin After Life", "fullname": "Bitcoin After Life",
"description": "Provides free and decentralized inheritance support", "description": "Provides free and decentralized inheritance support",
"author":"Svātantrya", "author":"Svātantrya",
"available_for": ["qt"] "available_for": ["qt"],
"icon":"icons/bal32x32.png"
} }

2138
qt.py

File diff suppressed because it is too large Load Diff

809
util.py
View File

@ -8,451 +8,430 @@ import urllib.parse
from electrum.util import write_json_file,FileImportFailed,FileExportFailed from electrum.util import write_json_file,FileImportFailed,FileExportFailed
LOCKTIME_THRESHOLD = 500000000 LOCKTIME_THRESHOLD = 500000000
def locktime_to_str(locktime): class Util:
try: def locktime_to_str(locktime):
locktime=int(locktime)
if locktime > LOCKTIME_THRESHOLD:
dt = datetime.fromtimestamp(locktime).isoformat()
return dt
except Exception as e:
#print(e)
pass
return str(locktime)
def str_to_locktime(locktime):
try:
if locktime[-1] in ('y','d','b'):
return locktime
else: return int(locktime)
except Exception as e:
pass
#print(e)
dt_object = datetime.fromisoformat(locktime)
timestamp = dt_object.timestamp()
return int(timestamp)
def parse_locktime_string(locktime,w=None):
try:
return int(locktime)
except Exception as e:
pass
#print("parse_locktime_string",e)
try:
now = datetime.now()
if locktime[-1] == 'y':
locktime = str(int(locktime[:-1])*365) + "d"
if locktime[-1] == 'd':
return int((now + timedelta(days = int(locktime[:-1]))).replace(hour=0,minute=0,second=0,microsecond=0).timestamp())
if locktime[-1] == 'b':
locktime = int(locktime[:-1])
height = 0
if w:
height = get_current_height(w.network)
locktime+=int(height)
return int(locktime)
except Exception as e:
print("parse_locktime_string",e)
#raise e
return 0
def int_locktime(seconds=0,minutes=0,hours=0, days=0, blocks = 0):
return int(seconds + minutes*60 + hours*60*60 + days*60*60*24 + blocks * 600)
def encode_amount(amount, decimal_point):
if is_perc(amount):
return amount
else:
try: try:
return int(float(amount)*pow(10,decimal_point)) locktime=int(locktime)
except: if locktime > LOCKTIME_THRESHOLD:
return 0 dt = datetime.fromtimestamp(locktime).isoformat()
return dt
def decode_amount(amount,decimal_point): except Exception as e:
if is_perc(amount): pass
return amount return str(locktime)
else:
num=8-decimal_point
basestr="{{:0{}.{}f}}".format(num,num)
return "{:08.8f}".format(float(amount)/pow(10,decimal_point))
def is_perc(value): def str_to_locktime(locktime):
try:
if locktime[-1] in ('y','d','b'):
return locktime
else: return int(locktime)
except Exception as e:
pass
dt_object = datetime.fromisoformat(locktime)
timestamp = dt_object.timestamp()
return int(timestamp)
def parse_locktime_string(locktime,w=None):
try:
return int(locktime)
except Exception as e:
pass
try:
now = datetime.now()
if locktime[-1] == 'y':
locktime = str(int(locktime[:-1])*365) + "d"
if locktime[-1] == 'd':
return int((now + timedelta(days = int(locktime[:-1]))).replace(hour=0,minute=0,second=0,microsecond=0).timestamp())
if locktime[-1] == 'b':
locktime = int(locktime[:-1])
height = 0
if w:
height = Util.get_current_height(w.network)
locktime+=int(height)
return int(locktime)
except Exception as e:
pass
return 0
def int_locktime(seconds=0,minutes=0,hours=0, days=0, blocks = 0):
return int(seconds + minutes*60 + hours*60*60 + days*60*60*24 + blocks * 600)
def encode_amount(amount, decimal_point):
if Util.is_perc(amount):
return amount
else:
try:
return int(float(amount)*pow(10,decimal_point))
except:
return 0
def decode_amount(amount,decimal_point):
if Util.is_perc(amount):
return amount
else:
num=8-decimal_point
basestr="{{:0{}.{}f}}".format(num,num)
return "{:08.8f}".format(float(amount)/pow(10,decimal_point))
def is_perc(value):
try: try:
return value[-1] == '%' return value[-1] == '%'
except: except:
return False return False
def cmp_array(heira,heirb): def cmp_array(heira,heirb):
try: try:
if not len(heira) == len(heirb): if not len(heira) == len(heirb):
return False
for h in range(0,len(heira)):
if not heira[h] == heirb[h]:
return False return False
return True for h in range(0,len(heira)):
except: if not heira[h] == heirb[h]:
return False return False
def cmp_heir(heira,heirb):
if heira[0] == heirb[0] and heira[1] == heirb[1]:
return True
return False
def cmp_willexecutor(willexecutora,willexecutorb):
if willexecutora == willexecutorb:
return True
try:
if willexecutora['url']==willexecutorb['url'] and willexecutora['address'] == willexecutorb['address'] and willexecutora['base_fee']==willexecutorb['base_fee']:
return True return True
except: except:
return False
return False
def search_heir_by_values(heirs,heir,values):
#print()
for h,v in heirs.items():
found = False
for val in values:
if val in v and v[val] != heir[val]:
found = True
if not found:
return h
return False
def cmp_heir_by_values(heira,heirb,values):
for v in values:
if heira[v] != heirb[v]:
return False return False
return True
def cmp_heirs_by_values(heirsa,heirsb,values,exclude_willexecutors=False,reverse = True): def cmp_heir(heira,heirb):
for heira in heirsa: if heira[0] == heirb[0] and heira[1] == heirb[1]:
if (exclude_willexecutors and not "w!ll3x3c\"" in heira) or not exclude_willexecutors: return True
return False
def cmp_willexecutor(willexecutora,willexecutorb):
if willexecutora == willexecutorb:
return True
try:
if willexecutora['url']==willexecutorb['url'] and willexecutora['address'] == willexecutorb['address'] and willexecutora['base_fee']==willexecutorb['base_fee']:
return True
except:
return False
return False
def search_heir_by_values(heirs,heir,values):
for h,v in heirs.items():
found = False found = False
for heirb in heirsb: for val in values:
if cmp_heir_by_values(heirsa[heira],heirsb[heirb],values): if val in v and v[val] != heir[val]:
found=True found = True
if not found: if not found:
#print(f"not_found {heira}--{heirsa[heira]}") return h
return False
def cmp_heir_by_values(heira,heirb,values):
for v in values:
if heira[v] != heirb[v]:
return False return False
if reverse:
return cmp_heirs_by_values(heirsb,heirsa,values,exclude_willexecutors=exclude_willexecutors,reverse=False)
else:
return True return True
def cmp_heirs(heirsa,heirsb,cmp_function = lambda x,y: x[0]==y[0] and x[3]==y[3],reverse=True): def cmp_heirs_by_values(heirsa,heirsb,values,exclude_willexecutors=False,reverse = True):
try: for heira in heirsa:
for heir in heirsa: if (exclude_willexecutors and not "w!ll3x3c\"" in heira) or not exclude_willexecutors:
if not "w!ll3x3c\"" in heir: found = False
if not heir in heirsb or not cmp_function(heirsa[heir],heirsb[heir]): for heirb in heirsb:
if not search_heir_by_values(heirsb,heirsa[heir],[0,3]): if Util.cmp_heir_by_values(heirsa[heira],heirsb[heirb],values):
return False found=True
if not found:
return False
if reverse: if reverse:
return cmp_heirs(heirsb,heirsa,cmp_function,False) return Util.cmp_heirs_by_values(heirsb,heirsa,values,exclude_willexecutors=exclude_willexecutors,reverse=False)
else: else:
return True return True
except Exception as e:
raise e
return False
def cmp_inputs(inputsa,inputsb): def cmp_heirs(heirsa,heirsb,cmp_function = lambda x,y: x[0]==y[0] and x[3]==y[3],reverse=True):
if len(inputsa) != len(inputsb): try:
return False for heir in heirsa:
for inputa in inputsa: if not "w!ll3x3c\"" in heir:
if not in_utxo(inputa,inputsb): if not heir in heirsb or not cmp_function(heirsa[heir],heirsb[heir]):
return False if not Util.search_heir_by_values(heirsb,heirsa[heir],[0,3]):
return True return False
if reverse:
def cmp_outputs(outputsa,outputsb,willexecutor_output = None): return Util.cmp_heirs(heirsb,heirsa,cmp_function,False)
if len(outputsa) != len(outputsb):
return False
for outputa in outputsa:
if not cmp_output(outputa,willexecutor_output):
if not in_output(outputa,outputsb):
return False
return True
def cmp_txs(txa,txb):
if not cmp_inputs(txa.inputs(),txb.inputs()):
return False
if not cmp_outputs(txa.outputs(),txb.outputs()):
return False
return True
def get_value_amount(txa,txb):
outputsa=txa.outputs()
outputsb=txb.outputs()
value_amount = 0
#if len(outputsa) != len(outputsb):
# print("outputlen is different")
# return False
for outa in outputsa:
same_amount,same_address = in_output(outa,txb.outputs())
if not (same_amount or same_address):
#print("outa notin txb", same_amount,same_address)
return False
if same_amount and same_address:
value_amount+=outa.value
if same_amount:
pass
#print("same amount")
if same_address:
pass
#print("same address")
return value_amount
#not needed
#for outb in outputsb:
# if not in_output(outb,txa.outputs()):
# print("outb notin txb")
# return False
def chk_locktime(timestamp_to_check,block_height_to_check,locktime):
#TODO BUG: WHAT HAPPEN AT THRESHOLD?
locktime=int(locktime)
if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check:
return True
elif locktime < LOCKTIME_THRESHOLD and locktime > block_height_to_check:
return True
else:
return False
def anticipate_locktime(locktime,blocks=0,hours=0,days=0):
locktime = int(locktime)
out=0
if locktime> LOCKTIME_THRESHOLD:
seconds = blocks*600 + hours*3600 + days*86400
dt = datetime.fromtimestamp(locktime)
dt -= timedelta(seconds=seconds)
out = dt.timestamp()
else:
blocks -= hours*6 + days*144
out = locktime + blocks
if out < 1:
out = 1
return out
def cmp_locktime(locktimea,locktimeb):
if locktimea==locktimeb:
return 0
strlocktime = str(locktimea)
strlocktimeb = str(locktimeb)
intlocktimea = str_to_locktime(strlocktimea)
intlocktimeb = str_to_locktime(strlocktimeb)
if locktimea[-1] in "ydb":
if locktimeb[-1] == locktimea[-1]:
return int(strlocktimea[-1])-int(strlocktimeb[-1])
else:
return int(locktimea)-(locktimeb)
def get_lowest_valid_tx(available_utxos,will):
will = sorted(will.items(),key = lambda x: x[1]['tx'].locktime)
for txid,willitem in will.items():
pass
def get_locktimes(will):
locktimes = {}
for txid,willitem in will.items():
locktimes[willitem['tx'].locktime]=True
return locktimes.keys()
def get_lowest_locktimes(locktimes):
sorted_timestamp=[]
sorted_block=[]
for l in locktimes:
#print("locktime:",parse_locktime_string(l))
l=parse_locktime_string(l)
if l < LOCKTIME_THRESHOLD:
bisect.insort(sorted_block,l)
else:
bisect.insort(sorted_timestamp,l)
return sorted(sorted_timestamp), sorted(sorted_block)
def get_lowest_locktimes_from_will(will):
return get_lowest_locktimes(get_locktimes(will))
def search_willtx_per_io(will,tx):
for wid, w in will.items():
if cmp_txs(w['tx'],tx['tx']):
return wid,w
return None, None
def invalidate_will(will):
raise Exception("not implemented")
def get_will_spent_utxos(will):
utxos=[]
for txid,willitem in will.items():
utxos+=willitem['tx'].inputs()
return utxos
def utxo_to_str(utxo):
try: return utxo.to_str()
except Exception as e: pass
try: return utxo.prevout.to_str()
except Exception as e: pass
return str(utxo)
def cmp_utxo(utxoa,utxob):
utxoa=utxo_to_str(utxoa)
utxob=utxo_to_str(utxob)
if utxoa == utxob:
#if utxoa.prevout.txid==utxob.prevout.txid and utxoa.prevout.out_idx == utxob.prevout.out_idx:
return True
else:
return False
def in_utxo(utxo, utxos):
for s_u in utxos:
if cmp_utxo(s_u,utxo):
return True
return False
def txid_in_utxo(txid,utxos):
for s_u in utxos:
if s_u.prevout.txid == txid:
return True
return False
def cmp_output(outputa,outputb):
return outputa.address == outputb.address and outputa.value == outputb.value
def in_output(output,outputs):
for s_o in outputs:
if cmp_output(s_o,output):
return True
return False
#check all output with the same amount if none have the same address it can be a change
#return true true same address same amount
#return true false same amount different address
#return false false different amount, different address not found
def din_output(out,outputs):
same_amount=[]
for s_o in outputs:
if int(out.value) == int(s_o.value):
same_amount.append(s_o)
if out.address==s_o.address:
#print("SAME_:",out.address,s_o.address)
return True, True
else: else:
return True
except Exception as e:
raise e
return False
def cmp_inputs(inputsa,inputsb):
if len(inputsa) != len(inputsb):
return False
for inputa in inputsa:
if not Util.in_utxo(inputa,inputsb):
return False
return True
def cmp_outputs(outputsa,outputsb,willexecutor_output = None):
if len(outputsa) != len(outputsb):
return False
for outputa in outputsa:
if not Util.cmp_output(outputa,willexecutor_output):
if not Util.in_output(outputa,outputsb):
return False
return True
def cmp_txs(txa,txb):
if not Util.cmp_inputs(txa.inputs(),txb.inputs()):
return False
if not Util.cmp_outputs(txa.outputs(),txb.outputs()):
return False
return True
def get_value_amount(txa,txb):
outputsa=txa.outputs()
outputsb=txb.outputs()
value_amount = 0
for outa in outputsa:
same_amount,same_address = Util.in_output(outa,txb.outputs())
if not (same_amount or same_address):
return False
if same_amount and same_address:
value_amount+=outa.value
if same_amount:
pass
if same_address:
pass pass
#print("NOT SAME_:",out.address,s_o.address)
if len(same_amount)>0: return value_amount
return True, False
else:return False, False
def get_change_output(wallet,in_amount,out_amount,fee):
change_amount = int(in_amount - out_amount - fee) def chk_locktime(timestamp_to_check,block_height_to_check,locktime):
if change_amount > wallet.dust_threshold(): #TODO BUG: WHAT HAPPEN AT THRESHOLD?
change_addresses = wallet.get_change_addresses_for_new_transaction() locktime=int(locktime)
out = PartialTxOutput.from_address_and_value(change_addresses[0], change_amount) if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check:
out.is_change = True return True
elif locktime < LOCKTIME_THRESHOLD and locktime > block_height_to_check:
return True
else:
return False
def anticipate_locktime(locktime,blocks=0,hours=0,days=0):
locktime = int(locktime)
out=0
if locktime> LOCKTIME_THRESHOLD:
seconds = blocks*600 + hours*3600 + days*86400
dt = datetime.fromtimestamp(locktime)
dt -= timedelta(seconds=seconds)
out = dt.timestamp()
else:
blocks -= hours*6 + days*144
out = locktime + blocks
if out < 1:
out = 1
return out return out
def cmp_locktime(locktimea,locktimeb):
def get_current_height(network:'Network'): if locktimea==locktimeb:
#if no network or not up to date, just set locktime to zero return 0
if not network: strlocktime = str(locktimea)
return 0 strlocktimeb = str(locktimeb)
chain = network.blockchain() intlocktimea = Util.str_to_locktime(strlocktimea)
if chain.is_tip_stale(): intlocktimeb = Util.str_to_locktime(strlocktimeb)
return 0 if locktimea[-1] in "ydb":
# figure out current block height if locktimeb[-1] == locktimea[-1]:
chain_height = chain.height() # learnt from all connected servers, SPV-checked return int(strlocktimea[-1])-int(strlocktimeb[-1])
server_height = network.get_server_height() # height claimed by main server, unverified else:
# note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork) return int(locktimea)-(locktimeb)
# - if it's lagging too much, it is the network's job to switch away
if server_height < chain_height - 10:
# the diff is suspiciously large... give up and use something non-fingerprintable
return 0
# discourage "fee sniping"
height = min(chain_height, server_height)
return height
def print_var(var,name = "",veryverbose=False): def get_lowest_valid_tx(available_utxos,will):
print(f"---{name}---") will = sorted(will.items(),key = lambda x: x[1]['tx'].locktime)
if not var is None: for txid,willitem in will.items():
pass
def get_locktimes(will):
locktimes = {}
for txid,willitem in will.items():
locktimes[willitem['tx'].locktime]=True
return locktimes.keys()
def get_lowest_locktimes(locktimes):
sorted_timestamp=[]
sorted_block=[]
for l in locktimes:
l=Util.parse_locktime_string(l)
if l < LOCKTIME_THRESHOLD:
bisect.insort(sorted_block,l)
else:
bisect.insort(sorted_timestamp,l)
return sorted(sorted_timestamp), sorted(sorted_block)
def get_lowest_locktimes_from_will(will):
return Util.get_lowest_locktimes(Util.get_locktimes(will))
def search_willtx_per_io(will,tx):
for wid, w in will.items():
if Util.cmp_txs(w['tx'],tx['tx']):
return wid,w
return None, None
def invalidate_will(will):
raise Exception("not implemented")
def get_will_spent_utxos(will):
utxos=[]
for txid,willitem in will.items():
utxos+=willitem['tx'].inputs()
return utxos
def utxo_to_str(utxo):
try: return utxo.to_str()
except Exception as e: pass
try: return utxo.prevout.to_str()
except Exception as e: pass
return str(utxo)
def cmp_utxo(utxoa,utxob):
utxoa=Util.utxo_to_str(utxoa)
utxob=Util.utxo_to_str(utxob)
if utxoa == utxob:
return True
else:
return False
def in_utxo(utxo, utxos):
for s_u in utxos:
if Util.cmp_utxo(s_u,utxo):
return True
return False
def txid_in_utxo(txid,utxos):
for s_u in utxos:
if s_u.prevout.txid == txid:
return True
return False
def cmp_output(outputa,outputb):
return outputa.address == outputb.address and outputa.value == outputb.value
def in_output(output,outputs):
for s_o in outputs:
if Util.cmp_output(s_o,output):
return True
return False
#check all output with the same amount if none have the same address it can be a change
#return true true same address same amount
#return true false same amount different address
#return false false different amount, different address not found
def din_output(out,outputs):
same_amount=[]
for s_o in outputs:
if int(out.value) == int(s_o.value):
same_amount.append(s_o)
if out.address==s_o.address:
return True, True
else:
pass
if len(same_amount)>0:
return True, False
else:return False, False
def get_change_output(wallet,in_amount,out_amount,fee):
change_amount = int(in_amount - out_amount - fee)
if change_amount > wallet.dust_threshold():
change_addresses = wallet.get_change_addresses_for_new_transaction()
out = PartialTxOutput.from_address_and_value(change_addresses[0], change_amount)
out.is_change = True
return out
def get_current_height(network:'Network'):
#if no network or not up to date, just set locktime to zero
if not network:
return 0
chain = network.blockchain()
if chain.is_tip_stale():
return 0
# figure out current block height
chain_height = chain.height() # learnt from all connected servers, SPV-checked
server_height = network.get_server_height() # height claimed by main server, unverified
# note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork)
# - if it's lagging too much, it is the network's job to switch away
if server_height < chain_height - 10:
# the diff is suspiciously large... give up and use something non-fingerprintable
return 0
# discourage "fee sniping"
height = min(chain_height, server_height)
return height
def print_var(var,name = "",veryverbose=False):
print(f"---{name}---")
if not var is None:
try:
print("doc:",doc(var))
except: pass
try:
print("str:",str(var))
except: pass
try:
print("repr",repr(var))
except:pass
try:
print("dict",dict(var))
except:pass
try:
print("dir",dir(var))
except:pass
try:
print("type",type(var))
except:pass
try:
print("to_json",var.to_json())
except: pass
try:
print("__slotnames__",var.__slotnames__)
except:pass
print(f"---end {name}---")
def print_utxo(utxo, name = ""):
print(f"---utxo-{name}---")
Util.print_var(utxo,name)
Util.print_prevout(utxo.prevout,name)
Util.print_var(utxo.script_sig,f"{name}-script-sig")
Util.print_var(utxo.witness,f"{name}-witness")
print("_TxInput__address:",utxo._TxInput__address)
print("_TxInput__scriptpubkey:",utxo._TxInput__scriptpubkey)
print("_TxInput__value_sats:",utxo._TxInput__value_sats)
print(f"---utxo-end {name}---")
def print_prevout(prevout, name = ""):
print(f"---prevout-{name}---")
Util.print_var(prevout,f"{name}-prevout")
Util.print_var(prevout._asdict())
print(f"---prevout-end {name}---")
def export_meta_gui(electrum_window: 'ElectrumWindow', title, exporter):
filter_ = "All files (*)"
filename = getSaveFileName(
parent=electrum_window,
title=_("Select file to save your {}").format(title),
filename='BALplugin_{}'.format(title),
filter=filter_,
config=electrum_window.config,
)
if not filename:
return
try: try:
print("doc:",doc(var)) exporter(filename)
except: pass except FileExportFailed as e:
try: electrum_window.show_critical(str(e))
print("str:",str(var)) else:
except: pass electrum_window.show_message(_("Your {0} were exported to '{1}'")
try: .format(title, str(filename)))
print("repr",repr(var))
except:pass
try:
print("dict",dict(var))
except:pass
try:
print("dir",dir(var))
except:pass
try:
print("type",type(var))
except:pass
try:
print("to_json",var.to_json())
except: pass
try:
print("__slotnames__",var.__slotnames__)
except:pass
print(f"---end {name}---")
def print_utxo(utxo, name = ""):
print(f"---utxo-{name}---")
print_var(utxo,name)
print_prevout(utxo.prevout,name)
print_var(utxo.script_sig,f"{name}-script-sig")
print_var(utxo.witness,f"{name}-witness")
#print("madonnamaiala_TXInput__scriptpubkey:",utxo._TXInput__scriptpubkey)
print("_TxInput__address:",utxo._TxInput__address)
print("_TxInput__scriptpubkey:",utxo._TxInput__scriptpubkey)
print("_TxInput__value_sats:",utxo._TxInput__value_sats)
print(f"---utxo-end {name}---")
def print_prevout(prevout, name = ""):
print(f"---prevout-{name}---")
print_var(prevout,f"{name}-prevout")
print_var(prevout._asdict())
print(f"---prevout-end {name}---")
def export_meta_gui(electrum_window: 'ElectrumWindow', title, exporter):
filter_ = "All files (*)"
filename = getSaveFileName(
parent=electrum_window,
title=_("Select file to save your {}").format(title),
filename='BALplugin_{}'.format(title),
filter=filter_,
config=electrum_window.config,
)
if not filename:
return
try:
exporter(filename)
except FileExportFailed as e:
electrum_window.show_critical(str(e))
else:
electrum_window.show_message(_("Your {0} were exported to '{1}'")
.format(title, str(filename)))
def copy(dicto,dictfrom): def copy(dicto,dictfrom):
for k,v in dictfrom.items(): for k,v in dictfrom.items():
dicto[k]=v dicto[k]=v

1042
will.py

File diff suppressed because it is too large Load Diff

View File

@ -9,195 +9,214 @@ from electrum.logging import get_logger
from electrum.gui.qt.util import WaitingDialog from electrum.gui.qt.util import WaitingDialog
from electrum.i18n import _ from electrum.i18n import _
from .balqt.baldialog import BalWaitingDialog from .util import Util
from . import util as Util
DEFAULT_TIMEOUT = 5 DEFAULT_TIMEOUT = 5
_logger = get_logger(__name__) _logger = get_logger(__name__)
class Willexecutors:
def get_willexecutors(bal_plugin, update = False,bal_window=False,force=False,task=True):
willexecutors = bal_plugin.config_get(bal_plugin.WILLEXECUTORS)
for w in willexecutors:
Willexecutors.initialize_willexecutor(willexecutors[w],w)
def get_willexecutors(bal_plugin, update = False,bal_window=False,force=False,task=True): bal=bal_plugin.DEFAULT_SETTINGS[bal_plugin.WILLEXECUTORS]
willexecutors = bal_plugin.config_get(bal_plugin.WILLEXECUTORS) for bal_url,bal_executor in bal.items():
for w in willexecutors: if not bal_url in willexecutors:
initialize_willexecutor(willexecutors[w],w) _logger.debug("replace bal")
willexecutors[bal_url]=bal_executor
if update:
found = False
for url,we in willexecutors.items():
if Willexecutors.is_selected(we):
found = True
if found or force:
if bal_plugin.config_get(bal_plugin.PING_WILLEXECUTORS) or force:
ping_willexecutors = True
if bal_plugin.config_get(bal_plugin.ASK_PING_WILLEXECUTORS) and not force:
ping_willexecutors = bal_window.window.question(_("Contact willexecutors servers to update payment informations?"))
if ping_willexecutors:
if task:
bal_window.ping_willexecutors(willexecutors)
else:
bal_window.ping_willexecutors_task(willexecutors)
return willexecutors
bal=bal_plugin.DEFAULT_SETTINGS[bal_plugin.WILLEXECUTORS] def is_selected(willexecutor,value=None):
for bal_url,bal_executor in bal.items(): if not willexecutor:
if not bal_url in willexecutors: return False
_logger.debug("replace bal") if not value is None:
willexecutors[bal_url]=bal_executor willexecutor['selected']=value
if update: try:
found = False return willexecutor['selected']
for url,we in willexecutors.items(): except:
if is_selected(we): willexecutor['selected']=False
found = True return False
if found or force:
if bal_plugin.config_get(bal_plugin.PING_WILLEXECUTORS) or force:
ping_willexecutors = True
if bal_plugin.config_get(bal_plugin.ASK_PING_WILLEXECUTORS) and not force:
ping_willexecutors = bal_window.window.question(_("Contact willexecutors servers to update payment informations?"))
if ping_willexecutors:
if task:
bal_window.ping_willexecutors(willexecutors)
else:
bal_window.ping_willexecutors_task(willexecutors)
return willexecutors
def is_selected(willexecutor,value=None): def get_willexecutor_transactions(will, force=False):
if not willexecutor: willexecutors ={}
return False for wid,willitem in will.items():
if not value is None: if willitem.get_status('VALID'):
willexecutor['selected']=value if willitem.get_status('COMPLETE'):
try: if not willitem.get_status('PUSHED') or force:
return willexecutor['selected'] if willexecutor := willitem.we:
except: url=willexecutor['url']
willexecutor['selected']=False if willexecutor and Willexecutors.is_selected(willexecutor):
return False if not url in willexecutors:
willexecutor['txs']=""
willexecutor['txsids']=[]
willexecutor['broadcast_status']= _("Waiting...")
willexecutors[url]=willexecutor
willexecutors[url]['txs']+=str(willitem.tx)+"\n"
willexecutors[url]['txsids'].append(wid)
def get_willexecutor_transactions(will, force=False): return willexecutors
willexecutors ={}
for wid,willitem in will.items():
if willitem.get_status('VALID'):
if willitem.get_status('COMPLETE'):
if not willitem.get_status('PUSHED') or force:
if willexecutor := willitem.we:
url=willexecutor['url']
if willexecutor and is_selected(willexecutor):
if not url in willexecutors:
willexecutor['txs']=""
willexecutor['txsids']=[]
willexecutor['broadcast_status']= _("Waiting...")
willexecutors[url]=willexecutor
willexecutors[url]['txs']+=str(willitem.tx)+"\n"
willexecutors[url]['txsids'].append(wid)
return willexecutors def only_selected_list(willexecutors):
out = {}
for url,v in willexecutors.items():
if Willexecutors.is_selected(willexecutor):
out[url]=v
def push_transactions_to_willexecutors(will):
willexecutors = get_transactions_to_be_pushed()
for url in willexecutors:
willexecutor = willexecutors[url]
if Willexecutors.is_selected(willexecutor):
if 'txs' in willexecutor:
Willexecutors.push_transactions_to_willexecutor(willexecutors[url]['txs'],url)
def only_selected_list(willexecutors): def send_request(method, url, data=None, *, timeout=10):
out = {} network = Network.get_instance()
for url,v in willexectors.items(): if not network:
if is_selected(willexecutor): raise ErrorConnectingServer('You are offline.')
out[url]=v _logger.debug(f'<-- {method} {url} {data}')
def push_transactions_to_willexecutors(will): headers = {}
willexecutors = get_transactions_to_be_pushed() headers['user-agent'] = 'BalPlugin'
for url in willexecutors: headers['Content-Type']='text/plain'
willexecutor = willexecutors[url]
if is_selected(willexecutor):
if 'txs' in willexecutor:
push_transactions_to_willexecutor(willexecutors[url]['txs'],url)
def send_request(method, url, data=None, *, timeout=10): try:
network = Network.get_instance() if method == 'get':
if not network: response = Network.send_http_on_proxy(method, url,
raise ErrorConnectingServer('You are offline.') params=data,
_logger.debug(f'<-- {method} {url} {data}') headers=headers,
headers = {} on_finish=Willexecutors.handle_response,
headers['user-agent'] = 'BalPlugin' timeout=timeout)
headers['Content-Type']='text/plain' elif method == 'post':
response = Network.send_http_on_proxy(method, url,
try: body=data,
if method == 'get': headers=headers,
response = Network.send_http_on_proxy(method, url, on_finish=Willexecutors.handle_response,
params=data, timeout=timeout)
headers=headers, else:
on_finish=handle_response, raise Exception(f"unexpected {method=!r}")
timeout=timeout) except Exception as e:
elif method == 'post': _logger.error(f"exception sending request {e}")
response = Network.send_http_on_proxy(method, url, raise e
body=data,
headers=headers,
on_finish=handle_response,
timeout=timeout)
else: else:
raise Exception(f"unexpected {method=!r}") _logger.debug(f'--> {response}')
except Exception as e: return response
_logger.error(f"exception sending request {e}") async def handle_response(resp:ClientResponse):
raise e r=await resp.text()
else: try:
_logger.debug(f'--> {response}') r=json.loads(r)
return response r['status'] = resp.status
async def handle_response(resp:ClientResponse): r['selected']=Willexecutors.is_selected(willexecutor)
r=await resp.text() r['url']=url
try: except:
r=json.loads(r) pass
r['status'] = resp.status return r
r['selected']=is_selected(willexecutor)
r['url']=url class AlreadyPresentException(Exception):
except:
pass pass
return r def push_transactions_to_willexecutor(willexecutor):
out=True
try:
class AlreadyPresentException(Exception): _logger.debug(f"willexecutor['txs']")
pass if w:=Willexecutors.send_request('post', willexecutor['url']+"/"+constants.net.NET_NAME+"/pushtxs", data=willexecutor['txs'].encode('ascii')):
def push_transactions_to_willexecutor(willexecutor): willexecutor['broadcast_status'] = _("Success")
out=True _logger.debug(f"pushed: {w}")
try: if w !='thx':
_logger.debug(f"willexecutor['txs']") _logger.debug(f"error: {w}")
if w:=send_request('post', willexecutor['url']+"/"+constants.net.NET_NAME+"/pushtxs", data=willexecutor['txs'].encode('ascii')): raise Exception(w)
willexecutor['broadcast_status'] = _("Success") else:
_logger.debug(f"pushed: {w}") raise Exception("empty reply from:{willexecutor['url']}")
if w !='thx': except Exception as e:
_logger.debug(f"error: {w}") _logger.debug(f"error:{e}")
raise Exception(w) if str(e) == "already present":
else: raise Willexecutors.AlreadyPresentException()
raise Exception("empty reply from:{willexecutor['url']}") out=False
except Exception as e: willexecutor['broadcast_status'] = _("Failed")
_logger.debug(f"error:{e}")
if str(e) == "already present":
raise AlreadyPresentException()
out=False
willexecutor['broadcast_status'] = _("Failed")
return out return out
def ping_servers(willexecutors): def ping_servers(willexecutors):
for url,we in willexecutors.items(): for url,we in willexecutors.items():
get_info_task(url,we) Willexecutors.get_info_task(url,we)
def get_info_task(url,willexecutor): def get_info_task(url,willexecutor):
w=None w=None
try: try:
_logger.info("GETINFO_WILLEXECUTOR") _logger.info("GETINFO_WILLEXECUTOR")
_logger.debug(url) _logger.debug(url)
w = send_request('get',url+"/"+constants.net.NET_NAME+"/info") netname="bitcoin"
if constants.net.NET_NAME!="mainnet":
netname=constants.net.NET_NAME
w = Willexecutors.send_request('get',url+"/"+netname+"/info")
willexecutor['url']=url
willexecutor['status'] = w['status']
willexecutor['base_fee'] = w['base_fee']
willexecutor['address'] = w['address']
if not willexecutor['info']:
willexecutor['info'] = w['info']
_logger.debug(f"response_data {w['address']}")
except Exception as e:
_logger.error(f"error {e} contacting {url}: {w}")
willexecutor['status']="KO"
willexecutor['last_update'] = datetime.now().timestamp()
return willexecutor
def initialize_willexecutor(willexecutor,url,status=None,selected=None):
print("1",willexecutor)
willexecutor['url']=url willexecutor['url']=url
willexecutor['status'] = w['status'] print("2",status)
willexecutor['base_fee'] = w['base_fee'] if not status is None:
willexecutor['address'] = w['address'] willexecutor['status'] = status
if not willexecutor['info']: willexecutor['selected'] = Willexecutors.is_selected(willexecutor,selected)
willexecutor['info'] = w['info'] def download_list(bal_plugin):
_logger.debug(f"response_data {w['address']}") try:
except Exception as e: l = Willexecutors.send_request('get',"https://welist.bitcoin-after.life/data/bitcoin?page=0&limit=100")
_logger.error(f"error {e} contacting {url}: {w}") del l['status']
willexecutor['status']="KO" from pprint import pprint
pprint(l)
for w in l:
print("----")
willexecutor=l[w]
Willexecutors.initialize_willexecutor(willexecutor,w,'New',False)
bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,l,save=True)
return l
except Exception as e:
_logger.error(f"error downloading willexecutors list:{e}")
return {}
def get_willexecutors_list_from_json(bal_plugin):
try:
with open("willexecutors.json") as f:
willexecutors = json.load(f)
for w in willexecutors:
willexecutor=willexecutors[w]
Willexecutors.initialize_willexecutor(willexecutor,w,'New',False)
bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,willexecutors,save=True)
return h
except Exception as e:
_logger.error(f"errore aprendo willexecutors.json: {e}")
return {}
willexecutor['last_update'] = datetime.now().timestamp() def check_transaction(txid,url):
return willexecutor _logger.debug(f"{url}:{txid}")
try:
def initialize_willexecutor(willexecutor,url,status=None,selected=None): w = Willexecutors.send_request('post',url+"/searchtx",data=txid.encode('ascii'))
willexecutor['url']=url return w
if not status is None: except Exception as e:
willexecutor['status'] = status _logger.error(f"error contacting {url} for checking txs {e}")
willexecutor['selected'] = is_selected(willexecutor,selected) raise e
def get_willexecutors_list_from_json(bal_plugin):
try:
with open("willexecutors.json") as f:
willexecutors = json.load(f)
for w in willexecutors:
willexecutor=willexecutors[w]
willexecutors.initialize_willexecutor(willexecutor,w,'New',False)
bal_plugin.config.set_key(bal_plugin.WILLEXECUTORS,willexecutors,save=True)
return h
except Exception as e:
_logger.error(f"errore aprendo willexecutors.json: {e}")
return {}
def check_transaction(txid,url):
_logger.debug(f"{url}:{txid}")
try:
w = send_request('post',url+"/searchtx",data=txid.encode('ascii'))
return w
except Exception as e:
_logger.error(f"error contacting {url} for checking txs {e}")
raise e