Files
txspector-new/analyze_backup.py
2024-06-15 11:27:29 +08:00

125 lines
4.7 KiB
Python

import os
import subprocess
import argparse
import logging
import requests, json, time
import sys
from os.path import abspath, dirname, join
import src.exporter as exporter
import src.tac_efg as tac_efg
import src.settings as settings
def start_logo():
print("="*145)
print("{:^7}|{:^69}|{:^46}|{:^19}|".format("Line", "Transaction Hash", "Receiver contract hash","Time"))
print("{:^7}|{:^68}|{:^45}|{:^18}|".format("Number", "\U0001F4B0", "\U0001F4B5", "\U000023F0"))
print("-"*145)
def line_show(lineNumber:int, transaction_hash:str, receiver_hash, exec_time: str):
print("{:^7}|{:^69}|{:^46}|{:^19}|".format(lineNumber, transaction_hash, receiver_hash, exec_time))
def end_logo():
print("="*141)
def format_time(seconds):
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
seconds = round(seconds,3)
if days > 0:
return f"{int(days)}d {int(hours)}h {int(minutes)}m {seconds}s"
elif hours > 0:
return f"{int(hours)}h {int(minutes)}m {seconds}s"
elif minutes > 0:
return f"{int(minutes)}m, {seconds}s"
else:
return f"{seconds} seconds"
def debug_trace_receiver(tx_hash):
"""
Get the receiver contract hash
"""
URL = "http://localhost:8545"
headers = {"Content-Type": "application/json"}
data = {
"jsonrpc": "2.0",
"method": "eth_getTransactionByHash",
"params": [tx_hash],
"id": 1
}
response = requests.post(URL, headers=headers, data=json.dumps(data))
result = response.json()
try:
to_address = result.get('result',{}).get('to','No address found')
except Exception as e:
print(f"show the error: {e}")
return to_address # Return the 'to' address as a string
if __name__ == "__main__":
# Set up logger, with appropriate log level depending on verbosity.
log_level = logging.WARNING
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
target_opcodes = ['CREATE', 'BALANCE', 'CALLER', 'CALLVALUE', 'STOP', 'RETURN', 'REVERT', 'ORIGIN', 'CALLDATALOAD',
'EQ', 'TIMESTAMP', 'NUMBER', 'DIFFICULTY', 'COINBASE', 'BLOCKHASH', 'GASLIMIT', 'EXTCODESIZE',
'SELFDESTRUCT', 'JUMPI', 'JUMP', 'JUMPDEST', 'SSTORE', 'SLOAD', 'CALL', 'DELEGATE', 'CALLCODE',
'STATICCALL']
# Set the path to your input folder and the path to the analyze_geth.sh script
input_folder = './Reenter_OPCODE'
script_path = './bin/analyze_geth.sh'
start_logo()
number = 0
# Iterate over all files in the input folder
for filename in os.listdir(input_folder):
start_time = time.time()
number += 1
if filename.endswith('.txt'): # Check if the file is a .txt file
input_file = os.path.join(input_folder, filename)
trans_hash = filename[:-4]
output_folder = os.path.join('./facts/fact_' + trans_hash) # Removing '.txt' and appending 'fact_'
# Create the output folder if it doesn't exist
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Build TAC EFG from input file
try:
logging.info("Reading from '%s'.", input_file)
with open(input_file, "r") as f:
# print(f" \033[92m{number}\033[0m {trans_hash}",end=" ")
efg = tac_efg.TACGraph.from_opcode(f)
logging.info("Initial EFG generation completed.")
# Catch a Control-C and exit with UNIX failure status 1
except KeyboardInterrupt:
print(f"\n\nUser interrupted processing at transaction hash: {trans_hash[:-4]}")
print("Exiting program.")
sys.exit(1)
# Generate facts file
exporter.EFGTsvExporter(efg).export(output_dir=output_folder,
out_opcodes=target_opcodes)
# Generate the receiver hash sc_addr.facts
try:
to_address = debug_trace_receiver(trans_hash)
output_path = os.path.join(output_folder, "sc_addr.facts")
with open(output_path, "w") as output_file:
if to_address:
output_file.write(to_address) # Write the 'to' address directly
else:
output_file.write("No address found")
except subprocess.CalledProcessError as e:
print(f"Error processing {filename}: {e.stderr.decode()}")
end_time = time.time()
formatted_time = format_time(end_time - start_time)
line_show(number,trans_hash,to_address,formatted_time)
end_logo()