187 lines
6.8 KiB
Python
187 lines
6.8 KiB
Python
"""
|
|
Script for Processing Smart Contract Transactions
|
|
|
|
This script processes smart contract transactions from a specified input folder.
|
|
It reads each transaction file, builds a Transactional Abstract Control Flow Graph (TAC EFG),
|
|
and exports the related facts based on a set of target opcodes. The script maintains a log
|
|
of any errors encountered during processing and saves the progress to continue from the last
|
|
processed file in case of interruption. The error log and progress are saved in CSV and text
|
|
files, respectively.
|
|
|
|
To run the script, use the command: `python3 analyze3.py`
|
|
|
|
|
|
Author: Yangyi Zou
|
|
Date: Dec 2023
|
|
Usage: python3 analyze3.py
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from os.path import abspath, dirname, join
|
|
import src.exporter as exporter
|
|
import src.tac_efg as tac_efg
|
|
import src.settings as settings
|
|
import csv, threading, queue, time
|
|
import pdb,traceback
|
|
|
|
# Initialise the error log and progress tracking.
INTERMIDIATE_FOLDER = 'intermediate'
LOGGING_FOLDER = 'logging'
os.makedirs(LOGGING_FOLDER, exist_ok=True)
os.makedirs(INTERMIDIATE_FOLDER, exist_ok=True)

# Progress marker (name of the last successfully processed file) and the
# CSV of per-file errors.
progress_file_path = os.path.join(INTERMIDIATE_FOLDER, 'progress_test.txt')
error_log_file_path = os.path.join(INTERMIDIATE_FOLDER, 'error_log_test.csv')

# Create the progress file if it does not exist yet. Opening in append mode
# leaves any existing progress untouched, and the handle is closed right away
# instead of being leaked for the lifetime of the script.
with open(progress_file_path, 'a') as progress_file:
    pass

# Append to an existing error log, otherwise start a new one.
error_log_file_mode = 'a' if os.path.exists(error_log_file_path) else 'w'
# Write the header for a new *or empty* log (an existing-but-empty log can be
# left behind by a run that died before its buffered header was flushed).
error_log_needs_header = (
    error_log_file_mode == 'w' or os.path.getsize(error_log_file_path) == 0
)
error_log_file = open(
    error_log_file_path, error_log_file_mode, newline='', encoding='utf-8'
)

error_writer = csv.writer(error_log_file)
if error_log_needs_header:
    error_writer.writerow(['Transaction', 'Error'])
    # Flush so the header is on disk even if the process is killed early.
    error_log_file.flush()
|
|
|
|
def log_error(transaction, error):
    """Append one ``(transaction, error)`` row to the error CSV and flush it.

    ``transaction`` is a file name whose last four characters (the extension,
    e.g. ``.txt``) are dropped; ``error`` is stored via ``str()``.
    """
    row = [transaction[:-4], str(error)]
    error_writer.writerow(row)
    # Flush right away so already-recorded rows survive an interruption.
    error_log_file.flush()
|
|
|
|
def setup_file_logger(filename):
    """Attach a fresh per-file log handler to the root logger.

    The log is written to ``LOGGING_FOLDER/<stem>_error.log``, where ``<stem>``
    is ``filename`` without its last four characters. An existing log for the
    same file is overwritten. The handler is returned so the caller can later
    detach it with ``remove_file_logger``.
    """
    handler = logging.FileHandler(
        os.path.join(LOGGING_FOLDER, f"{filename[:-4]}_error.log"),
        mode='w',  # truncate: keep only the latest run's log for this file
    )
    handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
    logging.getLogger().addHandler(handler)
    return handler
|
|
|
|
def remove_file_logger(file_handler):
    """Detach ``file_handler`` from the root logger, then close it."""
    logging.getLogger().removeHandler(file_handler)  # no-op if not attached
    file_handler.close()
|
|
|
|
def update_progress(transaction):
    """Record ``transaction`` as the last successfully processed file.

    The name is first written to a temporary file next to
    ``progress_file_path`` and then moved over it with ``os.replace``. An
    interruption mid-write therefore cannot leave the progress file truncated
    or empty, which would make the next run start over from the first file.
    """
    # Unique per process/thread so concurrent workers never share a temp file.
    tmp_path = f"{progress_file_path}.{os.getpid()}.{threading.get_ident()}.tmp"
    with open(tmp_path, 'w') as tmp_file:
        tmp_file.write(transaction)
    os.replace(tmp_path, progress_file_path)
|
|
|
|
# Per-file time budget, in seconds (30 s). process_file compares the elapsed
# time against it once, after building the graph and before exporting facts.
timeout_seconds = 30
|
|
|
|
def process_file(filename, line_number):
    """Build the TAC EFG for one transaction file and export its facts.

    Args:
        filename: Name of a ``.txt`` file inside ``input_folder``. Its last
            four characters are stripped to derive the output folder
            (``./facts/fact_<stem>``) and the per-file log name.
        line_number: Position of the file in the worker's sequence; used only
            in the timeout message.

    On success the elapsed time is printed and the file is recorded as the
    latest progress. A ``TimeoutError`` or any other ``Exception`` raised while
    processing is caught and written to the error CSV; non-timeout errors are
    also logged with a traceback and printed. The per-file log handler is
    always detached, whether or not processing succeeded.
    """
    # Dedicated log file for this transaction (detached in ``finally``).
    file_handler = setup_file_logger(filename)
    start_time = time.time()
    try:
        input_file = os.path.join(input_folder, filename)
        output_folder = os.path.join('./facts', 'fact_' + filename[:-4])
        # exist_ok avoids a check-then-create race between workers.
        os.makedirs(output_folder, exist_ok=True)

        with open(input_file, "r") as f:
            efg = tac_efg.TACGraph.from_opcode(f)

        # NOTE: soft time budget only. Elapsed time is checked once, after the
        # graph is built; neither graph construction nor the export below is
        # interrupted while running.
        if time.time() - start_time >= timeout_seconds:
            raise TimeoutError("Processing exceeded time limit.")
        exporter.EFGTsvExporter(efg).export(output_dir=output_folder, out_opcodes=target_opcodes)

        processing_time = time.time() - start_time
        print(f"完成处理文件 Processed {filename} in {processing_time:.2f} seconds")

        # Only successfully processed files advance the resume point.
        update_progress(filename)

    except TimeoutError as te:
        log_error(filename, str(te))
        print(f"Line {line_number}: {filename} skipped due to timeout.")
    except Exception as e:
        logging.error(f"Error processing file {filename}: {e}", exc_info=True)
        log_error(filename, str(e))
        print(f"Exception type: {type(e)}")
        print(f"Exception message: {e}")
        traceback.print_exc()
    finally:
        # Always detach the handler. A leaked handler stays on the root
        # logger, so every later file's messages would also be appended to
        # this file's log.
        remove_file_logger(file_handler)
|
|
|
|
def file_processing_worker():
    """Consume file names from ``file_queue`` until a ``None`` sentinel arrives.

    Each file is handed to ``process_file`` with a 1-based counter of how
    many files this worker has taken; the same number is printed in the
    banner. ``task_done`` is called for every item, including the sentinel
    and items whose processing raised. An unexpected error therefore neither
    kills the worker nor leaves ``file_queue.join()`` waiting forever.
    """
    line_number = 0
    while True:
        filename = file_queue.get()
        if filename is None:
            # Shutdown sentinel: account for it and stop, without a banner.
            file_queue.task_done()
            break

        line_number += 1
        print("------------------------------------------------------------------------------------------------------")
        print(f"Line number:{line_number} Processing file: {filename}")
        try:
            process_file(filename, line_number)
        except Exception:
            # process_file handles per-file failures itself. Anything that
            # still escapes (e.g. setting up the per-file logger) must not
            # end this thread, or the remaining queued files would never be
            # processed.
            logging.exception("Worker failed while processing %s", filename)
        finally:
            file_queue.task_done()
|
|
|
|
# Root logging: DEBUG and above, using the same format as the per-file
# handlers created by setup_file_logger.
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(name)s:%(message)s')

# Opcodes whose facts are exported for every transaction.
# NOTE(review): the EVM mnemonic for opcode 0xF4 is DELEGATECALL. The previous
# entry 'DELEGATE' is not an EVM opcode name and presumably matched nothing
# in the exporter; confirm against the exporter's opcode names.
target_opcodes = [
    'CREATE', 'BALANCE', 'CALLER', 'CALLVALUE', 'STOP', 'RETURN', 'REVERT',
    'ORIGIN', 'CALLDATALOAD', 'EQ', 'TIMESTAMP', 'NUMBER', 'DIFFICULTY',
    'COINBASE', 'BLOCKHASH', 'GASLIMIT', 'EXTCODESIZE', 'SELFDESTRUCT',
    'JUMPI', 'JUMP', 'JUMPDEST', 'SSTORE', 'SLOAD',
    'CALL', 'DELEGATECALL', 'CALLCODE', 'STATICCALL',
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------
# Folder holding the transaction ``.txt`` files to analyse. For debugging,
# smaller inputs such as "test" or
# "/home/yangyi/database/TxSpector/detector/test" were used instead.
input_folder = "/home/yangyi/database/smart_contract-dataset/Tx_tuples"

# Worker thread count and the queue the workers read file names from.
num_threads = 1
file_queue = queue.Queue()
|
|
|
|
|
|
# Start the workers before anything is queued; each one blocks on
# file_queue.get() until a file name (or the None sentinel) arrives.
threads = [threading.Thread(target=file_processing_worker) for _ in range(num_threads)]
for thread in threads:
    thread.start()
|
|
|
|
# Resume point: the file name stored by the last successful update_progress,
# or None when no progress file exists.
try:
    with open(progress_file_path, 'r') as progress_file:
        last_processed = progress_file.read().strip()
except FileNotFoundError:
    last_processed = None
|
|
|
|
# Queue every ``.txt`` file that sorts after the resume point (or all of them
# when there is none). The workers are already running, so processing starts
# as soon as the first name is queued.
for filename in sorted(os.listdir(input_folder)):
    if not filename.endswith('.txt'):
        continue
    if last_processed and filename <= last_processed:
        continue  # already handled in a previous run
    file_queue.put(filename)
|
|
|
|
# Wait until every queued file has been marked done by a worker.
file_queue.join()

# One None sentinel per worker makes each thread leave its loop; then wait
# for all of them to exit.
for _ in threads:
    file_queue.put(None)
for worker in threads:
    worker.join()

print(" ==================------------- Processing complete -------------==================")
error_log_file.close()