#!/usr/bin/env python3 """计算 JointDetialTraj 类文件的速度 / 加速度 / 跃度峰值,并与当前生效轴限位对比。 输入格式: time joint1 joint2 ... jointN 本脚本采用的规则: 1. 将轨迹按离散时间采样点读取,允许时间轴非等间隔。 2. 自动推断角度单位: - 任一关节绝对值超过 2*pi*1.5,则按 degree 处理 - 否则按 radian 处理 3. 使用后向差分计算导数: v_i = (q_i - q_{i-1}) / dt_i a_i = (v_i - v_{i-1}) / dt_i j_i = (a_i - a_{i-1}) / dt_i 4. 所有导数量统一换算成 rad 基单位,再与当前生效的机器人限值比较。 当前生效限值来源: .robot limit.velocity .robot limit.acceleration * RobotConfig.robot.acc_limit .robot limit.jerk * RobotConfig.robot.jerk_limit """ from __future__ import annotations import argparse import math from dataclasses import dataclass from pathlib import Path from typing import Iterable AUTO_DEG_THRESHOLD = 2.0 * math.pi * 1.5 DEFAULT_VELOCITY_LIMITS = [6.45, 5.41, 7.15, 9.59, 9.51, 17.45] DEFAULT_ACCELERATION_LIMITS = [26.90, 22.54, 29.81, 39.99, 39.63, 72.72] DEFAULT_JERK_LIMITS = [224.22, 187.86, 248.46, 333.30, 330.27, 606.01] DEFAULT_JOINT_NAMES = [f"Joint{index}" for index in range(1, 7)] @dataclass(frozen=True) class JointLimit: name: str velocity: float acceleration: float jerk: float @dataclass(frozen=True) class PeakMetric: joint_name: str axis_index: int window_start: float window_end: float row_number: int metric_native: float metric_rad: float effective_limit_rad: float @property def ratio_vs_limit(self) -> float: return abs(self.metric_rad) / self.effective_limit_rad @dataclass(frozen=True) class EffectiveLimits: joints: list[JointLimit] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Calculate velocity / acceleration / jerk peaks from JointDetialTraj.txt and compare with built-in effective robot limits." ) parser.add_argument("joint_detail", type=Path, help="Path to JointDetialTraj.txt") parser.add_argument( "--limit-csv", type=Path, default=None, help="Optional CSV file with columns: Joint,Velocity,Acceleration,Jerk . If omitted, use built-in 1/1 effective limits.", ) parser.add_argument( "--unit", choices=("auto", "rad", "deg"), default="auto", help="Input joint-angle unit. Default: auto.", ) return parser.parse_args() def resolve_path(path: Path) -> Path: return path if path.is_absolute() else (Path.cwd() / path).resolve() def read_joint_rows(path: Path) -> list[list[float]]: rows: list[list[float]] = [] for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line: continue rows.append([float(part) for part in line.split()]) if len(rows) < 4: raise ValueError(f"{path} must contain at least 4 rows to calculate jerk.") width = len(rows[0]) if width < 3: raise ValueError(f"{path} must contain time + at least 2 joint columns.") for index, row in enumerate(rows, start=1): if len(row) != width: raise ValueError(f"{path} line {index} has inconsistent column count.") return rows def infer_unit(rows: Iterable[list[float]], requested_unit: str) -> str: if requested_unit != "auto": return requested_unit max_abs_joint = max(abs(value) for row in rows for value in row[1:]) return "deg" if max_abs_joint > AUTO_DEG_THRESHOLD else "rad" def read_limit_csv(path: Path) -> list[JointLimit]: rows = [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] if len(rows) < 2: raise ValueError(f"{path} must contain a header and at least one data row.") header = [part.strip().lower() for part in rows[0].split(",")] expected = ["joint", "velocity", "acceleration", "jerk"] if header != expected: raise ValueError(f"{path} header must be: Joint,Velocity,Acceleration,Jerk") limits: list[JointLimit] = [] for row_index, row in enumerate(rows[1:], start=2): parts = [part.strip() for part in row.split(",")] if len(parts) != 4: raise ValueError(f"{path} line {row_index} must contain 4 columns.") limits.append( JointLimit( name=parts[0], velocity=float(parts[1]), acceleration=float(parts[2]), jerk=float(parts[3]), ) ) return limits def load_effective_limits(limit_csv_path: Path | None) -> EffectiveLimits: if limit_csv_path is not None: limits = read_limit_csv(resolve_path(limit_csv_path)) else: limits = [ JointLimit( name=name, velocity=velocity, acceleration=acceleration, jerk=jerk, ) for name, velocity, acceleration, jerk in zip( DEFAULT_JOINT_NAMES, DEFAULT_VELOCITY_LIMITS, DEFAULT_ACCELERATION_LIMITS, DEFAULT_JERK_LIMITS, strict=True, ) ] return EffectiveLimits(joints=limits) def to_radians(value: float, unit: str) -> float: return math.radians(value) if unit == "deg" else value def to_native_from_rad(value: float, unit: str) -> float: return math.degrees(value) if unit == "deg" else value def calculate_velocity_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]: joint_count = len(rows[0]) - 1 if joint_count != len(limits): raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).") peaks: list[PeakMetric | None] = [None] * joint_count for row_index in range(1, len(rows)): previous = rows[row_index - 1] current = rows[row_index] dt = current[0] - previous[0] if dt <= 0.0: raise ValueError(f"Non-positive dt at line {row_index + 1}: {dt}") for joint_index in range(joint_count): dq_native = current[joint_index + 1] - previous[joint_index + 1] dq_rad = to_radians(dq_native, unit) velocity_rad = dq_rad / dt velocity_native = to_native_from_rad(velocity_rad, unit) candidate = PeakMetric( joint_name=limits[joint_index].name, axis_index=joint_index + 1, window_start=previous[0], window_end=current[0], row_number=row_index + 1, metric_native=velocity_native, metric_rad=velocity_rad, effective_limit_rad=limits[joint_index].velocity, ) current_peak = peaks[joint_index] if current_peak is None or abs(candidate.metric_rad) > abs(current_peak.metric_rad): peaks[joint_index] = candidate return [peak for peak in peaks if peak is not None] def calculate_acceleration_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]: joint_count = len(rows[0]) - 1 if joint_count != len(limits): raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).") velocities_rad: list[list[float]] = [] velocity_windows: list[tuple[float, float, int]] = [] for row_index in range(1, len(rows)): previous = rows[row_index - 1] current = rows[row_index] dt = current[0] - previous[0] if dt <= 0.0: raise ValueError(f"Non-positive dt at line {row_index + 1}: {dt}") velocity_row = [] for joint_index in range(joint_count): dq_native = current[joint_index + 1] - previous[joint_index + 1] dq_rad = to_radians(dq_native, unit) velocity_row.append(dq_rad / dt) velocities_rad.append(velocity_row) velocity_windows.append((previous[0], current[0], row_index + 1)) peaks: list[PeakMetric | None] = [None] * joint_count for velocity_index in range(1, len(velocities_rad)): dt = velocity_windows[velocity_index][1] - velocity_windows[velocity_index][0] for joint_index in range(joint_count): acceleration_rad = (velocities_rad[velocity_index][joint_index] - velocities_rad[velocity_index - 1][joint_index]) / dt acceleration_native = to_native_from_rad(acceleration_rad, unit) candidate = PeakMetric( joint_name=limits[joint_index].name, axis_index=joint_index + 1, window_start=velocity_windows[velocity_index][0], window_end=velocity_windows[velocity_index][1], row_number=velocity_windows[velocity_index][2], metric_native=acceleration_native, metric_rad=acceleration_rad, effective_limit_rad=limits[joint_index].acceleration, ) current_peak = peaks[joint_index] if current_peak is None or abs(candidate.metric_rad) > abs(current_peak.metric_rad): peaks[joint_index] = candidate return [peak for peak in peaks if peak is not None] def calculate_jerk_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]: joint_count = len(rows[0]) - 1 if joint_count != len(limits): raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).") velocities_rad: list[list[float]] = [] velocity_windows: list[tuple[float, float, int]] = [] for row_index in range(1, len(rows)): previous = rows[row_index - 1] current = rows[row_index] dt = current[0] - previous[0] if dt <= 0.0: raise ValueError(f"Non-positive dt at line {row_index + 1}: {dt}") velocity_row = [] for joint_index in range(joint_count): dq_native = current[joint_index + 1] - previous[joint_index + 1] dq_rad = to_radians(dq_native, unit) velocity_row.append(dq_rad / dt) velocities_rad.append(velocity_row) velocity_windows.append((previous[0], current[0], row_index + 1)) accelerations_rad: list[list[float]] = [] acceleration_windows: list[tuple[float, float, int]] = [] for velocity_index in range(1, len(velocities_rad)): dt = velocity_windows[velocity_index][1] - velocity_windows[velocity_index][0] acceleration_row = [] for joint_index in range(joint_count): acceleration_row.append((velocities_rad[velocity_index][joint_index] - velocities_rad[velocity_index - 1][joint_index]) / dt) accelerations_rad.append(acceleration_row) acceleration_windows.append((velocity_windows[velocity_index][0], velocity_windows[velocity_index][1], velocity_windows[velocity_index][2])) peaks: list[PeakMetric | None] = [None] * joint_count for acceleration_index in range(1, len(accelerations_rad)): dt = acceleration_windows[acceleration_index][1] - acceleration_windows[acceleration_index][0] for joint_index in range(joint_count): jerk_rad = (accelerations_rad[acceleration_index][joint_index] - accelerations_rad[acceleration_index - 1][joint_index]) / dt jerk_native = to_native_from_rad(jerk_rad, unit) candidate = PeakMetric( joint_name=limits[joint_index].name, axis_index=joint_index + 1, window_start=acceleration_windows[acceleration_index][0], window_end=acceleration_windows[acceleration_index][1], row_number=acceleration_windows[acceleration_index][2], metric_native=jerk_native, metric_rad=jerk_rad, effective_limit_rad=limits[joint_index].jerk, ) current_peak = peaks[joint_index] if current_peak is None or abs(candidate.metric_rad) > abs(current_peak.metric_rad): peaks[joint_index] = candidate return [peak for peak in peaks if peak is not None] def format_table(peaks: list[PeakMetric], native_unit: str, rad_unit: str, limit_header: str) -> str: lines = [ f"{'Joint':<8} {'Window(s)':<20} {'Line':>6} {'Peak(' + native_unit + ')':>18} {'Peak(' + rad_unit + ')':>18} {limit_header:>20} {'Ratio':>10}", "-" * 108, ] for peak in peaks: lines.append( f"{peak.joint_name:<8} " f"{peak.window_start:>7.6f}->{peak.window_end:<10.6f} " f"{peak.row_number:>6} " f"{peak.metric_native:>18.6f} " f"{peak.metric_rad:>18.6f} " f"{peak.effective_limit_rad:>20.6f} " f"{peak.ratio_vs_limit:>10.4f}" ) return "\n".join(lines) def print_metric_section(title: str, peaks: list[PeakMetric], unit: str, native_suffix: str, rad_suffix: str, limit_header: str) -> None: print(build_metric_section(title, peaks, unit, native_suffix, rad_suffix, limit_header)) def build_metric_section(title: str, peaks: list[PeakMetric], unit: str, native_suffix: str, rad_suffix: str, limit_header: str) -> str: native_unit = f"deg/{native_suffix}" if unit == "deg" else f"rad/{native_suffix}" rad_unit = f"rad/{rad_suffix}" lines = [title, format_table(peaks, native_unit, rad_unit, limit_header)] worst = max(peaks, key=lambda item: item.ratio_vs_limit) metric_key = title.lower().replace(" ", "_") lines.append( f"worst_{metric_key}=" f"{worst.joint_name}, window={worst.window_start:.6f}->{worst.window_end:.6f}, " f"peak_rad={worst.metric_rad:.6f}, limit_rad={worst.effective_limit_rad:.6f}, " f"ratio={worst.ratio_vs_limit:.4f}" ) return "\n".join(lines) def build_report_text( joint_detail_path: Path, rows: list[list[float]], unit: str, max_abs_joint: float, limit_source_text: str, velocity_peaks: list[PeakMetric], acceleration_peaks: list[PeakMetric], jerk_peaks: list[PeakMetric], ) -> str: lines = [ f"joint_detail={joint_detail_path}", limit_source_text, f"row_count={len(rows)}", f"joint_count={len(rows[0]) - 1}", f"inferred_unit={unit}", f"max_abs_joint_value={max_abs_joint:.6f}", "", build_metric_section("Velocity Peaks", velocity_peaks, unit, "s", "s", "VelLimit(rad/s)"), "", build_metric_section("Acceleration Peaks", acceleration_peaks, unit, "s^2", "s^2", "AccLimit(rad/s^2)"), "", build_metric_section("Jerk Peaks", jerk_peaks, unit, "s^3", "s^3", "JerkLimit(rad/s^3)"), "", ] return "\n".join(lines) def main() -> int: args = parse_args() joint_detail_path = resolve_path(args.joint_detail) rows = read_joint_rows(joint_detail_path) unit = infer_unit(rows, args.unit) limits_info = load_effective_limits(args.limit_csv) velocity_peaks = calculate_velocity_peaks(rows, unit, limits_info.joints) acceleration_peaks = calculate_acceleration_peaks(rows, unit, limits_info.joints) jerk_peaks = calculate_jerk_peaks(rows, unit, limits_info.joints) max_abs_joint = max(abs(value) for row in rows for value in row[1:]) if args.limit_csv is None: limit_source_text = "limit_source=built-in fixed effective limits (acc_limit=1, jerk_limit=1)" else: limit_source_text = f"limit_source_csv={resolve_path(args.limit_csv)}" report_text = build_report_text( joint_detail_path=joint_detail_path, rows=rows, unit=unit, max_abs_joint=max_abs_joint, limit_source_text=limit_source_text, velocity_peaks=velocity_peaks, acceleration_peaks=acceleration_peaks, jerk_peaks=jerk_peaks, ) print(report_text, end="") output_path = joint_detail_path.with_suffix(".analysis.txt") try: output_path.write_text(report_text, encoding="utf-8") print(f"saved_report={output_path}") except PermissionError as error: print(f"save_report_failed={output_path}") print(f"save_report_error={error}") return 0 if __name__ == "__main__": raise SystemExit(main())