176 lines
6.8 KiB
Python
176 lines
6.8 KiB
Python
"""
|
|
Script for Processing Smart Contract Transactions
|
|
|
|
This script processes smart contract transactions fetched from the PostgreSQL
table `tx_rtl_part_b_dedup`. For each transaction it builds a Transactional
Abstract Control Flow Graph (TAC EFG) from the stored `rtl_dedup` text and
exports the related facts for a set of target opcodes into
`./facts_test/fact_<tx_hash>/`. Errors encountered during processing are
recorded in the `error_log` table, and the hash of the last successfully
processed transaction is stored in the `framework_progress` table, so that an
interrupted run resumes after that transaction.
|
|
|
|
To run the script, use the command: `python3 analyze3.py`
|
|
|
|
|
|
Author: Yangyi Zou
|
|
Date: May 19 2024
|
|
Usage: python3 analyze3.py
|
|
"""
|
|
import time
|
|
import io
|
|
import os
|
|
import subprocess
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from os.path import abspath, dirname, join
|
|
import src.exporter as exporter
|
|
import src.tac_efg_lab as tac_efg
|
|
import src.settings as settings
|
|
import csv, threading, queue, time
|
|
import pdb,traceback
|
|
import psycopg2.pool
|
|
from KEY import *
|
|
import threading
|
|
import queue
|
|
from tqdm import tqdm
|
|
|
|
# Opcodes whose facts are exported for every transaction; passed as
# ``out_opcodes`` to ``exporter.EFGTsvExporter(...).export``.
# NOTE(review): the EVM mnemonic is DELEGATECALL, not DELEGATE. Confirm that
# 'DELEGATE' matches the opcode name used by src.tac_efg_lab; otherwise
# delegate calls are silently never exported.
target_opcodes = [
    'CREATE', 'BALANCE', 'CALLER', 'CALLVALUE',
    'STOP', 'RETURN', 'REVERT',
    'ORIGIN', 'CALLDATALOAD', 'EQ',
    'TIMESTAMP', 'NUMBER', 'DIFFICULTY', 'COINBASE', 'BLOCKHASH', 'GASLIMIT',
    'EXTCODESIZE', 'SELFDESTRUCT',
    'JUMPI', 'JUMP', 'JUMPDEST',
    'SSTORE', 'SLOAD',
    'CALL', 'DELEGATE', 'CALLCODE', 'STATICCALL',
]
|
|
|
|
# TODO 1: Initialize database connection pool, error logging, and progress tracking
def create_connection_pool():
    """Build the PostgreSQL connection pool shared by every helper below.

    Uses ``psycopg2.pool.ThreadedConnectionPool`` (intended for use from
    multiple threads), holding between 1 and 10 connections. The connection
    settings USER, PASSWORD, HOST, PORT and DATABASE are expected to come from
    the ``from KEY import *`` at the top of the file.
    """
    min_connections, max_connections = 1, 10
    return psycopg2.pool.ThreadedConnectionPool(
        min_connections,
        max_connections,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT,
        database=DATABASE,
    )


# Created at import time; every DB helper receives this pool as an argument.
conn_pool = create_connection_pool()
|
|
|
|
def update_progress(tx_hash, conn_pool):
    """Record *tx_hash* as the single row of ``framework_progress``.

    The previous row is deleted and the new hash inserted within one
    transaction. ``with conn`` commits on success and rolls back on error.

    Args:
        tx_hash: Hash of the transaction that was just processed successfully.
        conn_pool: psycopg2-style pool providing ``getconn``/``putconn``.

    Raises:
        Any database error from the statements; the connection is still
        returned to the pool first.
    """
    conn = conn_pool.getconn()
    try:
        # psycopg2's ``with conn`` ends the transaction but does NOT release the
        # connection, so it must be handed back explicitly in ``finally``;
        # otherwise every failure would leak one pooled connection.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("DELETE FROM framework_progress")
                cursor.execute("INSERT INTO framework_progress (last_processed_tx_hash) VALUES (%s)", (tx_hash,))
    finally:
        conn_pool.putconn(conn)
|
|
|
|
def log_error(tx_hash, error, conn_pool):
    """Logs an error for a transaction in the database.

    Inserts one ``(tx_hash, error_message)`` row into ``error_log``. The
    insert is committed by ``with conn`` on success and rolled back on error.

    Args:
        tx_hash: Hash of the transaction that failed.
        error: Error message text to store.
        conn_pool: psycopg2-style pool providing ``getconn``/``putconn``.

    Raises:
        Any database error from the insert; the connection is still returned
        to the pool first.
    """
    conn = conn_pool.getconn()
    try:
        # ``with conn`` only commits/rolls back; it does not release the
        # connection, hence the explicit putconn in ``finally``.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("INSERT INTO error_log (tx_hash, error_message) VALUES (%s, %s)", (tx_hash, error))
    finally:
        conn_pool.putconn(conn)
