Spaces:
Runtime error
Runtime error
| import json | |
| import re | |
| import os | |
def hex_to_decimal(matched):
    """Return the decimal string for the hexadecimal text of a regex match.

    Intended as a replacement callback for ``re.sub``: the whole matched
    text (optionally prefixed with ``0x``) is parsed as base 16 and
    rendered in base 10.
    """
    hex_text = matched.group()
    value = int(hex_text, 16)  # int(..., 16) accepts an optional "0x" prefix
    return f'{value}'
def normalize(asm, max_lines=257):
    """Rewrite objdump-style disassembly text into an address-free form.

    Only the first ``max_lines`` lines of the stripped input are processed
    (pass ``None`` to process everything). Blank lines, lines containing
    ``file format elf64-x86-64`` and lines starting with
    ``Disassembly of section`` are skipped.

    * A line with no tab that ends with ``:`` is treated as a function
      header and emitted as ``<funcN>:`` (N counts from 0).
    * Every other line is split at its first tab into an address and the
      instruction text, and is given a ``<label-N>`` tag (N counts from 1).

    Each instruction is then rewritten:

    * if it contains both ``<`` and ``>``, everything from the first ``<``
      onwards is dropped;
    * for instructions starting with ``j``, ``loop`` or ``call`` that have
      exactly one operand, the operand is replaced by the label of the
      instruction at that address, or ``<unk>`` if the address is unknown;
    * ``0x...`` literals become decimal, ``%`` is removed, commas and
      parentheses are surrounded by single spaces, and runs of spaces are
      collapsed.

    Args:
        asm: Disassembly text, lines separated by ``'\\n'``.
        max_lines: Maximum number of input lines to consider.

    Returns:
        The normalized text. Every emitted line is preceded by ``'\\n'``;
        instruction lines have the form ``"<text>\\t<label-N>"``.

    Raises:
        ValueError: If a function header lacks ``<`` or ``>``, or if an
            instruction line contains no tab (the line is printed first).
    """
    entries = []      # (text, label); label is None for function headers
    addr2label = {}   # instruction address -> its "<label-N>" tag
    func_cnt, label_cnt = 0, 0

    for line in asm.strip().split('\n')[:max_lines]:
        if line.strip() == '' or 'file format elf64-x86-64' in line:
            continue
        if line.startswith('Disassembly of section'):
            continue

        if '\t' not in line and line.endswith(':'):
            # Function header. The symbol name itself is not used, but a
            # header without "<name>" is rejected.
            if '<' not in line or '>' not in line:
                raise ValueError(f'malformed function header: {line!r}')
            entries.append((f'<func{func_cnt}>:', None))
            func_cnt += 1
            continue

        if '\t' not in line:
            # Keep the offending line visible for callers that swallow the
            # exception and only count failures.
            print(line)
            raise ValueError(f'instruction line without a tab: {line!r}')

        label_cnt += 1
        addr, content = line.split('\t', 1)
        # Strip the padding of the address column and the trailing colon;
        # jump operands never carry whitespace, so a padded key could never
        # be matched below.
        addr = addr.strip().rstrip(':')
        label = f'<label-{label_cnt}>'
        addr2label[addr] = label
        entries.append((content.strip(), label))

    pieces = []
    for content, label in entries:
        if label is None:
            pieces.append(content)
            continue

        # Drop the symbolic annotation, e.g. "<main+0x17>".
        if '<' in content and '>' in content:
            content = content[:content.index('<')].strip()

        if content.startswith(('j', 'loop', 'call')):
            parts = content.split()
            if len(parts) == 2:
                inst, target = parts
                if target.startswith('0x'):
                    target = target[2:]
                content = inst + '\t' + addr2label.get(target, '<unk>')

        content = re.sub(r"0x([0-9A-Fa-f]+)", hex_to_decimal, content)
        content = content.replace('%', '')
        content = re.sub(r"([,(])|([),])", r' \1\2 ', content)
        content = re.sub(r' +', ' ', content).strip()
        pieces.append(content + '\t' + label)

    return ''.join('\n' + piece for piece in pieces)
def normalize_anghabench():
    """Normalize ``anghabench/anghabench.jsonl`` into ``anghabench/anghabench-normalize.jsonl``.

    Each input line is parsed as JSON and every value under its ``'output'``
    mapping is replaced by ``normalize(value)``. Records that fail to parse
    or to normalize are counted and skipped. Progress (including the failure
    count) is printed after every 1000th input line that was written.
    """
    fail = 0
    # Read the input first so that a missing input file does not leave a
    # freshly truncated output file behind.
    with open('anghabench/anghabench.jsonl', 'r') as fp:
        lines = fp.readlines()

    # The context manager guarantees the output is flushed and closed.
    with open('anghabench/anghabench-normalize.jsonl', 'w') as wp:
        for i, line in enumerate(lines):
            try:
                item = json.loads(line)
                for opt in item['output']:
                    item['output'][opt] = normalize(item['output'][opt])
            except Exception:
                # Best-effort conversion: bad records are dropped, not fatal.
                fail += 1
                continue
            wp.write(json.dumps(item) + '\n')
            if i % 1000 == 0:
                print(f"{i}/{len(lines)}, fail: {fail}")
