#!/usr/bin/env python3
"""Generate the comparison table for trigger offsets of 6/7/8 cycles in capture 2026042802-1."""

from __future__ import annotations

import csv
import json
import math
import sys
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]
OUTPUT_DIR = REPO_ROOT / "analysis" / "2026042802-1"
CONFIG_PATH = REPO_ROOT / "Config" / "RobotConfig.json"
ACTUAL_SEND_CSV = OUTPUT_DIR / "2026042802-1_j519_actual_send_all_with_io.csv"
TRIGGER_CSV = OUTPUT_DIR / "2026042802-1_j519_trigger_frames.csv"
OUTPUT_CSV = OUTPUT_DIR / "2026042802-1_trigger_offset_6_7_8_compare.csv"
OUTPUT_JSON = OUTPUT_DIR / "2026042802-1_trigger_offset_6_7_8_summary.json"

# Number of joint columns (j1..j6) read from every sample row.
JOINT_COUNT = 6
# Sample offsets, counted in rows after the best-matching sample, that are compared.
CANDIDATE_OFFSETS = (6, 7, 8)
# Number of rows searched on each side of the trigger row for the best match.
SEARCH_HALF_WINDOW = 20


def load_rows(path: Path) -> list[dict]:
    """Read a UTF-8 CSV file and return its rows as dicts keyed by header name.

    The file is opened with ``newline=""`` as the ``csv`` module requires, so
    line breaks embedded in quoted fields are returned unchanged.
    """
    with path.open(encoding="utf-8", newline="") as handle:
        return list(csv.DictReader(handle))


def to_float_list(record: dict, prefix: str = "j") -> list[float]:
    """Return the values of ``{prefix}1_deg`` .. ``{prefix}6_deg`` from *record* as floats.

    Raises:
        KeyError: if one of the six columns is missing.
        ValueError: if a value cannot be parsed as a float.
    """
    return [float(record[f"{prefix}{index}_deg"]) for index in range(1, JOINT_COUNT + 1)]


def compute_diff_metrics(
    actual_deg: list[float], teach_deg: list[float]
) -> tuple[list[float], float, float, str]:
    """Compare the first six entries of two joint vectors.

    Args:
        actual_deg: Joint values to evaluate; at least six entries.
        teach_deg: Reference joint values; at least six entries.

    Returns:
        A tuple ``(diffs, max_error, rms_error, max_axis)`` where ``diffs`` is
        ``actual - teach`` per joint, ``max_error`` is the largest absolute
        difference, ``rms_error`` is the root mean square of the differences
        and ``max_axis`` is ``"J1"``..``"J6"`` naming the first joint that
        reaches ``max_error``.

    Raises:
        IndexError: if either list has fewer than six entries.
    """
    diffs = [actual_deg[index] - teach_deg[index] for index in range(JOINT_COUNT)]
    abs_diffs = [abs(value) for value in diffs]
    max_error = max(abs_diffs)
    rms_error = math.sqrt(sum(value * value for value in diffs) / float(JOINT_COUNT))
    max_axis = f"J{abs_diffs.index(max_error) + 1}"
    return diffs, max_error, rms_error, max_axis


def _find_best_order(actual_rows: list[dict], teach_deg: list[float], trigger_order: int) -> int:
    """Return the row index near *trigger_order* whose joints are closest to *teach_deg*."""
    window_start = max(0, trigger_order - SEARCH_HALF_WINDOW)
    window_end = min(len(actual_rows) - 1, trigger_order + SEARCH_HALF_WINDOW)

    best_order = trigger_order
    # Ordered by RMS error, then max error, then distance from the trigger row.
    best_score = (float("inf"), float("inf"), 0)
    for candidate_order in range(window_start, window_end + 1):
        _, max_error, rms_error, _ = compute_diff_metrics(
            to_float_list(actual_rows[candidate_order]), teach_deg
        )
        score = (rms_error, max_error, abs(candidate_order - trigger_order))
        # Strict comparison keeps the earliest row when scores tie exactly.
        if score < best_score:
            best_order = candidate_order
            best_score = score
    return best_order


