Files
cat/main.py

316 lines
12 KiB
Python
Raw Normal View History

from jsonpath_ng.ext import parse
import json
from itertools import chain
2025-04-11 14:51:29 +09:00
import os
import pandas as pd # 추가된 import
import unicodedata # 상단에 import 추가
import re # 상단에 추가
from datetime import datetime
2025-08-01 17:28:24 +09:00
import logging
from logging_config import setup_logging # logging 설정을 위한 import
import traceback
2025-09-01 17:53:37 +09:00
from script_utils import extract_and_format_scripts # 스크립트 추출 함수 import
2025-08-01 17:28:24 +09:00
setup_logging() # logging 설정 호출
def read_json(file_path):
    """Read the file at ``file_path`` as UTF-8 text and return it parsed as JSON.

    Args:
        file_path: Path of the JSON file to open.

    Returns:
        The decoded JSON document (whatever ``json`` produces for its content).
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def find_element(project_data, jsonpath_expr):
    """Collect the value of every JSONPath match in ``project_data``.

    Args:
        project_data: Decoded JSON structure to search.
        jsonpath_expr: JSONPath expression string; it is compiled with
            ``parse`` on every call.

    Returns:
        A list with the ``value`` of each match, in the order ``find``
        yields them; an empty list when nothing matches.
    """
    compiled = parse(jsonpath_expr)
    return [hit.value for hit in compiled.find(project_data)]
def find_script_element(project_data, jsonpath_expr):
    """Return the value of the first JSONPath match, or ``None`` if there is none.

    Args:
        project_data: Decoded JSON structure to search.
        jsonpath_expr: JSONPath expression string; compiled with ``parse``.

    Returns:
        ``value`` of the first match, or ``None`` when the expression
        matches nothing.
    """
    hits = parse(jsonpath_expr).find(project_data)
    return hits[0].value if hits else None
def find_list_element(data, jsonpath_expr_list):
    """Evaluate several JSONPath expressions against ``data``.

    Args:
        data: Decoded JSON structure to search.
        jsonpath_expr_list: Iterable of JSONPath expression strings; each one
            is compiled with ``parse`` and evaluated independently.

    Returns:
        A list with one entry per expression, in input order. Each entry is
        the list of matched values for that expression (possibly empty).
    """
    return [
        [hit.value for hit in parse(raw_expr).find(data)]
        for raw_expr in jsonpath_expr_list
    ]
2025-04-11 14:51:29 +09:00
def swap_script(origin):
    """Rearrange a script's threads into a fixed order before scoring.

    ``origin`` is treated as a list of threads, where each thread is a list
    whose first item is a dict carrying the event type under ``"type"``.
    The returned list contains, in this order:

    1. every ``when_run_button_click`` thread (original relative order);
    2. ``when_some_key_pressed`` threads whose key code (``params[1]`` of the
       first item) is "32", "49", "50" or "51", sorted by numeric key code;
    3. the ``when_clone_start`` thread;
    4. every ``when_message_cast`` thread (original relative order);
    5. all remaining threads (original relative order).

    Empty threads are skipped.

    NOTE(review): key-press threads with any other key code are dropped from
    the result, and when several ``when_clone_start`` threads exist only the
    last one is kept. Confirm both are intended before changing them, since
    answer-key paths may rely on the resulting indices.

    Args:
        origin: List of threads, or a falsy value.

    Returns:
        The reordered list. ``origin`` itself is returned when it is falsy or
        when no thread survived the classification.
    """
    if not origin:
        return origin

    # Key codes that take part in the ordering (the original author notes
    # these are ASCII codes and that more can be added when needed).
    accepted_keys = ("32", "49", "50", "51")

    run_threads = []
    keyed_threads = []
    clone_thread = None
    message_threads = []
    leftover_threads = []

    for thread in origin:
        if not thread:
            continue
        head = thread[0]
        head_type = head.get("type")

        if head_type == "when_run_button_click":
            run_threads.append(thread)
        elif head_type == "when_some_key_pressed":
            pressed = head.get("params")[1]
            if pressed in accepted_keys:
                keyed_threads.append((pressed, thread))
        elif head_type == "when_clone_start":
            clone_thread = thread
        elif head_type == "when_message_cast":
            message_threads.append(thread)
        else:
            leftover_threads.append(thread)

    # Stable sort keeps the original order among handlers of the same key.
    keyed_sorted = sorted(
        keyed_threads,
        key=lambda pair: int(pair[0]) if pair[0] else 0,
    )

    ordered = list(run_threads)
    ordered += [thread for _, thread in keyed_sorted]
    if clone_thread:
        ordered.append(clone_thread)
    ordered += message_threads
    ordered += leftover_threads

    # Fall back to the untouched input when nothing was classified.
    return ordered or origin
def clean_string(text):
    """Trim a string and remove trailing '.' / '!' characters.

    Surrounding whitespace is stripped first, then every '.' or '!' at the
    end of the remaining text is removed. Punctuation elsewhere in the
    string is left alone.

    Args:
        text: Value to clean.

    Returns:
        The cleaned string, or ``text`` unchanged when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # The previous implementation substituted an empty regex pattern, which
    # removed nothing; strip the trailing punctuation explicitly instead.
    return text.strip().rstrip('.!')