def normalize_the_stack():
    """Normalize ``the-stack/the-stack.jsonl`` into ``the-stack/the-stack-normalize.jsonl``.

    Each input line is parsed as JSON and every value under its ``'output'``
    mapping is replaced by the stripped result of ``normalize(value)``.
    Records that fail to parse or to normalize are counted, the exception is
    printed, and the record is skipped. Progress (including the failure
    count) is printed before every 1000th input line.
    """
    fail = 0
    # Read the input first so that a missing input file does not leave a
    # freshly truncated output file behind.
    with open('the-stack/the-stack.jsonl', 'r') as fp:
        lines = fp.readlines()

    # The context manager guarantees the output is flushed and closed.
    with open('the-stack/the-stack-normalize.jsonl', 'w') as wp:
        for i, line in enumerate(lines):
            if i % 1000 == 0:
                print(f"{i}/{len(lines)}, fail: {fail}")
            try:
                item = json.loads(line)
                for opt in item['output']:
                    item['output'][opt] = normalize(item['output'][opt]).strip()
            except Exception as e:
                # Best-effort conversion: report and drop the bad record.
                fail += 1
                print(e)
                continue
            wp.write(json.dumps(item) + '\n')
def normalize_codeart():
    """Add a ``'normalized_asm'`` field to every record under ``codeart/``.

    Each entry in the directory is read as a JSON-lines file. Every record
    gets ``record['normalized_asm'] = normalize(record['asm'])``, and the
    file is then rewritten in place with the augmented records.
    """
    for file_name in os.listdir('codeart/'):
        records = []
        with open(f'codeart/{file_name}', 'r') as reader:
            for raw_line in reader.readlines():
                record = json.loads(raw_line.strip())
                record['normalized_asm'] = normalize(record['asm'])
                records.append(record)

        # The whole file has been read and normalized before it is
        # overwritten, so a failure above leaves the original untouched.
        with open(f'codeart/{file_name}', 'w') as writer:
            writer.writelines(json.dumps(record) + '\n' for record in records)
def normalize_binarycorp(binary_corp_folder):
    """Build ``binarycorp/binarycorp.json`` from the JSON files in a folder.

    Files whose name contains ``-O0-``, ``-O1-`` or ``-O3-`` (checked in
    that order) are loaded; the part of the name before the marker is the
    project name. Every value in a file must provide ``'name'`` and
    ``'assembly'``; the assembly is normalized and stored under
    ``data[project][name][opt]``. Other files are ignored.

    Only functions that have at least two optimization levels, one of them
    ``O3``, are kept, and projects left with no function are dropped. The
    number of projects seen is printed before filtering.

    Args:
        binary_corp_folder: Directory containing the BinaryCorp JSON files.
    """
    data = {}
    for file in os.listdir(binary_corp_folder):
        # Same precedence as checking -O0-, then -O1-, then -O3-.
        for opt in ('O0', 'O1', 'O3'):
            marker = f'-{opt}-'
            if marker in file:
                proj = file[:file.index(marker)]
                break
        else:
            continue

        per_func = data.setdefault(proj, {})
        # Close the input promptly instead of relying on garbage collection.
        with open(f'{binary_corp_folder}/{file}', 'r') as fp:
            content = json.load(fp)
        for entry in content.values():
            func = entry['name']
            asm = entry['assembly']
            per_func.setdefault(func, {})[opt] = normalize(asm)
    print(len(data))

    data_filter = {}
    for proj, funcs in data.items():
        kept = {
            func: opts
            for func, opts in funcs.items()
            if len(opts) >= 2 and 'O3' in opts
        }
        if kept:
            data_filter[proj] = kept

    # The context manager guarantees the JSON is fully flushed to disk.
    with open('binarycorp/binarycorp.json', 'w') as wp:
        json.dump(data_filter, wp, indent=2)
if __name__ == "__main__":
    # Training data: both corpora are normalized by default, the-stack first.
    for run_stage in (normalize_the_stack, normalize_anghabench):
        run_stage()

    # Fine-tuning data (disabled by default):
    # download BinaryCorp small_train.tar from
    # https://cloud.vul337.team:8443/s/cxnH8DfZTADLKCs
    # then set binary_corp_folder to the directory holding its JSON files.
    # binary_corp_folder = ''
    # normalize_binarycorp(binary_corp_folder)

    # Evaluation data (disabled by default):
    # normalize_codeart()