Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check Sighash Flag #2807

Merged
merged 25 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions counterparty-core/counterpartycore/lib/api/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ def get_tx_info(tx_hex, block_index=None):
db,
deserialize.deserialize_tx(tx_hex, use_txid=use_txid),
block_index=block_index,
composing=True,
)
)
return (
Expand Down
14 changes: 10 additions & 4 deletions counterparty-core/counterpartycore/lib/deserialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,21 @@ def read_transaction(vds, use_txid=True):
offset_before_tx_witnesses = vds.read_cursor - start_pos
for vin in transaction["vin"]: # noqa: B007
witnesses_count = vds.read_compact_size()
for i in range(witnesses_count): # noqa: B007
witness_length = vds.read_compact_size()
witness = vds.read_bytes(witness_length)
transaction["vtxinwit"].append(witness)
if witnesses_count == 0:
transaction["vtxinwit"].append([])
else:
vin_witnesses = []
for i in range(witnesses_count): # noqa: B007
witness_length = vds.read_compact_size()
witness = vds.read_bytes(witness_length)
vin_witnesses.append(witness)
transaction["vtxinwit"].append(vin_witnesses)

transaction["lock_time"] = vds.read_uint32()
data = vds.input[start_pos : vds.read_cursor]

transaction["tx_hash"] = ib2h(double_hash(data))
transaction["tx_id"] = transaction["tx_hash"]
if transaction["segwit"]:
hash_data = data[:4] + data[6:offset_before_tx_witnesses] + data[-4:]
transaction["tx_id"] = ib2h(double_hash(hash_data))
Expand Down
121 changes: 116 additions & 5 deletions counterparty-core/counterpartycore/lib/gettxinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def get_vin_info(vin):
if "value" in vin:
return vin["value"], vin["script_pub_key"], vin["is_segwit"]

# Note: We don't know what block the `vin` is in, and the block might have been from a while ago, so this call may not hit the cache.
# Note: We don't know what block the `vin` is in, and the block might
# have been from a while ago, so this call may not hit the cache.
vin_ctx = backend.bitcoind.get_decoded_transaction(vin["hash"])

is_segwit = len(vin_ctx["vtxinwit"]) > 0
Expand All @@ -155,11 +156,117 @@ def get_vin_info(vin):
return vout["value"], vout["script_pub_key"], is_segwit


def get_der_signature_sighash_flag(value):
if not isinstance(value, bytes):
return None
lenght_by_prefix = {
"3042": 69,
"3043": 70,
"3044": 71,
"3045": 72,
"3046": 73,
}
for prefix, length in lenght_by_prefix.items():
if value.startswith(binascii.unhexlify(prefix)) and len(value) == length:
return value[-1:]
return None


def get_schnorr_signature_sighash_flag(value):

Check warning

Code scanning / pylint

Either all return statements in a function should return an expression, or none of them should. Warning

Either all return statements in a function should return an expression, or none of them should.
if not isinstance(value, bytes):
return None
if len(value) not in [64, 65]:
return None
if len(value) == 65:
return value[-1:]
return b"\x01" # SIGHASH_ALL by default


def collect_sighash_flags(script_sig, witnesses):
flags = []

# P2PK, P2PKH, P2MS
if script_sig != b"":
asm = script.script_to_asm(script_sig)
for item in asm:
flag = get_der_signature_sighash_flag(item)
if flag is not None:
flags.append(flag)

if len(witnesses) == 0:
return flags

witnesses = [
binascii.unhexlify(witness) if isinstance(witness, str) else witness
for witness in witnesses
]

# P2WPKH
if len(witnesses) == 2:
flag = get_der_signature_sighash_flag(witnesses[0])
if flag is not None:
flags.append(flag)
return flags

# P2TR key path spend
if len(witnesses) == 1:
flag = get_schnorr_signature_sighash_flag(witnesses[0])
if flag is not None:
flags.append(flag)
return flags

# P2TR script path spend
if len(witnesses) >= 3:
for item in witnesses[:-2]: # ignore script and control block
flag = get_schnorr_signature_sighash_flag(item)
if flag is not None:
flags.append(flag)
return flags

return flags


# class SighashFlagError(DecodeError):
class SighashFlagError(Exception):
pass


# known transactions with invalid SIGHASH flag
SIGHASH_FLAG_TRANSACTION_WHITELIST = [
"c8091f1ef768a2f00d48e6d0f7a2c2d272a5d5c8063db78bf39977adcb12e103"
]


def check_signatures_sighash_flag(decoded_tx):
if decoded_tx["tx_id"] in SIGHASH_FLAG_TRANSACTION_WHITELIST:
return

script_sig = decoded_tx["vin"][0]["script_sig"]
witnesses = []
if decoded_tx["segwit"]:
witnesses = decoded_tx["vtxinwit"][0]

flags = collect_sighash_flags(script_sig, witnesses)

if len(flags) == 0:
error = f"impossible to determine SIGHASH flag for transaction {decoded_tx['tx_id']}"
logger.debug(error)
raise SighashFlagError(error)

# first input must be signed with SIGHASH_ALL or SIGHASH_ALL|SIGHASH_ANYONECANPAY
authorized_flags = [b"\x01", b"\x81"]
for flag in flags:
if flag not in authorized_flags:
error = f"invalid SIGHASH flag for transaction {decoded_tx['tx_id']}"
logger.debug(error)
raise SighashFlagError(error)


def get_transaction_sources(decoded_tx):
sources = []
outputs_value = 0

for vin in decoded_tx["vin"][:]: # Loop through inputs.
for vin in decoded_tx["vin"]: # Loop through inputs.
vout_value, script_pubkey, _is_segwit = get_vin_info(vin)

outputs_value += vout_value
Expand Down Expand Up @@ -394,6 +501,8 @@ def get_tx_info_new(db, decoded_tx, block_index, p2sh_is_segwit=False, composing
# Collect all (unique) source addresses.
# if we haven't found them yet
if p2sh_encoding_source is None:
if not composing:
check_signatures_sighash_flag(decoded_tx)
sources, outputs_value = get_transaction_sources(decoded_tx)
if not fee_added:
fee += outputs_value
Expand Down Expand Up @@ -524,7 +633,7 @@ def get_tx_info_legacy(decoded_tx, block_index):
return source, destination, btc_amount, fee, data, []


def _get_tx_info(db, decoded_tx, block_index, p2sh_is_segwit=False):
def _get_tx_info(db, decoded_tx, block_index, p2sh_is_segwit=False, composing=False):
"""Get the transaction info. Calls one of two subfunctions depending on signature type."""
if not block_index:
block_index = util.CURRENT_BLOCK_INDEX
Expand All @@ -535,12 +644,14 @@ def _get_tx_info(db, decoded_tx, block_index, p2sh_is_segwit=False):
decoded_tx,
block_index,
p2sh_is_segwit=p2sh_is_segwit,
composing=composing,
)
elif util.enabled("multisig_addresses", block_index=block_index): # Protocol change.
return get_tx_info_new(
db,
decoded_tx,
block_index,
composing=composing,
)
else:
return get_tx_info_legacy(decoded_tx, block_index)
Expand Down Expand Up @@ -604,7 +715,7 @@ def get_utxos_info(db, decoded_tx):
]


