Files
FlyShotHost/analysis/analyze_2026042802_1_trigger_vs_teach_points.py
yunxiao.zhu f7e2bb0e7b feat(*): 添加触发样本偏移与实发轨迹分析导出
* 为 RobotConfig 增加 trigger_sample_index_offset_cycles 配置
  * 让 DO 事件携带示教点关节角并按最接近 sample 绑定触发
  * 调整运行时 IO 地址位掩码映射并补充 ShotEvents 导出
  * 新增 2026042802-1 抓包分析脚本、数据产物与结论文档
  * 补齐配置兼容、规划绑定和运行时触发相关测试
2026-05-09 11:12:31 +08:00

251 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""提取 2026042802-1 抓包中的真实 J519 发包,并对比 UTTC_MS11 示教点。"""
from __future__ import annotations
import csv
import json
import math
import struct
import subprocess
from collections import Counter
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_PCAP = REPO_ROOT.parent / "Rvbust" / "uttc-20260428" / "2026042802-1.pcap"
DEFAULT_TSHARK = Path(r"D:\Zyx\Downloads\WiresharkPortable32\App\Wireshark\tshark.exe")
OUTPUT_DIR = REPO_ROOT / "analysis" / "2026042802-1"
CONFIG_PATH = REPO_ROOT / "Config" / "RobotConfig.json"
RUNTIME_DATA_DIR = REPO_ROOT / "Config" / "Data" / "UTTC_MS11"
def be_u32(data: bytes, offset: int) -> int:
"""按大端读取 4 字节无符号整数。"""
return struct.unpack(">I", data[offset : offset + 4])[0]
def be_u16(data: bytes, offset: int) -> int:
"""按大端读取 2 字节无符号整数。"""
return struct.unpack(">H", data[offset : offset + 2])[0]
def be_f32(data: bytes, offset: int) -> float:
"""按大端读取 4 字节浮点数。"""
return struct.unpack(">f", data[offset : offset + 4])[0]
def load_j519_command_rows(pcap: Path, tshark: Path) -> list[list[str]]:
"""只提取 UDP 60015 的原始字段,后续再按 IP 方向筛选真实下发命令。"""
command = [
str(tshark),
"-r",
str(pcap),
"-Y",
"udp.port==60015",
"-T",
"fields",
"-e",
"frame.number",
"-e",
"frame.time_relative",
"-e",
"ip.src",
"-e",
"udp.srcport",
"-e",
"ip.dst",
"-e",
"udp.dstport",
"-e",
"udp.payload",
]
output = subprocess.check_output(command, text=True, encoding="utf-8", errors="ignore")
rows: list[list[str]] = []
for line in output.splitlines():
if not line.strip():
continue
parts = line.split("\t")
if len(parts) >= 7:
rows.append(parts[:7])
return rows
def decode_command_records(rows: list[list[str]], client_ip: str, robot_ip: str) -> list[dict]:
"""把抓包中的 64B J519 命令帧解码成带 IO 信息的结构化记录。"""
records: list[dict] = []
for frame_no, time_rel, ip_src, _udp_src, ip_dst, _udp_dst, payload_hex in rows:
if ip_src != client_ip or ip_dst != robot_ip:
continue
payload = bytes.fromhex(payload_hex)
if len(payload) != 64:
continue
io_value = be_u16(payload, 0x18)
io_addrs = [bit + 1 for bit in range(16) if io_value & (1 << bit)]
targets = [be_f32(payload, 0x1C + index * 4) for index in range(9)]
records.append(
{
"frame_number": int(frame_no),
"time_relative_s": float(time_rel),
"sequence": be_u32(payload, 0x08),
"last_data": payload[0x0C],
"write_io_type": payload[0x13],
"write_io_index": be_u16(payload, 0x14),
"write_io_mask": be_u16(payload, 0x16),
"write_io_value": io_value,
"io_addrs": io_addrs,
"j1_deg": targets[0],
"j2_deg": targets[1],
"j3_deg": targets[2],
"j4_deg": targets[3],
"j5_deg": targets[4],
"j6_deg": targets[5],
"ext1_deg": targets[6],
"ext2_deg": targets[7],
"ext3_deg": targets[8],
}
)
return records
def pick_trigger_first_high_frames(records: list[dict]) -> list[dict]:
"""由于 io_keep_cycles=2只记录每组高电平脉冲的第一帧。"""
trigger_frames: list[dict] = []
previous_high = False
for record in records:
current_high = record["write_io_value"] > 0
if current_high and not previous_high:
trigger_frames.append(record)
previous_high = current_high
return trigger_frames
def load_uttc_ms11_config() -> dict:
"""读取 UTTC_MS11 的示教点和触发配置。"""
config = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
return config["flying_shots"]["UTTC_MS11"]
def build_trigger_vs_teach_rows(trigger_frames: list[dict], shot_config: dict) -> list[dict]:
"""按 shot_flags 为 true 的 waypoint 顺序,对齐抓包触发帧和示教点。"""
rows: list[dict] = []
trigger_waypoint_indices = [index for index, flag in enumerate(shot_config["shot_flags"]) if flag]
for trigger_no, (frame, waypoint_index) in enumerate(zip(trigger_frames, trigger_waypoint_indices), start=1):
teach_rad = shot_config["traj_waypoints"][waypoint_index]
teach_deg = [math.degrees(value) for value in teach_rad]
actual_deg = [frame[f"j{joint_index}_deg"] for joint_index in range(1, 7)]
diffs = [actual_deg[index] - teach_deg[index] for index in range(6)]
abs_diffs = [abs(value) for value in diffs]
max_error = max(abs_diffs)
max_error_axis = f"J{abs_diffs.index(max_error) + 1}"
rms_error = math.sqrt(sum(value * value for value in diffs) / 6.0)
row = {
"trigger_no": trigger_no,
"waypoint_index": waypoint_index,
"frame_number": frame["frame_number"],
"sequence": frame["sequence"],
"time_relative_s": frame["time_relative_s"],
"write_io_value": frame["write_io_value"],
"io_addrs": frame["io_addrs"],
"config_addr": shot_config["addr"][waypoint_index],
"max_error_axis": max_error_axis,
"max_error_deg": max_error,
"rms_error_deg": rms_error,
}
for joint_index in range(6):
joint_no = joint_index + 1
row[f"j{joint_no}_actual_deg"] = actual_deg[joint_index]
row[f"j{joint_no}_teach_deg"] = teach_deg[joint_index]
row[f"diff_j{joint_no}_deg"] = diffs[joint_index]
rows.append(row)
return rows
def write_csv(path: Path, rows: list[dict]) -> None:
"""把分析结果落成 UTF-8 CSV便于后续继续筛选和画图。"""
if not rows:
raise ValueError(f"No rows to write: {path}")
serializable_rows: list[dict] = []
for row in rows:
serializable_row: dict = {}
for key, value in row.items():
if isinstance(value, list):
serializable_row[key] = json.dumps(value, ensure_ascii=False)
else:
serializable_row[key] = value
serializable_rows.append(serializable_row)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=list(serializable_rows[0].keys()))
writer.writeheader()
writer.writerows(serializable_rows)
def build_summary(records: list[dict], trigger_rows: list[dict]) -> dict:
"""汇总本次分析关心的导出状态和误差统计。"""
max_errors = [float(row["max_error_deg"]) for row in trigger_rows]
rms_errors = [float(row["rms_error_deg"]) for row in trigger_rows]
axis_counter = Counter(str(row["max_error_axis"]) for row in trigger_rows)
order_only_addr_mismatch = 0
real_addr_mismatch = 0
for row in trigger_rows:
io_addrs = list(row["io_addrs"])
config_addr = list(row["config_addr"])
if io_addrs != config_addr:
if sorted(io_addrs) == sorted(config_addr):
order_only_addr_mismatch += 1
else:
real_addr_mismatch += 1
return {
"pcap_path": str(DEFAULT_PCAP),
"all_command_count": len(records),
"trigger_count": len(trigger_rows),
"existing_runtime_actual_send_exists": (RUNTIME_DATA_DIR / "ActualSendJointTraj.txt").exists(),
"existing_runtime_actual_send_has_io_columns": False,
"existing_shot_events_exists": (RUNTIME_DATA_DIR / "ShotEvents.json").exists(),
"pcap_specific_combined_export_preexisting": False,
"average_max_error_deg": sum(max_errors) / len(max_errors),
"max_error_deg": max(max_errors),
"average_rms_error_deg": sum(rms_errors) / len(rms_errors),
"max_error_axis_counter": dict(axis_counter),
"order_only_addr_mismatch_count": order_only_addr_mismatch,
"real_addr_mismatch_count": real_addr_mismatch,
}
def main() -> None:
"""执行抓包提取、示教点对齐、CSV 导出和摘要落盘。"""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
rows = load_j519_command_rows(DEFAULT_PCAP, DEFAULT_TSHARK)
records = decode_command_records(rows, client_ip="192.168.10.10", robot_ip="192.168.10.11")
trigger_frames = pick_trigger_first_high_frames(records)
shot_config = load_uttc_ms11_config()
trigger_rows = build_trigger_vs_teach_rows(trigger_frames, shot_config)
summary = build_summary(records, trigger_rows)
write_csv(OUTPUT_DIR / "2026042802-1_j519_actual_send_all_with_io.csv", records)
write_csv(OUTPUT_DIR / "2026042802-1_j519_trigger_frames.csv", trigger_frames)
write_csv(OUTPUT_DIR / "2026042802-1_trigger_vs_teach_points.csv", trigger_rows)
(OUTPUT_DIR / "2026042802-1_analysis_summary.json").write_text(
json.dumps(summary, ensure_ascii=False, indent=2),
encoding="utf-8",
)
print(json.dumps(summary, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()