def _build_row(
    trigger_no: int,
    waypoint_index: int,
    teach_deg: list[float],
    actual_rows: list[dict],
    best_order: int,
) -> dict:
    """Build one output row: best-sample info, teach joints, and metrics for each offset."""
    best = actual_rows[best_order]
    best_time_s = float(best["time_relative_s"])
    row: dict = {
        "trigger_no": trigger_no,
        "waypoint_index": waypoint_index,
        "best_sample_order": best_order,
        "best_frame_number": int(best["frame_number"]),
        "best_sequence": int(best["sequence"]),
        "best_time_relative_s": best_time_s,
    }
    for joint_index in range(JOINT_COUNT):
        row[f"teach_j{joint_index + 1}_deg"] = teach_deg[joint_index]

    for offset in CANDIDATE_OFFSETS:
        # Clamp to the last row, so offsets that run past the end of the
        # capture all evaluate the final sample.
        target = actual_rows[min(len(actual_rows) - 1, best_order + offset)]
        actual_deg = to_float_list(target)
        diffs, max_error, rms_error, max_axis = compute_diff_metrics(actual_deg, teach_deg)
        target_time_s = float(target["time_relative_s"])

        row[f"offset_{offset}_frame_number"] = int(target["frame_number"])
        row[f"offset_{offset}_sequence"] = int(target["sequence"])
        row[f"offset_{offset}_time_relative_s"] = target_time_s
        row[f"offset_{offset}_max_error_axis"] = max_axis
        row[f"offset_{offset}_max_error_deg"] = max_error
        row[f"offset_{offset}_rms_error_deg"] = rms_error
        row[f"offset_{offset}_delta_from_best_ms"] = (target_time_s - best_time_s) * 1000.0
        for joint_index in range(JOINT_COUNT):
            joint_no = joint_index + 1
            row[f"offset_{offset}_j{joint_no}_actual_deg"] = actual_deg[joint_index]
            row[f"offset_{offset}_diff_j{joint_no}_deg"] = diffs[joint_index]

    # The winning offset has the lowest RMS error; max error breaks ties.
    row["best_of_6_7_8_offset"] = min(
        CANDIDATE_OFFSETS,
        key=lambda offset: (
            row[f"offset_{offset}_rms_error_deg"],
            row[f"offset_{offset}_max_error_deg"],
        ),
    )
    return row


def _summarize(output_rows: list[dict], offset_win_counts: dict) -> dict:
    """Aggregate win counts and per-offset average errors over a non-empty *output_rows*."""
    row_count = len(output_rows)
    return {
        "rows": row_count,
        "best_offset_win_counts": offset_win_counts,
        "average_max_error_deg": {
            str(offset): sum(row[f"offset_{offset}_max_error_deg"] for row in output_rows) / row_count
            for offset in CANDIDATE_OFFSETS
        },
        "average_rms_error_deg": {
            str(offset): sum(row[f"offset_{offset}_rms_error_deg"] for row in output_rows) / row_count
            for offset in CANDIDATE_OFFSETS
        },
    }


def main() -> None:
    """Build the offset comparison CSV and JSON summary, and print the summary.

    Trigger frames are paired in order with the waypoints whose ``shot_flags``
    entry is truthy. For each pair the sample closest to the taught waypoint is
    located near the trigger frame, and the samples 6, 7 and 8 rows later are
    compared against the waypoint.

    Raises:
        KeyError: if a trigger frame number is absent from the actual-send CSV.
        ValueError: if no trigger/waypoint pair exists; raised before any
            output file is created or overwritten.
    """
    actual_rows = load_rows(ACTUAL_SEND_CSV)
    trigger_rows = load_rows(TRIGGER_CSV)
    config = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))["flying_shots"]["UTTC_MS11"]
    trigger_waypoint_indices = [index for index, flag in enumerate(config["shot_flags"]) if flag]

    if len(trigger_rows) != len(trigger_waypoint_indices):
        # zip() below stops at the shorter sequence; make that visible.
        print(
            f"warning: {len(trigger_rows)} trigger frames but "
            f"{len(trigger_waypoint_indices)} flagged waypoints; "
            f"only the first {min(len(trigger_rows), len(trigger_waypoint_indices))} are paired",
            file=sys.stderr,
        )

    actual_order_by_frame = {int(row["frame_number"]): idx for idx, row in enumerate(actual_rows)}

    output_rows: list[dict] = []
    offset_win_counts = {offset: 0 for offset in CANDIDATE_OFFSETS}

    for trigger_no, (trigger_row, waypoint_index) in enumerate(
        zip(trigger_rows, trigger_waypoint_indices), start=1
    ):
        trigger_frame = int(trigger_row["frame_number"])
        try:
            trigger_order = actual_order_by_frame[trigger_frame]
        except KeyError:
            raise KeyError(
                f"trigger frame {trigger_frame} not found in {ACTUAL_SEND_CSV.name}"
            ) from None

        # Waypoints are converted with math.degrees before comparison.
        teach_deg = [math.degrees(value) for value in config["traj_waypoints"][waypoint_index]]
        best_order = _find_best_order(actual_rows, teach_deg, trigger_order)
        row = _build_row(trigger_no, waypoint_index, teach_deg, actual_rows, best_order)
        offset_win_counts[row["best_of_6_7_8_offset"]] += 1
        output_rows.append(row)

    if not output_rows:
        # Fail before touching outputs so an earlier result is not truncated.
        raise ValueError("no trigger/waypoint pairs to compare; nothing was written")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(output_rows[0].keys()))
        writer.writeheader()
        writer.writerows(output_rows)

    summary = _summarize(output_rows, offset_win_counts)
    summary_text = json.dumps(summary, ensure_ascii=False, indent=2)
    OUTPUT_JSON.write_text(summary_text, encoding="utf-8")
    print(summary_text)


if __name__ == "__main__":
    main()