def get_tx_info(db, decoded_tx, block_index):
def get_tx_info(db, decoded_tx, block_index, composing=False):
"""Get the transaction info. Returns normalized None data for DecodeError and BTCOnlyError."""
if util.enabled("utxo_support", block_index=block_index):
# utxos_info is a space-separated list of UTXOs, last element is the destination,
Expand All @@ -618,7 +729,7 @@ def get_tx_info(db, decoded_tx, block_index):
utxos_info = []
try:
source, destination, btc_amount, fee, data, dispensers_outs = _get_tx_info(
db, decoded_tx, block_index
db, decoded_tx, block_index, composing=composing
)
return source, destination, btc_amount, fee, data, dispensers_outs, utxos_info
except DecodeError as e: # noqa: F841
Expand Down
12 changes: 8 additions & 4 deletions counterparty-core/counterpartycore/lib/message_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@
return "unknown"

if message_type_id == messages.utxo.ID:
message_data = messages.utxo.unpack(message, return_dict=True)
if util.is_utxo_format(message_data["source"]):
return "detach"
return "attach"
try:
message_data = messages.utxo.unpack(message, return_dict=True)
if util.is_utxo_format(message_data["source"]):
return "detach"
return "attach"
except Exception:

Check warning

Code scanning / pylint

Catching too general exception Exception. Warning

Catching too general exception Exception.
return "unknown"

return TRANSACTION_TYPE_BY_ID.get(message_type_id, "unknown")
Loading
Loading