459 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			459 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from datetime import datetime,timedelta
 | 
						|
import bisect
 | 
						|
from electrum.gui.qt.util import getSaveFileName
 | 
						|
from electrum.i18n import _
 | 
						|
from electrum.transaction import PartialTxOutput
 | 
						|
import urllib.request
 | 
						|
import urllib.parse
 | 
						|
from electrum.util import write_json_file,FileImportFailed,FileExportFailed
 | 
						|
 | 
						|
LOCKTIME_THRESHOLD = 500000000
 | 
						|
def locktime_to_str(locktime):
 | 
						|
    try:
 | 
						|
        locktime=int(locktime)
 | 
						|
        if locktime > LOCKTIME_THRESHOLD:
 | 
						|
            dt = datetime.fromtimestamp(locktime).isoformat()
 | 
						|
            return dt
 | 
						|
 | 
						|
    except Exception as e:
 | 
						|
        #print(e)
 | 
						|
        pass
 | 
						|
    return str(locktime)
 | 
						|
 | 
						|
def str_to_locktime(locktime):
 | 
						|
    try:
 | 
						|
        if locktime[-1] in ('y','d','b'):
 | 
						|
          return locktime
 | 
						|
        else: return int(locktime)
 | 
						|
    except Exception as e:
 | 
						|
        pass
 | 
						|
        #print(e)
 | 
						|
    dt_object = datetime.fromisoformat(locktime)
 | 
						|
    timestamp = dt_object.timestamp()
 | 
						|
    return int(timestamp)
 | 
						|
 | 
						|
def parse_locktime_string(locktime,w=None): 
 | 
						|
    try: 
 | 
						|
        return int(locktime) 
 | 
						|
 
 | 
						|
    except Exception as e: 
 | 
						|
        pass
 | 
						|
        #print("parse_locktime_string",e) 
 | 
						|
    try: 
 | 
						|
        now = datetime.now() 
 | 
						|
        if locktime[-1] == 'y': 
 | 
						|
            locktime = str(int(locktime[:-1])*365) + "d" 
 | 
						|
        if locktime[-1] == 'd': 
 | 
						|
            return int((now + timedelta(days = int(locktime[:-1]))).replace(hour=0,minute=0,second=0,microsecond=0).timestamp()) 
 | 
						|
        if locktime[-1] == 'b': 
 | 
						|
            locktime = int(locktime[:-1]) 
 | 
						|
            height = 0 
 | 
						|
            if w: 
 | 
						|
                height = get_current_height(w.network) 
 | 
						|
            locktime+=int(height) 
 | 
						|
        return int(locktime) 
 | 
						|
    except Exception as e: 
 | 
						|
        print("parse_locktime_string",e) 
 | 
						|
        #raise e
 | 
						|
    return 0
 | 
						|
 | 
						|
 | 
						|
def int_locktime(seconds=0,minutes=0,hours=0, days=0, blocks = 0):
 | 
						|
    return int(seconds + minutes*60 + hours*60*60 + days*60*60*24 + blocks * 600)
 | 
						|
 | 
						|
def encode_amount(amount, decimal_point):
 | 
						|
    if is_perc(amount):
 | 
						|
        return amount
 | 
						|
    else:
 | 
						|
        try:
 | 
						|
            return int(float(amount)*pow(10,decimal_point))
 | 
						|
        except:
 | 
						|
            return 0
 | 
						|
 | 
						|
def decode_amount(amount,decimal_point):
 | 
						|
    if is_perc(amount):
 | 
						|
        return amount
 | 
						|
    else:
 | 
						|
        num=8-decimal_point
 | 
						|
        basestr="{{:0{}.{}f}}".format(num,num)
 | 
						|
        return "{:08.8f}".format(float(amount)/pow(10,decimal_point))
 | 
						|
 | 
						|
def is_perc(value): 
 | 
						|
        try:
 | 
						|
            return value[-1] == '%'
 | 
						|
        except:
 | 
						|
            return False
 | 
						|
 | 
						|
def cmp_array(heira,heirb):
 | 
						|
    try:
 | 
						|
        if not len(heira) == len(heirb):
 | 
						|
            return False
 | 
						|
        for h in range(0,len(heira)):
 | 
						|
            if not heira[h] == heirb[h]:
 | 
						|
                return False
 | 
						|
        return True
 | 
						|
    except:
 | 
						|
        return False
 | 
						|
 | 
						|
