Files
txspector-new/analyze3_debug.py
2024-06-15 11:27:29 +08:00

187 lines
6.8 KiB
Python

"""
Script for Processing Smart Contract Transactions
This script processes smart contract transactions from a specified input folder.
It reads each transaction file, builds a Transactional Abstract Control Flow Graph (TAC EFG),
and exports the related facts based on a set of target opcodes. The script maintains a log
of any errors encountered during processing and saves the progress to continue from the last
processed file in case of interruption. The error log and progress are saved in CSV and text
files, respectively.
To run the script, use the command: `python3 analyze3.py`
Author: Yangyi Zou
Date: Dec 2023
Usage: python3 analyze3.py
"""
import os
import subprocess
import argparse
import logging
import sys
from os.path import abspath, dirname, join
import src.exporter as exporter
import src.tac_efg as tac_efg
import src.settings as settings
import csv, threading, queue, time
import pdb,traceback
# Set up the output folders, the progress file and the CSV error log.
INTERMIDIATE_FOLDER = 'intermediate'
LOGGING_FOLDER = 'logging'
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(LOGGING_FOLDER, exist_ok=True)
os.makedirs(INTERMIDIATE_FOLDER, exist_ok=True)
progress_file_path = os.path.join(INTERMIDIATE_FOLDER, 'progress_test.txt')
error_log_file_path = os.path.join(INTERMIDIATE_FOLDER, 'error_log_test.csv')
# Make sure the progress file exists, but do not keep a handle open:
# update_progress reopens the path for every write, so a long-lived handle
# here would never be used or closed.
with open(progress_file_path, 'a'):
    pass
# Append to an existing error log; start a new one (with a header row) otherwise.
error_log_file_mode = 'a' if os.path.exists(error_log_file_path) else 'w'
error_log_file = open(error_log_file_path, error_log_file_mode, newline='')
error_writer = csv.writer(error_log_file)
if error_log_file_mode == 'w':
    error_writer.writerow(['Transaction', 'Error'])
def log_error(transaction, error):
    """Append one row to the CSV error log and flush it to disk.

    The row holds ``transaction`` with its last four characters removed,
    followed by ``str(error)``.
    """
    row = [transaction[:-4], str(error)]
    error_writer.writerow(row)
    # Flush right away so the entry is on disk even if the run is interrupted.
    error_log_file.flush()
def setup_file_logger(filename):
    """Attach a per-file log handler to the root logger and return it.

    The log is written to ``<LOGGING_FOLDER>/<filename minus last 4 chars>_error.log``
    and is opened in write mode, so a log from an earlier run is overwritten.
    The returned handler is what ``remove_file_logger`` expects.
    """
    stem = filename[:-4]
    handler = logging.FileHandler(
        os.path.join(LOGGING_FOLDER, stem + '_error.log'),
        mode='w',
    )
    handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
    # Register on the root logger so records from every module reach this file.
    logging.getLogger().addHandler(handler)
    return handler
def remove_file_logger(file_handler):
    """Detach ``file_handler`` from the root logger, then close it."""
    logging.getLogger().removeHandler(file_handler)
    file_handler.close()
def update_progress(transaction):
    """Overwrite the progress file so it contains only ``transaction``."""
    # Mode 'w' truncates: the file always holds just the latest entry.
    with open(progress_file_path, 'w') as checkpoint:
        checkpoint.write(transaction)
# Time limit per file, in seconds. process_file compares the elapsed time
# against this value once, just before the export step.
timeout_seconds = 30 # NOTE(review): the original comment said "example: 2 minutes", but the value is 30 seconds - confirm which is intended
def process_file(filename, line_number):
    """Build the graph for one transaction file and export its facts.

    Parameters:
        filename: Name of the transaction file inside ``input_folder``. Its
            last four characters are dropped to derive the output folder
            (``./facts/fact_<name>``) and the log/error-log entry names.
        line_number: Position of the file in the worker's sequence; only used
            in the timeout message.

    Errors are not propagated to the caller: a ``TimeoutError`` is recorded
    with ``log_error``; any other ``Exception`` is additionally logged with a
    traceback. The progress file is updated only after a successful export.

    Note that the time limit is checked once, after the graph is built and
    before the export starts; the export itself is not interrupted if it
    runs long.
    """
    # pdb.set_trace()
    # Attach a per-file log handler for the duration of this call.
    file_handler = setup_file_logger(filename)
    start_time = time.time()
    try:
        input_file = os.path.join(input_folder, filename)
        output_folder = './facts/fact_' + filename[:-4]
        os.makedirs(output_folder, exist_ok=True)
        with open(input_file, "r") as f:
            efg = tac_efg.TACGraph.from_opcode(f)
            # Skip the export if building the graph already used up the budget.
            if time.time() - start_time >= timeout_seconds:
                raise TimeoutError("Processing exceeded time limit.")
            # The export runs while the input is still open, so it works
            # whether or not the graph reads the file lazily.
            exporter.EFGTsvExporter(efg).export(output_dir=output_folder, out_opcodes=target_opcodes)
        # Report the elapsed time and record the file as done.
        processing_time = time.time() - start_time
        print(f"完成处理文件 Processed {filename} in {processing_time:.2f} seconds")
        update_progress(filename)
    except TimeoutError as te:
        log_error(filename, str(te))
        print(f"Line {line_number}: {filename} skipped due to timeout.")
    except Exception as e:
        logging.error(f"Error processing file {filename}: {e}", exc_info=True)
        log_error(filename, str(e))
        print(f"Exception type: {type(e)}")
        print(f"Exception message: {e}")
        traceback.print_exc()
    finally:
        # Always detach the per-file handler, even on an uncaught exception,
        # so handlers do not pile up on the root logger.
        remove_file_logger(file_handler)