|
|
|
|
|
|
|
|
# TODO 2: Fetch transaction data
def get_last_processed_tx_hash(conn_pool):
    """Return the last transaction hash processed by a previous run.

    Reads the first row of ``framework_progress``.

    Args:
        conn_pool: psycopg2-style pool providing ``getconn``/``putconn``.

    Returns:
        The stored ``last_processed_tx_hash``, or None when the table is empty.

    Raises:
        Any database error from the query; the connection is still returned
        to the pool first.
    """
    conn = conn_pool.getconn()
    try:
        # ``with conn`` ends the (read-only) transaction but does not release
        # the connection, so putconn happens in ``finally``.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT last_processed_tx_hash FROM framework_progress LIMIT 1")
                result = cursor.fetchone()
    finally:
        conn_pool.putconn(conn)
    return result[0] if result else None
|
|
|
|
def fetch_transactions(conn_pool, start_after_hash=None, limit=200000):
    """Fetches transactions from the database.

    Args:
        conn_pool: psycopg2-style pool providing ``getconn``/``putconn``.
        start_after_hash: When truthy, only rows whose ``tx_hash`` sorts after
            this value are returned (used to resume an interrupted run).
        limit: Maximum number of rows to return.

    Returns:
        The rows from ``cursor.fetchall()``, ordered by ``tx_hash`` ascending,
        each with exactly the columns ``(tx_hash, rtl_dedup, to_address)``.

    Raises:
        Any database error from the query; the connection is still returned
        to the pool first.
    """
    conn = conn_pool.getconn()
    try:
        with conn:
            with conn.cursor() as cursor:
                # Both branches must select the same three columns: callers
                # unpack each row as ``tx_hash, rtl, to_address``. The former
                # ``SELECT *`` in the resume branch broke that as soon as the
                # table had any other column or a different column order.
                if start_after_hash:
                    cursor.execute(
                        "SELECT tx_hash, rtl_dedup, to_address FROM tx_rtl_part_b_dedup "
                        "WHERE tx_hash > %s ORDER BY tx_hash ASC LIMIT %s",
                        (start_after_hash, limit),
                    )
                else:
                    cursor.execute(
                        "SELECT tx_hash, rtl_dedup, to_address FROM tx_rtl_part_b_dedup "
                        "ORDER BY tx_hash ASC LIMIT %s",
                        (limit,),
                    )
                transactions = cursor.fetchall()
    finally:
        conn_pool.putconn(conn)
    return transactions
|
|
|
|
def setup_directories(tx_hash, base_dir='./facts_test'):
    """Sets up directories for output based on the transaction hash.

    Args:
        tx_hash: Transaction hash; the folder is named ``fact_<tx_hash>``.
        base_dir: Parent directory for all per-transaction folders. Defaults to
            ``./facts_test``, relative to the current working directory.

    Returns:
        ``os.path.join(base_dir, f'fact_{tx_hash}')``, which exists as a
        directory on return (intermediate directories are created as needed).
    """
    output_folder = os.path.join(base_dir, f'fact_{tx_hash}')
    # exist_ok=True replaces the exists()-then-makedirs() check, which could
    # raise FileExistsError if another worker thread created the folder in between.
    os.makedirs(output_folder, exist_ok=True)
    return output_folder
