Files
2024-06-15 11:27:29 +08:00

176 lines
6.8 KiB
Python

"""
Script for Processing Smart Contract Transactions
This script processes smart contract transactions fetched from the PostgreSQL table
`tx_rtl_part_b_dedup`. For each transaction it builds a Transactional Abstract Control Flow
Graph (TAC EFG) from the stored `rtl_dedup` opcode data and exports the related facts, restricted
to a set of target opcodes, into `./facts_test/fact_<tx_hash>/`. The script records any errors
encountered during processing and saves its progress so that, after an interruption, a rerun
continues after the last successfully processed transaction hash. Errors and progress are stored
in the `error_log` and `framework_progress` database tables, respectively.
To run the script, use the command: `python3 analyze3.py`
Author: Yangyi Zou
Date: May 19 2024
Usage: python3 analyze3.py
"""
import time
import io
import os
import subprocess
import argparse
import logging
import sys
from os.path import abspath, dirname, join
import src.exporter as exporter
import src.tac_efg_lab as tac_efg
import src.settings as settings
import csv, threading, queue, time
import pdb,traceback
import psycopg2.pool
from KEY import *
import threading
import queue
from tqdm import tqdm
# Target opcodes: passed to the exporter as ``out_opcodes`` for every transaction.
# NOTE(review): 'DELEGATE' is not an EVM mnemonic -- the EVM opcode is 'DELEGATECALL'
# (0xF4). 'DELEGATECALL' is added so those instructions can be selected; 'DELEGATE' is
# kept in case a downstream consumer is keyed on that name. Confirm against how the
# exporter matches opcode names.
target_opcodes = [
    'CREATE', 'BALANCE', 'CALLER', 'CALLVALUE', 'STOP', 'RETURN', 'REVERT',
    'ORIGIN', 'CALLDATALOAD', 'EQ', 'TIMESTAMP', 'NUMBER', 'DIFFICULTY',
    'COINBASE', 'BLOCKHASH', 'GASLIMIT', 'EXTCODESIZE', 'SELFDESTRUCT',
    'JUMPI', 'JUMP', 'JUMPDEST', 'SSTORE', 'SLOAD', 'CALL', 'DELEGATE',
    'DELEGATECALL', 'CALLCODE', 'STATICCALL',
]
# TODO 1: Initialize database connection pool, error logging, and progress tracking
def create_connection_pool(minconn=1, maxconn=10):
    """Create a thread-safe PostgreSQL connection pool.

    Credentials come from the ``USER``, ``PASSWORD``, ``HOST``, ``PORT`` and
    ``DATABASE`` names star-imported from ``KEY``.

    Args:
        minconn: number of connections opened when the pool is created.
        maxconn: maximum number of connections the pool hands out at once. It
            should be at least the number of worker threads; otherwise
            ``getconn`` raises ``psycopg2.pool.PoolError`` once exhausted.

    Returns:
        A ``psycopg2.pool.ThreadedConnectionPool``.
    """
    return psycopg2.pool.ThreadedConnectionPool(
        minconn,
        maxconn,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT,
        database=DATABASE,
    )


# Module-level pool, created at import time; the __main__ block passes it to the helpers.
conn_pool = create_connection_pool()
def update_progress(tx_hash, conn_pool):
    """Record ``tx_hash`` as the only row of the ``framework_progress`` table.

    The DELETE and INSERT run in one transaction. ``with conn`` commits when
    both succeed and rolls back if either fails. The connection is always
    returned to the pool, even on error, so failures cannot exhaust it.

    Args:
        tx_hash: hash of the transaction that was just processed successfully.
        conn_pool: pool providing ``getconn``/``putconn``.
    """
    conn = conn_pool.getconn()
    try:
        with conn, conn.cursor() as cursor:
            cursor.execute("DELETE FROM framework_progress")
            cursor.execute(
                "INSERT INTO framework_progress (last_processed_tx_hash) VALUES (%s)",
                (tx_hash,),
            )
    finally:
        conn_pool.putconn(conn)