2025-04-11 14:51:29 +09:00
def convert_to_str(value):
    """Convert a value to ``str``, recursing into lists and tuples.

    Args:
        value: Any value. Lists and tuples are converted element by element.

    Returns:
        ``str(value)`` for a non-sequence; for a list or tuple, a new list
        (tuples become lists) holding the converted elements.
    """
    if not isinstance(value, (list, tuple)):
        return str(value)
    return list(map(convert_to_str, value))
def process_project(project_data, scoring_data):
    """Grade one project against an answer key and return its score row.

    Every entry of ``scoring_data`` is a question dict read through the keys
    ``'ele'`` (JSONPath), ``'type'``, ``'blocks'``, ``'answer'`` and
    ``'points'``.

    * ``type == "scene"``: the values matched by ``'ele'`` in
      ``project_data`` are stringified and compared with ``'answer'``. When
      ``'answer'`` is missing, any match earns the points. One score is
      appended.
    * ``type == "script"``: the first value matched by ``'ele'`` is decoded
      with ``json.loads`` and reordered by ``swap_script``. Each dict in
      ``'blocks'`` (keys ``'type'``, ``'ele'``, ``'answer'``, ``'points'``)
      is then checked against that script and one score is appended per
      block. When the script is missing or empty, ``"확인 필요"`` is appended
      for every block instead of a number.
    * Any other type contributes nothing.

    Args:
        project_data: Decoded project JSON.
        scoring_data: Mapping of question key to question dict.

    Returns:
        List of per-item scores in question order, followed by the total of
        all awarded points as the last element.
    """
    total_points = 0
    score_list = []

    for question_key, question_info in scoring_data.items():
        element_path = question_info.get('ele')
        question_type = question_info.get('type')
        # A script question without 'blocks' is graded as having no blocks
        # rather than raising; main() sizes its result columns the same way.
        block_list = question_info.get('blocks') or []
        expected_answer = question_info.get('answer')
        question_points = question_info.get('points')

        print(f"▶ Processing question: {question_key}")

        # ----- scene question: one score for the whole question -----
        if question_type == "scene":
            scene_elements = find_element(project_data, element_path)
            if not scene_elements:
                print(f"🟥 Element '{element_path}' not found")
                score_list.append(0)
                continue

            print(f"🟨 Found elements for '{element_path}'")
            scene_elements = [convert_to_str(x) for x in scene_elements]
            # With no expected answer, the mere presence of a match is enough.
            if scene_elements == expected_answer or (expected_answer is None and scene_elements):
                total_points += question_points
                score_list.append(question_points)
            else:
                score_list.append(0)
                print(f"❌ Mismatch: {scene_elements} vs {expected_answer}")

        # ----- script question: one score per block -----
        elif question_type == "script":
            script_raw = find_script_element(project_data, element_path)
            # The matched value is JSON text; decode it, then normalise the
            # thread order so the block paths address stable positions.
            script_json = json.loads(script_raw) if script_raw else None
            script_data = swap_script(script_json) if script_json else None

            for block_index, block in enumerate(block_list, start=1):
                block_type = block.get('type')
                block_path = block.get('ele')
                block_answer = block.get('answer', None)
                block_points = block.get('points')

                if script_data is None:
                    # Nothing to grade against: flag the cell for manual review.
                    print(f"{question_key}-{block_index}: Script Not Found")
                    score_list.append("확인 필요")
                    continue

                # "list" blocks carry several paths; others carry a single one.
                if block_type == "list":
                    block_elements = find_list_element(script_data, block_path)
                else:
                    block_elements = find_element(script_data, block_path)

                if not block_elements:
                    print(f"{question_key}-{block_index}: No elements found for {block_path}")
                    score_list.append(0)
                    continue

                if isinstance(block_answer, list):
                    # Flatten the per-path match lists into one list of strings.
                    # NOTE(review): for a non-"list" block each matched value is
                    # itself iterated here - confirm that combination is unused.
                    found_values = [convert_to_str(x) for x in chain.from_iterable(block_elements)]
                else:
                    found_values = convert_to_str(block_elements[0])
                expected_str = convert_to_str(block_answer) if block_answer is not None else None

                if block_answer is None:
                    # No expected value: the element existing earns the points.
                    total_points += block_points
                    score_list.append(block_points)
                    print(f"{question_key}-{block_index}: Element Exists")
                elif expected_str == found_values:
                    print(f"{question_key}-{block_index}: ✅ {expected_str} == {found_values}")
                    total_points += block_points
                    score_list.append(block_points)
                else:
                    print(f"{question_key}-{block_index}: ❌ {expected_str} != {found_values}")
                    score_list.append(0)

    score_list.append(total_points)
    return score_list