def cmp_heir(heira,heirb):
 | 
						|
    if heira[0] == heirb[0] and heira[1] == heirb[1]: 
 | 
						|
        return True
 | 
						|
    return False
 | 
						|
 | 
						|
def cmp_willexecutor(willexecutora,willexecutorb):
 | 
						|
    if willexecutora == willexecutorb:
 | 
						|
        return True
 | 
						|
    try:
 | 
						|
        if willexecutora['url']==willexecutorb['url'] and willexecutora['address'] == willexecutorb['address'] and willexecutora['base_fee']==willexecutorb['base_fee']:
 | 
						|
            return True
 | 
						|
    except:
 | 
						|
        return False
 | 
						|
    return False
 | 
						|
 | 
						|
def search_heir_by_values(heirs,heir,values):
 | 
						|
    #print()
 | 
						|
    for h,v in heirs.items():
 | 
						|
        found = False
 | 
						|
        for val in values:
 | 
						|
            if val in v and v[val] != heir[val]:
 | 
						|
                found = True
 | 
						|
 | 
						|
        if not found:
 | 
						|
            return h
 | 
						|
    return False
 | 
						|
 | 
						|
def cmp_heir_by_values(heira,heirb,values):
 | 
						|
    for v in values:
 | 
						|
        if heira[v] != heirb[v]:
 | 
						|
            return False
 | 
						|
    return True
 | 
						|
 | 
						|
def cmp_heirs_by_values(heirsa,heirsb,values,exclude_willexecutors=False,reverse = True):
 | 
						|
    for heira in heirsa:
 | 
						|
        if (exclude_willexecutors and not "w!ll3x3c\"" in heira) or not exclude_willexecutors:
 | 
						|
            found = False
 | 
						|
            for heirb in heirsb:
 | 
						|
                if cmp_heir_by_values(heirsa[heira],heirsb[heirb],values):
 | 
						|
                    found=True
 | 
						|
            if not found:
 | 
						|
                #print(f"not_found {heira}--{heirsa[heira]}")
 | 
						|
                return False
 | 
						|
    if reverse:
 | 
						|
        return cmp_heirs_by_values(heirsb,heirsa,values,exclude_willexecutors=exclude_willexecutors,reverse=False)
 | 
						|
    else:
 | 
						|
        return True
 | 
						|
    
 | 
						|
def cmp_heirs(heirsa,heirsb,cmp_function = lambda x,y: x[0]==y[0] and x[3]==y[3],reverse=True):
 | 
						|
    try:
 | 
						|
        for heir in heirsa:
 | 
						|
            if not "w!ll3x3c\"" in heir:
 | 
						|
                if not heir in heirsb or not cmp_function(heirsa[heir],heirsb[heir]):
 | 
						|
                    if not search_heir_by_values(heirsb,heirsa[heir],[0,3]):
 | 
						|
                        return False
 | 
						|
        if reverse:
 | 
						|
            return cmp_heirs(heirsb,heirsa,cmp_function,False)
 | 
						|
        else:
 | 
						|
            return True
 | 
						|
    except Exception as e:
 | 
						|
        raise e
 | 
						|
        return False
 | 
						|
 | 
						|
def cmp_inputs(inputsa,inputsb):
 | 
						|
    if len(inputsa) != len(inputsb): 
 | 
						|
        return False 
 | 
						|
    for inputa in inputsa:
 | 
						|
        if not in_utxo(inputa,inputsb):
 | 
						|
            return False
 | 
						|
    return True
 | 
						|
 | 
						|
def cmp_outputs(outputsa,outputsb,willexecutor_output = None):
 | 
						|
    if len(outputsa) != len(outputsb): 
 | 
						|
        return False 
 | 
						|
    for outputa in outputsa: 
 | 
						|
        if not cmp_output(outputa,willexecutor_output):
 | 
						|
            if not in_output(outputa,outputsb): 
 | 
						|
                return False
 | 
						|
    return True
 | 
						|
 | 
						|
def cmp_txs(txa,txb):
 | 
						|
    if not cmp_inputs(txa.inputs(),txb.inputs()):
 | 
						|
        return False
 | 
						|
    if not cmp_outputs(txa.outputs(),txb.outputs()):
 | 
						|
        return False
 | 
						|
    return True
 | 
						|
 | 
						|