def log_error(tx_hash, error, conn_pool):
    """Logs an error for a transaction in the database.

    Inserts one ``(tx_hash, error_message)`` row into ``error_log``. The row is
    committed when the ``with conn`` block exits cleanly. If the INSERT fails,
    the exception propagates to the caller. The connection is returned to the
    pool in every case.

    Args:
        tx_hash: hash of the transaction that failed.
        error: error message to store.
        conn_pool: pool providing ``getconn``/``putconn``.
    """
    conn = conn_pool.getconn()
    try:
        with conn, conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO error_log (tx_hash, error_message) VALUES (%s, %s)",
                (tx_hash, error),
            )
    finally:
        conn_pool.putconn(conn)
# TODO 2: Fetch transaction data
def get_last_processed_tx_hash(conn_pool):
    """Return the most recently recorded processed transaction hash.

    Reads ``framework_progress``, which ``update_progress`` keeps at a single
    row. The connection is returned to the pool even if the query fails.

    Args:
        conn_pool: pool providing ``getconn``/``putconn``.

    Returns:
        The stored ``last_processed_tx_hash``, or ``None`` when no progress
        has been recorded yet.
    """
    conn = conn_pool.getconn()
    try:
        with conn, conn.cursor() as cursor:
            cursor.execute("SELECT last_processed_tx_hash FROM framework_progress LIMIT 1")
            result = cursor.fetchone()
    finally:
        conn_pool.putconn(conn)
    return result[0] if result else None
def fetch_transactions(conn_pool, start_after_hash=None, limit=200000):
    """Fetches transactions from the database.

    Both the fresh and the resume query select the same explicit columns, so
    every row has the ``(tx_hash, rtl_dedup, to_address)`` shape the worker
    unpacks. The connection is returned to the pool even if the query fails.

    Args:
        conn_pool: pool providing ``getconn``/``putconn``.
        start_after_hash: if truthy, only rows with ``tx_hash`` strictly
            greater than this value are returned (used to resume a run).
        limit: maximum number of rows to fetch.

    Returns:
        The rows from ``cursor.fetchall()``, ordered by ``tx_hash`` ascending.
    """
    conn = conn_pool.getconn()
    try:
        with conn, conn.cursor() as cursor:
            if start_after_hash:
                cursor.execute(
                    "SELECT tx_hash, rtl_dedup, to_address FROM tx_rtl_part_b_dedup "
                    "WHERE tx_hash > %s ORDER BY tx_hash ASC LIMIT %s",
                    (start_after_hash, limit),
                )
            else:
                cursor.execute(
                    "SELECT tx_hash, rtl_dedup, to_address FROM tx_rtl_part_b_dedup "
                    "ORDER BY tx_hash ASC LIMIT %s",
                    (limit,),
                )
            transactions = cursor.fetchall()
    finally:
        conn_pool.putconn(conn)
    return transactions
def setup_directories(tx_hash, base_dir='./facts_test'):
    """Sets up directories for output based on the transaction hash.

    Args:
        tx_hash: transaction hash; the folder is named ``fact_<tx_hash>``.
        base_dir: parent directory for all per-transaction folders.

    Returns:
        The path ``<base_dir>/fact_<tx_hash>``. It is created (with parents)
        if nothing exists there yet; an existing path is returned unchanged.
    """
    output_folder = os.path.join(base_dir, f'fact_{tx_hash}')
    if not os.path.exists(output_folder):
        # exist_ok tolerates another thread creating the folder between the
        # existence check and this call.
        os.makedirs(output_folder, exist_ok=True)
    return output_folder
# TODO 3: Process transactions
def process_transaction(tx_hash, rtl, to_address, output_folder, conn_pool):
    """Processes a transaction and exports the facts.

    Builds a graph with ``tac_efg.TACGraph.from_opcode`` from ``rtl``. It then
    exports facts for ``target_opcodes`` into ``output_folder`` and writes
    ``to_address`` to ``sc_addr.facts``. On success, ``tx_hash`` is recorded
    via ``update_progress``.

    Any exception raised while processing (including from ``update_progress``)
    is caught and recorded via ``log_error``. An exception raised by
    ``log_error`` itself propagates to the caller.

    Args:
        tx_hash: transaction hash, used in progress/error records and messages.
        rtl: opcode text handed to ``TACGraph.from_opcode`` as a file-like object.
        to_address: written verbatim to ``sc_addr.facts``. It must be a ``str``;
            ``None`` makes ``f.write`` raise, and the transaction is logged as
            an error.
        output_folder: existing directory that receives the exported facts.
        conn_pool: pool passed to ``update_progress``/``log_error``.
    """
    start_time = time.time()
    try:
        efg = tac_efg.TACGraph.from_opcode(io.StringIO(rtl))
        # Soft time budget, not a preemptive timeout: the export is skipped only
        # when graph construction alone took at least ``timeout_seconds``. A slow
        # export itself is never interrupted.
        if time.time() - start_time >= timeout_seconds:
            raise TimeoutError("Processing exceeded time limit.")
        exporter.EFGTsvExporter(efg).export(output_dir=output_folder, out_opcodes=target_opcodes)
        facts_file_path = os.path.join(output_folder, 'sc_addr.facts')
        with open(facts_file_path, 'w') as f:
            f.write(to_address)
        processing_time = time.time() - start_time
        print(f"Processed {tx_hash} in {processing_time:.2f} seconds")
        update_progress(tx_hash, conn_pool)
    except TimeoutError as te:
        log_error(tx_hash, str(te), conn_pool)
    except Exception as e:
        logging.error(f"Error processing {tx_hash}: {e}", exc_info=True)
        log_error(tx_hash, str(e), conn_pool)
        traceback.print_exc()