2025-04-11 14:51:29 +09:00
def normalize_path(path):
    """Return ``path`` converted to Unicode normalization form NFC.

    Used on path names so that decomposed (e.g. Hangul jamo) and precomposed
    spellings of the same name compare and sort identically.
    """
    target_form = 'NFC'
    return unicodedata.normalize(target_form, path)
2025-09-01 17:53:37 +09:00
2025-04-11 14:51:29 +09:00
def main():
    """Grade every submitted project of each configured exam and export to Excel.

    For each exam name the answer key ``./correct/<round>_<exam>.json`` is
    loaded, the output folder is walked for files named ``project.json``,
    each project is graded with ``process_project``, and the results are
    written to one ``.xlsx`` file per exam with one column per student.
    A failure on a single project is logged and that project is skipped.
    """
    timestamp = datetime.now().strftime("%y%m%d")
    test_mode = False  # set to True to grade the "00_test" folder instead
    # test_mode = True
    exam_round = "2508"
    exam_names = ["CAS_2_A", "CAS_2_B"]  # exams graded in this run
    excel_list = []

    for exam_name in exam_names:
        scoring_json_path = f'./correct/{exam_round}_{exam_name}.json'
        project_json_path = f'./output/{"00_test" if test_mode else exam_round+"_"+exam_name}/'
        excel_path = f'{timestamp}_{exam_round}_{exam_name}_{"TEST" if test_mode else "채점결과"}.xlsx'
        scoring_data = read_json(scoring_json_path)
        student_score_list = []

        # Header row: student name, one numbered column per graded item
        # (1 per scene question, 1 per block of a script question), total.
        columns = ['학생명']
        next_number = 1
        for spec in scoring_data.values():
            kind = spec.get('type')
            if kind == 'scene':
                slots = 1
            elif kind == 'script':
                slots = len(spec.get('blocks', []))
            else:
                slots = 0
            columns.extend(f'{n}' for n in range(next_number, next_number + slots))
            next_number += slots
        columns.append('총점')

        # Snapshot the directory walk with NFC-normalised names so that the
        # processing order below is deterministic.
        walk_results = [
            (
                normalize_path(root),
                sorted(normalize_path(d) for d in dirs),
                [normalize_path(f) for f in files],
            )
            for root, dirs, files in os.walk(project_json_path)
        ]

        for root, _dirs, files in sorted(walk_results):
            for file in sorted(files):
                if file != 'project.json':
                    continue
                full_path = os.path.join(root, file)
                print(f"\nProcessing: {full_path}")
                try:
                    # The student label is the path component that starts with
                    # six digits followed by '-' or '_'.
                    match = re.search(r'(\d{6}[-_][^\\/]+)[\\/]', full_path)
                    if match:
                        student_id = match.group(1)
                    elif '정답' in full_path:
                        student_id = '정답'
                    else:
                        student_id = '000000'

                    project_data = read_json(full_path)
                    if project_data:
                        extract_and_format_scripts(project_data, root)
                        points = process_project(project_data, scoring_data)
                        points.insert(0, student_id)
                        student_score_list.append(points)
                        print(f"Total Points for {points}")
                except Exception as e:
                    # Keep grading the remaining students; the traceback is logged.
                    logging.exception(f"🚫Error processing {full_path}: {str(e)}")

        # One row per student -> transpose so each student becomes a column,
        # then promote the name row to the header.
        df = pd.DataFrame(student_score_list, columns=columns).T
        df.columns = df.iloc[0]
        df = df[1:]
        df.to_excel(excel_path, index=False)
        excel_list.append(excel_path)

    if excel_list:
        print(f"\nResults saved to {excel_list}")
# Run the grading pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()