from jsonpath_ng.ext import parse import json import os import pandas as pd # 추가된 import import unicodedata # 상단에 import 추가 import re # 상단에 추가 from datetime import datetime import logging from logging_config import setup_logging # logging 설정을 위한 import import traceback import itertools import copy from script_utils import extract_and_format_scripts # 스크립트 추출 함수 import setup_logging() # logging 설정 호출 # JSON 파일 읽기 def read_json(file_path): with open(file_path, 'r', encoding='utf-8') as file: return json.load(file) # 요소 탐색 함수 def find_element(project_data, jsonpath_expr): """ 주어진 데이터(project_data)에서 jsonpath 표현식에 일치하는 모든 값들을 찾아 리스트로 반환합니다. """ parsed_expr = parse(jsonpath_expr) return [match.value for match in parsed_expr.find(project_data)] # 요소 탐색 함수 def find_script_element(project_data, jsonpath_expr): jsonpath_expr = parse(jsonpath_expr) match = jsonpath_expr.find(project_data) if not match: return None return match[0].value # jsonpath_expr_list 로 넘어온 jsonpath들을 하나씩 parse 해주고 결과를 result 리스트로 반환 def find_list_element(data, jsonpath_expr_list): """ 주어진 데이터(data)에서 여러 jsonpath 표현식들에 일치하는 값들을 찾아 결과를 리스트의 리스트 형태로 반환합니다. """ return [ [match.value for match in parse(expr).find(data)] for expr in jsonpath_expr_list ] # 스크립트 채점 진행 전 스크립트 블럭 순서가 when_run_button_click 1번째, when_clone_start 2번째 배열에 없으면 # 리스트 순서 스왑해서 각각 0, 1번 순서로 배치될 수 있도록 함 # when_scene_start 1번째, when_message_cast 2번째 동일한 순서로 리스트 순서 스왑해서 정렬 # when_scene_start = [장면이 시작되었을 때] # when_message_cast = ["성공"신호를 받았을 때] def swap_script(origin): """스크립트 블록 순서 정렬 함수""" if not origin or len(origin) == 0: return origin # 스크립트 블록 분류를 위한 임시 저장소 run_button_blocks = [] key_press_blocks = [] clone_start_block = None message_cast_block = [] other_blocks = [] # 각 블록을 타입별로 분류 for i, block in enumerate(origin): if not block or len(block) == 0: continue block_type = block[0].get("type") # # 기존 로직: run_button과 scene_start 처리 # if (block_type == "when_run_button_click" or block_type == "when_scene_start") and i > 0: # print(f"swap script run button or scene start") # swap = origin[0] # origin[0] = origin[i] # origin[i] = swap # # 기존 로직: clone_start와 message_cast 처리 # elif (block_type == "when_clone_start" or block_type == "when_message_cast") and i > 0: # print(f"swap script clone start or msg cast") # swap = origin[1] # origin[1] = origin[i] # origin[i] = swap # 새로운 로직: 키 이벤트 처리 if block_type == "when_run_button_click": run_button_blocks.append(block) elif block_type == "when_some_key_pressed": # params[1]에서 키 값 확인 (49, 50, 51) 아스키코드 값이므로 필요하면 추가 가능 key_value = block[0].get("params")[1] if key_value in ["32","49","50","51"]: key_press_blocks.append((key_value, block)) elif block_type == "when_clone_start": clone_start_block = block elif block_type == "when_message_cast": message_cast_block.append(block) else: other_blocks.append(block) # 결과 배열 재구성 result = [] # 1. when_run_button_click 블록을 첫 번째로 추가 if run_button_blocks: result.extend(run_button_blocks) # 2. when_some_key_pressed 블록들을 키 값 순서대로 정렬하여 추가 key_press_blocks.sort(key=lambda x: int(x[0]) if x[0] else 0) result.extend([block for _, block in key_press_blocks]) # 3. clone_start와 message_cast 블록 추가 if message_cast_block: result.extend(message_cast_block) if clone_start_block: result.append(clone_start_block) # 4. 나머지 블록 추가 result.extend(other_blocks) # 결과가 비어있으면 원본 반환 return result if result else origin def reorder_script_all_cases(script_json, block_event_order): """ script_json의 'type' 순서를 block_event_order 순서에 맞게 재정렬합니다. Args: script_json (list[dict]): 각 요소가 {'type': '...', ...} 형태의 리스트 block_event_order (list[str]): 원하는 type 순서 예: ['when_run_button_click', 'when_message_cast'] Returns: list[dict]: block_event_order 순서대로 재정렬된 리스트 """ # 타입별로 스크립트를 분류 type_map = {} for s in script_json: type_map.setdefault(s[0]["type"], []).append(s) results = [] def backtrack(order_idx, used_counts, current): # ✅ 모든 이벤트 순서를 처리했으면 결과에 추가 if order_idx == len(block_event_order): results.append(copy.deepcopy(current)) return event_type = block_event_order[order_idx] available_scripts = type_map.get(event_type, []) # 아직 사용하지 않은 script만 선택 for i, script in enumerate(available_scripts): if used_counts[event_type][i]: continue used_counts[event_type][i] = True current.append(script) backtrack(order_idx + 1, used_counts, current) current.pop() used_counts[event_type][i] = False # 사용 여부 초기화 used_counts = {t: [False] * len(lst) for t, lst in type_map.items()} backtrack(0, used_counts, []) return results def clean_string(text): """문자열 끝의 . 또는 ! 제거""" if isinstance(text, str): # 문자열 끝의 . 또는 ! 제거 return re.sub(r'', '', text.strip()) return text def convert_to_str(value): """값을 문자열로 변환""" if isinstance(value, (list, tuple)): return [convert_to_str(v) for v in value] return str(value) def process_project(project_data, scoring_data): total_points = 0 score_list = [] # - 시작하기 버튼을 클릭했을 때 : when_run_button_click # - (특정) 신호를 받았을 때 : when_message_cast # - 복제본이 생성 되었을 때 : when_clone_start # - 장면이 시작 되었을 때 : when_scene_start # - 오브젝트를 클릭 했을 때 : when_object_click # 이벤트 블록이 존재하는지 여부 확인용 변수 target_event_type = { "when_run_button_click", "when_message_cast", "when_clone_start", "when_scene_start", "when_object_click" } for question_key, question_info in scoring_data.items(): element_path = question_info.get('ele') question_type = question_info.get('type') block_list = question_info.get('blocks') expected_answer = question_info.get('answer') question_points = question_info.get('point') block_event_order = [] # 결과를 저장할 리스트 생성 # target_event_type 리스트 중 # 채점기준표의 block_list의 answer 키값과 일치하는 값이 있으면 # block_event_order 리스트에 추가 if isinstance(block_list, list) and block_list: for block in block_list: answer = block.get("answer") # answer 키값을 안전하게 가져오기 if isinstance(answer, str) and answer in target_event_type: block_event_order.append(answer) # 리스트에 추가 print(f"▶ Processing question: {question_key}") # ✅ SCENE TYPE 처리 if question_type == "scene": scene_elements = find_element(project_data, element_path) if scene_elements: print(f"🟨 Found elements for '{element_path}'") scene_elements = [convert_to_str(x) for x in scene_elements] if scene_elements == expected_answer or (expected_answer is None and scene_elements): total_points += question_points score_list.append(question_points) # print(f"✅ Matched: {scene_elements}") else: score_list.append(0) print(f"❌ Mismatch: {scene_elements} vs {expected_answer}") else: print(f"🟥 Element '{element_path}' not found") score_list.append(0) # ✅ SCRIPT TYPE 처리 elif question_type == "script": script_raw = find_script_element(project_data, element_path) script_json = json.loads(script_raw) if script_raw else None # 스크립트 블록 순서 재정렬 script_data = reorder_script_all_cases(script_json, block_event_order) if script_json else None block_index = 1 for block in block_list: block_type = block.get('type') block_path = block.get('ele') block_answer = block.get('answer', None) block_points = block.get('point') if script_data is None: print(f"{question_key}-{block_index}: Script Not Found") score_list.append("확인 필요") block_index += 1 continue # 1. 현재 블록(문제)이 정답인지 판별하는 플래그 is_block_correct = False # 2. 로깅을 위해 마지막으로 찾은 값을 저장할 변수 last_found_values = None # 3. script_data가 단일 객체여도 처리 가능하도록 리스트로 통일 scripts_to_check = script_data if isinstance(script_data, list) else [script_data] # 4. 여러 스크립트 뭉치를 순회하며 정답이 하나라도 있는지 확인 for single_script in scripts_to_check: # 단일 스크립트 객체(single_script)에 대해 블록 요소 검색 if block_type == "list": block_elements = find_list_element(single_script, block_path) else: block_elements = find_element(single_script, block_path) # 결과값 정리 if block_elements and isinstance(block_answer, list): # 1. 비어있는 sublist를 ['None']으로 먼저 치환합니다. # - sublist가 비어있지 않으면(if sublist) sublist를 그대로 사용하고, # - 비어있으면 ['None']으로 대체합니다. processed_elements = [sublist if sublist else ['None'] for sublist in block_elements] found_values = [convert_to_str(x) for x in itertools.chain.from_iterable(processed_elements)] else: found_values = convert_to_str(block_elements[0]) if block_elements else None # 실패 시 출력할 값을 위해 마지막으로 찾은 값을 저장 last_found_values = found_values expected_str = convert_to_str(block_answer) if block_answer is not None else None # 정답 조건 확인 if block_elements: # 5-1. 정답(block_answer)이 없고, 요소만 찾으면 되는 경우 if block_answer is None: is_block_correct = True # 5-2. 정답이 있고, 찾은 값과 일치하는 경우 elif expected_str == found_values: is_block_correct = True # 6. 정답을 찾았으면, 더 이상 다른 스크립트를 확인할 필요 없이 내부 반복문 탈출 if is_block_correct: break # for single_script in scripts_to_check: 루프를 중단 # 7. 내부 반복문 종료 후, 플래그를 기반으로 최종 점수 처리 if is_block_correct: # 정답을 맞힌 경우 if block_answer is not None: print(f"{question_key}-{block_index}: ✅ {expected_str} == {last_found_values}") else: print(f"{question_key}-{block_index}: Element Exists") total_points += block_points score_list.append(block_points) else: # 모든 스크립트를 확인했지만 정답이 없는 경우 if last_found_values is not None: # 요소는 찾았으나 값이 틀린 경우 print(f"{question_key}-{block_index}: ❌ {expected_str} != {last_found_values}") else: # 요소를 전혀 찾지 못한 경우 print(f"{question_key}-{block_index}: No elements found for {block_path}") score_list.append(0) block_index += 1 # total_points = round(total_points, ndigits=0) # 총점 반올림 score_list.append(total_points) return score_list def normalize_path(path): """한글 경로명을 NFC 방식으로 정규화""" return unicodedata.normalize('NFC', path) def main(): timestamp = datetime.now().strftime("%y%m%d") test_mode = False # 테스트 모드 설정 # test_mode = True # 테스트 모드 설정 exam_round = "2511" exam_names = ["CAT_3_A"] # 여러 시험명을 리스트로 설정 # exam_names = ["CAS_2_A"] # 여러 시험명을 리스트로 설정 excel_list = [] for exam_name in exam_names: scoring_json_path = f'./correct/{exam_round}_{exam_name}.json' project_json_path = f'./output/{"00_test" if test_mode else exam_round+"_"+exam_name}/' excel_path = f'{timestamp}_{exam_round}_{exam_name}_{"TEST" if test_mode else "채점결과"}.xlsx' scoring_data = read_json(scoring_json_path) student_score_list = [] # 컬럼명 생성 columns = ['학생명'] idx = 1 for key in scoring_data.keys(): if scoring_data[key].get('type') == 'scene': columns.append(f'{idx}') idx = idx + 1 elif scoring_data[key].get('type') == 'script': for i in range(len(scoring_data[key].get('blocks', []))): columns.append(f'{idx}') idx = idx + 1 columns.append('총점') # os.walk 결과를 리스트로 변환하고 정렬 walk_results = [] for root, dirs, files in os.walk(project_json_path): # 디렉토리명 정규화 normalized_root = normalize_path(root) normalized_dirs = [normalize_path(d) for d in dirs] normalized_files = [normalize_path(f) for f in files] normalized_dirs.sort() # 정규화된 디렉토리 정렬 walk_results.append((normalized_root, normalized_dirs, normalized_files)) # 정렬된 결과를 바탕으로 처리 for root, dirs, files in sorted(walk_results): for file in sorted(files): # 파일도 정렬 if file == 'project.json': full_path = os.path.join(root, file) print(f"\n🟠 Processing: {full_path}") try: # 디렉토리 패스 내에 학생 이름만 뽑아서 엑셀 컬럼 명으로 추가 match = re.search(r'(\d{6}[-_][^\\/]+)[\\/]', full_path) if match: student_id = match.group(1) else: if '정답' in full_path: student_id = '정답' else: student_id = '000000' # project.json 파일 내용 project_data = read_json(full_path) if project_data: extract_and_format_scripts(project_data, root) points = process_project(project_data, scoring_data) points.insert(0, student_id) student_score_list.append(points) print(f"Total Points for {points}") except Exception as e: logging.exception(f"🚫Error processing {full_path}: {str(e)}") continue # DataFrame 생성 및 엑셀 저장 df = pd.DataFrame(student_score_list, columns=columns).transpose() df.columns = df.iloc[0] df = df[1:] df.to_excel(excel_path, index=False) excel_list.append(excel_path) if excel_list: print(f"\nResults saved to {excel_list}") if __name__ == "__main__": main()