# TODO 5: Setup and start threads
def start_processing_thread(conn_pool, num_threads=1, worker=None, progress_bar=None):
    """Starts a pool of threads to process transactions.

    Each thread runs ``worker(conn_pool, progress_bar)`` and is started before
    being returned.

    Args:
        conn_pool: passed as the first argument to every worker.
        num_threads: number of threads to start.
        worker: thread target. Defaults to the module-level
            ``file_processing_worker``.
        progress_bar: second argument to every worker. Defaults to the
            module-level ``pbar``.

    Both defaults exist only once the ``__main__`` block has defined them.
    Callers outside that block must pass ``worker`` and ``progress_bar``
    explicitly.

    Returns:
        The list of started ``threading.Thread`` objects.
    """
    if worker is None:
        worker = file_processing_worker
    if progress_bar is None:
        progress_bar = pbar
    threads = []
    for _ in range(num_threads):
        thread = threading.Thread(target=worker, args=(conn_pool, progress_bar))
        thread.start()
        threads.append(thread)
    return threads
# Soft per-transaction time budget in seconds, read by process_transaction before export.
# It must exist at module level before process_transaction is first called.
timeout_seconds = 40
# TODO 6: Main execution block
if __name__ == "__main__":
    # Resume after the last successfully processed hash, if one was recorded.
    last_processed_hash = get_last_processed_tx_hash(conn_pool)
    transactions = fetch_transactions(conn_pool, start_after_hash=last_processed_hash)
    file_queue = queue.Queue()
    # One tick per dequeued transaction, whether it succeeded or failed.
    pbar = tqdm(total=len(transactions), desc='Processing Transactions', unit='tx')

    # Defined at module scope (inside this guard) because start_processing_thread
    # looks up ``file_processing_worker`` and ``pbar`` as globals by default.
    def file_processing_worker(conn_pool, pbar):
        """Thread worker function that processes files from the queue.

        Consumes rows from ``file_queue`` until a ``None`` sentinel arrives.
        Every dequeued item is acknowledged with ``task_done`` in a ``finally``
        block. An unexpected exception (e.g. a database error raised by
        ``log_error``, or an OSError from ``setup_directories``) is therefore
        logged instead of killing the thread and leaving ``file_queue.join()``
        blocked forever.
        """
        while True:
            tx_data = file_queue.get()
            if tx_data is None:  # shutdown sentinel
                file_queue.task_done()
                break
            try:
                tx_hash, rtl, to_address = tx_data
                print(f"Now processing {tx_hash}")
                output_folder = setup_directories(tx_hash)
                process_transaction(tx_hash, rtl, to_address, output_folder, conn_pool)
            except Exception:
                logging.exception("Unhandled error while processing %s", tx_data[0])
            finally:
                # Update the bar before task_done so it is complete once join() returns.
                pbar.update(1)
                file_queue.task_done()

    for tx in transactions:
        file_queue.put(tx)

    threads = start_processing_thread(conn_pool, num_threads=1)

    # Block until every enqueued transaction has been acknowledged.
    file_queue.join()

    # One sentinel per thread so that each worker loop exits.
    for _ in threads:
        file_queue.put(None)
    for thread in threads:
        thread.join()

    pbar.close()
    conn_pool.closeall()
    print("Processing complete.")