def get_value_amount(txa,txb):
 | 
						|
    outputsa=txa.outputs()
 | 
						|
    outputsb=txb.outputs()
 | 
						|
    value_amount = 0
 | 
						|
    #if len(outputsa) != len(outputsb):
 | 
						|
    #    print("outputlen is different")
 | 
						|
    #    return False
 | 
						|
 | 
						|
    for outa in outputsa:
 | 
						|
        same_amount,same_address = in_output(outa,txb.outputs())
 | 
						|
        if not (same_amount or same_address):
 | 
						|
            #print("outa notin txb", same_amount,same_address)
 | 
						|
            return False
 | 
						|
        if same_amount and same_address:
 | 
						|
            value_amount+=outa.value
 | 
						|
        if same_amount:
 | 
						|
            pass
 | 
						|
            #print("same amount")
 | 
						|
        if same_address:
 | 
						|
            pass
 | 
						|
            #print("same address")
 | 
						|
 | 
						|
    return value_amount
 | 
						|
    #not needed
 | 
						|
    #for outb in outputsb:
 | 
						|
    #    if not in_output(outb,txa.outputs()):
 | 
						|
    #        print("outb notin txb")
 | 
						|
    #        return False
 | 
						|
 | 
						|
 | 
						|
 | 
						|
def chk_locktime(timestamp_to_check,block_height_to_check,locktime):
 | 
						|
    #TODO BUG:  WHAT HAPPEN AT THRESHOLD?
 | 
						|
    locktime=int(locktime)
 | 
						|
    if locktime > LOCKTIME_THRESHOLD and locktime > timestamp_to_check:
 | 
						|
        return True
 | 
						|
    elif locktime < LOCKTIME_THRESHOLD and locktime > block_height_to_check:
 | 
						|
        return True
 | 
						|
    else:
 | 
						|
        return False
 | 
						|
 | 
						|
def anticipate_locktime(locktime,blocks=0,hours=0,days=0):
 | 
						|
    locktime = int(locktime)
 | 
						|
    out=0
 | 
						|
    if locktime> LOCKTIME_THRESHOLD:
 | 
						|
        seconds = blocks*600 + hours*3600 + days*86400
 | 
						|
        dt = datetime.fromtimestamp(locktime)
 | 
						|
        dt -= timedelta(seconds=seconds)
 | 
						|
        out = dt.timestamp()
 | 
						|
    else:
 | 
						|
        blocks -= hours*6 + days*144
 | 
						|
        out = locktime + blocks
 | 
						|
 | 
						|
    if out < 1:
 | 
						|
        out = 1 
 | 
						|
    return out
 | 
						|
 | 
						|
def cmp_locktime(locktimea,locktimeb):
 | 
						|
    if locktimea==locktimeb:
 | 
						|
        return 0
 | 
						|
    strlocktime = str(locktimea)
 | 
						|
    strlocktimeb = str(locktimeb)
 | 
						|
    intlocktimea = str_to_locktime(strlocktimea)
 | 
						|
    intlocktimeb = str_to_locktime(strlocktimeb)
 | 
						|
    if locktimea[-1] in "ydb":
 | 
						|
        if locktimeb[-1] == locktimea[-1]:
 | 
						|
            return int(strlocktimea[-1])-int(strlocktimeb[-1])
 | 
						|
        else:
 | 
						|
            return int(locktimea)-(locktimeb)
 | 
						|
    
 | 
						|
 | 
						|
def get_lowest_valid_tx(available_utxos,will):
 | 
						|
    will = sorted(will.items(),key = lambda x: x[1]['tx'].locktime)
 | 
						|
    for txid,willitem in will.items():
 | 
						|
        pass
 | 
						|
 | 
						|
def get_locktimes(will):
 | 
						|
    locktimes = {}
 | 
						|
    for txid,willitem in will.items():
 | 
						|
        locktimes[willitem['tx'].locktime]=True
 | 
						|
    return locktimes.keys()
 | 
						|
 | 
						|