|
|
|
|
# TODO 3: Process transactions
def process_transaction(tx_hash, rtl, to_address, output_folder, conn_pool):
    """Processes a transaction and exports the facts.

    Builds the TAC EFG from *rtl* via ``tac_efg.TACGraph.from_opcode`` and
    exports facts for ``target_opcodes`` into *output_folder*. It then writes
    *to_address* to ``sc_addr.facts`` in that folder and records *tx_hash*
    with ``update_progress``.

    Args:
        tx_hash: Hash of the transaction (used for messages, error log and progress).
        rtl: Text of the transaction's ``rtl_dedup`` column, parsed by
            ``TACGraph.from_opcode``.
        to_address: Value written verbatim to ``sc_addr.facts``.
        output_folder: Existing directory that receives the exported files.
        conn_pool: Connection pool used by ``update_progress`` / ``log_error``.

    Errors raised while building, exporting, writing or recording progress are
    logged and stored with ``log_error`` instead of being re-raised. An
    exception raised by ``log_error`` itself still propagates.

    Note: the time budget (module-level ``timeout_seconds``) is checked only
    once, after graph construction. It does not interrupt a slow construction
    or a slow export. It only skips the export (and reports a TimeoutError)
    when construction alone already used up the budget.
    """
    start_time = time.time()
    try:
        efg = tac_efg.TACGraph.from_opcode(io.StringIO(rtl))

        if time.time() - start_time >= timeout_seconds:
            raise TimeoutError("Processing exceeded time limit.")

        exporter.EFGTsvExporter(efg).export(output_dir=output_folder, out_opcodes=target_opcodes)

        facts_file_path = os.path.join(output_folder, 'sc_addr.facts')
        # NOTE(review): if to_address can be NULL (None), f.write raises
        # TypeError and the transaction is recorded as an error. Confirm this is intended.
        with open(facts_file_path, 'w', encoding='utf-8') as f:
            f.write(to_address)

        processing_time = time.time() - start_time
        print(f"Processed {tx_hash} in {processing_time:.2f} seconds")
        update_progress(tx_hash, conn_pool)

    except TimeoutError as te:
        log_error(tx_hash, str(te), conn_pool)
    except Exception as e:
        logging.error(f"Error processing {tx_hash}: {e}", exc_info=True)
        log_error(tx_hash, str(e), conn_pool)
        traceback.print_exc()
|
|
|
|
# TODO 5: Setup and start threads
def start_processing_thread(conn_pool, num_threads=1):
    """Launch *num_threads* worker threads and return them.

    Every thread runs ``file_processing_worker(conn_pool, pbar)``. Both
    ``file_processing_worker`` and ``pbar`` are looked up as module-level
    globals when this function is called; they are defined in the
    ``__main__`` block.

    Args:
        conn_pool: Connection pool handed to every worker.
        num_threads: Number of worker threads to start.

    Returns:
        The started ``threading.Thread`` objects, in start order.
    """
    workers = [
        threading.Thread(target=file_processing_worker, args=(conn_pool, pbar))
        for _ in range(num_threads)
    ]
    for worker in workers:
        worker.start()
    return workers
|
|
|
|
# Per-transaction time budget in seconds. process_transaction reads this global
# at call time, so defining it after that function is fine.
timeout_seconds = 40  # 40-second budget for building each transaction's graph
|
|
# TODO 6: Main execution block
if __name__ == "__main__":
    # Resume after the hash stored by the previous run (None on a fresh start).
    last_processed_hash = get_last_processed_tx_hash(conn_pool)
    transactions = fetch_transactions(conn_pool, start_after_hash=last_processed_hash)
    file_queue = queue.Queue()

    # Progress bar advanced once per dequeued transaction.
    pbar = tqdm(total=len(transactions), desc='Processing Transactions', unit='tx')

    # TODO 4: Set threading and parallel processing mechanisms
    def file_processing_worker(conn_pool, pbar):
        """Thread worker function that processes files from the queue.

        Consumes ``(tx_hash, rtl, to_address)`` rows from ``file_queue`` until
        a ``None`` sentinel arrives. Every dequeued item is acknowledged with
        ``task_done()`` even if processing raises. Otherwise a crashed worker
        would leave ``file_queue.join()`` below blocked forever.
        """
        while True:
            tx_data = file_queue.get()
            if tx_data is None:  # end-of-work sentinel
                file_queue.task_done()
                break
            try:
                tx_hash, rtl, to_address = tx_data
                print(f"Now processing {tx_hash}")
                output_folder = setup_directories(tx_hash)
                process_transaction(tx_hash, rtl, to_address, output_folder, conn_pool)
            except Exception:
                # e.g. log_error failing inside process_transaction, or the
                # output folder cannot be created: log and keep the worker alive.
                logging.exception(
                    "Worker failed on transaction %s",
                    tx_data[0] if tx_data else tx_data,
                )
            finally:
                # Update the bar before task_done() so it is complete by the
                # time file_queue.join() returns.
                pbar.update(1)
                file_queue.task_done()

    # Enqueue transactions
    for tx in transactions:
        file_queue.put(tx)

    # Keep a single worker: update_progress stores the most recently *finished*
    # hash and the resume query only selects greater hashes, so out-of-order
    # completion by several workers could skip unfinished transactions on restart.
    threads = start_processing_thread(conn_pool, num_threads=1)

    # Wait for all items in the queue to be processed
    file_queue.join()

    # Stop all threads: one sentinel per worker.
    for _ in threads:
        file_queue.put(None)
    for thread in threads:
        thread.join()

    pbar.close()
    conn_pool.closeall()

    print("Processing complete.")
|
|
|
|
|