# The worker passes its running file count to process_file as the line number.
def file_processing_worker():
    """Thread target: process file names from ``file_queue`` until a ``None`` sentinel.

    Every item taken from the queue is acknowledged with ``task_done()``,
    including when processing raises, so ``file_queue.join()`` in the main
    thread cannot block forever on a failed item.
    """
    line_number = 0
    while True:
        filename = file_queue.get()
        if filename is None:
            # Sentinel: acknowledge it and stop, without announcing it as a file.
            file_queue.task_done()
            break
        print("------------------------------------------------------------------------------------------------------")
        print(f"Line number:{line_number} Processing file: {filename}")
        line_number += 1
        try:
            process_file(filename, line_number)
        except Exception:
            # process_file handles its own errors; this catches anything that
            # escapes (e.g. a failure while setting up the per-file log) so
            # the worker keeps consuming the rest of the queue.
            logging.exception("Unhandled error in worker while processing %s", filename)
        finally:
            file_queue.task_done()
# Root logger configuration: DEBUG level, "LEVEL:logger-name:message" format.
# logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
logging.basicConfig(format='%(levelname)s:%(name)s:%(message)s', level=logging.DEBUG)
# Opcodes passed to the exporter as `out_opcodes`.
# NOTE(review): 'DELEGATE' is not an EVM mnemonic (the opcode is DELEGATECALL);
# confirm against the names the exporter actually matches on.
target_opcodes = [
    'CREATE', 'BALANCE', 'CALLER', 'CALLVALUE', 'STOP', 'RETURN', 'REVERT',
    'ORIGIN', 'CALLDATALOAD', 'EQ', 'TIMESTAMP', 'NUMBER', 'DIFFICULTY',
    'COINBASE', 'BLOCKHASH', 'GASLIMIT', 'EXTCODESIZE', 'SELFDESTRUCT',
    'JUMPI', 'JUMP', 'JUMPDEST', 'SSTORE', 'SLOAD', 'CALL', 'DELEGATE',
    'CALLCODE', 'STATICCALL',
]
#####################################################
#####################################################
# Folder containing the transaction files to process.
# input_folder = "/home/yangyi/database/TxSpector/detector/test" # For test debug
# input_folder = "test" # For test debug
input_folder = "/home/yangyi/database/smart_contract-dataset/Tx_tuples"
# Number of worker threads and the queue of file names they consume.
num_threads = 1
file_queue = queue.Queue()
#####################################################
#####################################################
# Create and start the worker threads.
threads = []
for _ in range(num_threads):
    thread = threading.Thread(target=file_processing_worker)
    thread.start()
    threads.append(thread)
try:
    # Read the name of the last file completed by a previous run, if any.
    last_processed = None
    if os.path.exists(progress_file_path):
        with open(progress_file_path, 'r') as progress_file:
            last_processed = progress_file.read().strip()
    # Queue every .txt file that sorts after the resume point (all of them
    # when there is no recorded progress).
    for filename in sorted(os.listdir(input_folder)):
        if filename.endswith('.txt') and (not last_processed or filename > last_processed):
            file_queue.put(filename)
    # Wait until every queued file has been handled.
    file_queue.join()
finally:
    # Always release the workers and close the error log, even if listing or
    # queueing failed; otherwise the non-daemon threads would block on the
    # empty queue and keep the process alive.
    for _ in threads:
        file_queue.put(None)
    for thread in threads:
        thread.join()
    error_log_file.close()
print(" ==================------------- Processing complete -------------==================")