def get_lowest_locktimes(locktimes):
 | 
						|
    sorted_timestamp=[]
 | 
						|
    sorted_block=[]
 | 
						|
    for l in locktimes:
 | 
						|
        #print("locktime:",parse_locktime_string(l))
 | 
						|
        l=parse_locktime_string(l)
 | 
						|
        if l < LOCKTIME_THRESHOLD:
 | 
						|
            bisect.insort(sorted_block,l)
 | 
						|
        else:
 | 
						|
            bisect.insort(sorted_timestamp,l)
 | 
						|
 | 
						|
    return sorted(sorted_timestamp), sorted(sorted_block)
 | 
						|
 | 
						|
def get_lowest_locktimes_from_will(will):
 | 
						|
    return get_lowest_locktimes(get_locktimes(will))
 | 
						|
 | 
						|
def search_willtx_per_io(will,tx):
 | 
						|
    for wid, w in will.items():
 | 
						|
        if cmp_txs(w['tx'],tx['tx']):
 | 
						|
            return wid,w
 | 
						|
    return None, None
 | 
						|
 | 
						|
def invalidate_will(will):
 | 
						|
    raise Exception("not implemented")
 | 
						|
 | 
						|
def get_will_spent_utxos(will):
 | 
						|
    utxos=[]
 | 
						|
    for txid,willitem in will.items():
 | 
						|
        utxos+=willitem['tx'].inputs()
 | 
						|
        
 | 
						|
    return utxos
 | 
						|
 | 
						|
def utxo_to_str(utxo):
 | 
						|
    try: return utxo.to_str()
 | 
						|
    except Exception as e: pass
 | 
						|
    try: return utxo.prevout.to_str()
 | 
						|
    except Exception as e: pass
 | 
						|
    return str(utxo)
 | 
						|
 | 
						|
def cmp_utxo(utxoa,utxob):
 | 
						|
    utxoa=utxo_to_str(utxoa)
 | 
						|
    utxob=utxo_to_str(utxob)
 | 
						|
    if utxoa == utxob:
 | 
						|
    #if utxoa.prevout.txid==utxob.prevout.txid and utxoa.prevout.out_idx == utxob.prevout.out_idx:
 | 
						|
        return True
 | 
						|
    else:
 | 
						|
        return False
 | 
						|
 | 
						|
def in_utxo(utxo, utxos):
 | 
						|
    for s_u in utxos:
 | 
						|
        if cmp_utxo(s_u,utxo):
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
def txid_in_utxo(txid,utxos):
 | 
						|
    for s_u in utxos:
 | 
						|
        if s_u.prevout.txid == txid:
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
def cmp_output(outputa,outputb):
 | 
						|
    return outputa.address == outputb.address and outputa.value == outputb.value
 | 
						|
 | 
						|
def in_output(output,outputs):
 | 
						|
    for s_o in outputs:
 | 
						|
        if cmp_output(s_o,output):
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
#check all output with the same amount if none have the same address it can be a change
 | 
						|
#return true true same address same amount 
 | 
						|
#return true false same amount different address
 | 
						|
#return false false different amount, different address not found
 | 
						|
 | 
						|
 | 
						|
def din_output(out,outputs):
 | 
						|
    same_amount=[]
 | 
						|
    for s_o in outputs:
 | 
						|
        if int(out.value) == int(s_o.value):
 | 
						|
            same_amount.append(s_o)
 | 
						|
            if out.address==s_o.address:
 | 
						|
                #print("SAME_:",out.address,s_o.address)
 | 
						|
                return True, True
 | 
						|
            else:
 | 
						|
                pass
 | 
						|
                #print("NOT  SAME_:",out.address,s_o.address)
 | 
						|
 | 
						|
    if len(same_amount)>0:
 | 
						|
        return True, False
 | 
						|
    else:return False, False
 | 
						|
 | 
						|
 | 
						|
def get_change_output(wallet,in_amount,out_amount,fee): 
 | 
						|
    change_amount = int(in_amount - out_amount - fee) 
 | 
						|
    if change_amount > wallet.dust_threshold(): 
 | 
						|
        change_addresses = wallet.get_change_addresses_for_new_transaction() 
 | 
						|
        out = PartialTxOutput.from_address_and_value(change_addresses[0], change_amount) 
 | 
						|
        out.is_change = True 
 | 
						|
        return out
 | 
						|
 | 
						|
 | 
						|
def get_current_height(network:'Network'):
 | 
						|
    #if no network or not up to date, just set locktime to zero
 | 
						|
    if not network:
 | 
						|
        return 0
 | 
						|
    chain = network.blockchain()
 | 
						|
    if chain.is_tip_stale():
 | 
						|
        return 0
 | 
						|
    # figure out current block height
 | 
						|
    chain_height = chain.height()  # learnt from all connected servers, SPV-checked
 | 
						|
    server_height = network.get_server_height()  # height claimed by main server, unverified
 | 
						|
    # note: main server might be lagging (either is slow, is malicious, or there is an SPV-invisible-hard-fork)
 | 
						|
    #       - if it's lagging too much, it is the network's job to switch away
 | 
						|
    if server_height < chain_height - 10:
 | 
						|
        # the diff is suspiciously large... give up and use something non-fingerprintable
 | 
						|
        return 0
 | 
						|
    # discourage "fee sniping"
 | 
						|
    height = min(chain_height, server_height)
 | 
						|
    return height
 | 
						|
 | 
						|
 | 
						|
def print_var(var,name = "",veryverbose=False):
 | 
						|
    print(f"---{name}---")
 | 
						|
    if not var is None:
 | 
						|
        try:
 | 
						|
            print("doc:",doc(var))
 | 
						|
        except: pass
 | 
						|
        try:
 | 
						|
            print("str:",str(var))
 | 
						|
        except: pass
 | 
						|
        try:
 | 
						|
            print("repr",repr(var))
 | 
						|
        except:pass
 | 
						|
        try:
 | 
						|
            print("dict",dict(var))
 | 
						|
        except:pass
 | 
						|
        try:
 | 
						|
            print("dir",dir(var))
 | 
						|
        except:pass
 | 
						|
        try:
 | 
						|
            print("type",type(var))
 | 
						|
        except:pass
 | 
						|
        try:
 | 
						|
            print("to_json",var.to_json())
 | 
						|
        except: pass
 | 
						|
        try:
 | 
						|
            print("__slotnames__",var.__slotnames__)
 | 
						|
        except:pass
 | 
						|
 | 
						|
    print(f"---end {name}---")
 | 
						|
 | 
						|
def print_utxo(utxo, name = ""):
 | 
						|
    print(f"---utxo-{name}---")
 | 
						|
    print_var(utxo,name)
 | 
						|
    print_prevout(utxo.prevout,name)
 | 
						|
    print_var(utxo.script_sig,f"{name}-script-sig")
 | 
						|
    print_var(utxo.witness,f"{name}-witness")
 | 
						|
    #print("madonnamaiala_TXInput__scriptpubkey:",utxo._TXInput__scriptpubkey)
 | 
						|
    print("_TxInput__address:",utxo._TxInput__address)
 | 
						|
    print("_TxInput__scriptpubkey:",utxo._TxInput__scriptpubkey)
 | 
						|
    print("_TxInput__value_sats:",utxo._TxInput__value_sats)
 | 
						|
    print(f"---utxo-end {name}---")
 | 
						|
 | 
						|
def print_prevout(prevout, name = ""):
 | 
						|
    print(f"---prevout-{name}---")
 | 
						|
    print_var(prevout,f"{name}-prevout")
 | 
						|
    print_var(prevout._asdict())
 | 
						|
    print(f"---prevout-end {name}---")
 | 
						|
 | 
						|
def export_meta_gui(electrum_window: 'ElectrumWindow', title, exporter):
 | 
						|
    filter_ = "All files (*)"
 | 
						|
    filename = getSaveFileName(
 | 
						|
        parent=electrum_window,
 | 
						|
        title=_("Select file to save your {}").format(title),
 | 
						|
        filename='BALplugin_{}'.format(title),
 | 
						|
        filter=filter_,
 | 
						|
        config=electrum_window.config,
 | 
						|
    )
 | 
						|
    if not filename:
 | 
						|
        return
 | 
						|
    try:
 | 
						|
        exporter(filename)
 | 
						|
    except FileExportFailed as e:
 | 
						|
        electrum_window.show_critical(str(e))
 | 
						|
    else:
 | 
						|
        electrum_window.show_message(_("Your {0} were exported to '{1}'")
 | 
						|
                                 .format(title, str(filename)))
 | 
						|
 | 
						|
 | 
						|
def copy(dicto,dictfrom):
 | 
						|
    for k,v in dictfrom.items():
 | 
						|
        dicto[k]=v
 |