- 改为高优先级 J519 接收线程与复用缓冲区发送链路 - 增加稠密执行前的 J519 就绪重试与状态诊断 - 修正程序状态响应字段顺序与 EnableRobot 默认参数 - 为飞拍轨迹补充平滑起停时间轴与首尾整形验证 - 补充真实运行配置、报警窗口与边界对比测试 - 同步更新限值文档、分析脚本与 .NET 8 SDK 固定配置
462 lines
17 KiB
Python
462 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""计算 JointDetialTraj 类文件的速度 / 加速度 / 跃度峰值,并与当前生效轴限位对比。
|
|
|
|
输入格式:
|
|
time joint1 joint2 ... jointN
|
|
|
|
本脚本采用的规则:
|
|
1. 将轨迹按离散时间采样点读取,允许时间轴非等间隔。
|
|
2. 自动推断角度单位:
|
|
- 任一关节绝对值超过 2*pi*1.5,则按 degree 处理
|
|
- 否则按 radian 处理
|
|
3. 使用后向差分计算导数:
|
|
v_i = (q_i - q_{i-1}) / dt_i
|
|
a_i = (v_i - v_{i-1}) / dt_i
|
|
j_i = (a_i - a_{i-1}) / dt_i
|
|
4. 所有导数量统一换算成 rad 基单位,再与当前生效的机器人限值比较。
|
|
|
|
当前生效限值来源:
|
|
.robot limit.velocity
|
|
.robot limit.acceleration * RobotConfig.robot.acc_limit
|
|
.robot limit.jerk * RobotConfig.robot.jerk_limit
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import math
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
# Joint values whose magnitude exceeds this threshold make "auto" unit
# detection classify the whole file as degrees.
AUTO_DEG_THRESHOLD = 2.0 * math.pi * 1.5

# Built-in per-joint limits used when no --limit-csv is given. The reports
# label these columns as rad/s, rad/s^2 and rad/s^3 respectively.
DEFAULT_VELOCITY_LIMITS = [
    6.45,
    5.41,
    7.15,
    9.59,
    9.51,
    17.45,
]
DEFAULT_ACCELERATION_LIMITS = [
    26.90,
    22.54,
    29.81,
    39.99,
    39.63,
    72.72,
]
DEFAULT_JERK_LIMITS = [
    224.22,
    187.86,
    248.46,
    333.30,
    330.27,
    606.01,
]

# Display names that pair one-to-one with the default limit lists above.
DEFAULT_JOINT_NAMES = ["Joint1", "Joint2", "Joint3", "Joint4", "Joint5", "Joint6"]
|
|
|
|
|
|
@dataclass(frozen=True)
class JointLimit:
    """Immutable set of kinematic limits for one joint.

    The limits are the denominators used when a measured peak is turned
    into a peak/limit ratio.
    """

    # Label shown for this joint in the report tables.
    name: str
    # Maximum allowed velocity magnitude.
    velocity: float
    # Maximum allowed acceleration magnitude.
    acceleration: float
    # Maximum allowed jerk magnitude.
    jerk: float
|
|
|
|
|
|
@dataclass(frozen=True)
class PeakMetric:
    """Largest-magnitude derivative sample found for one joint."""

    # Display name of the joint, copied from its limit entry.
    joint_name: str
    # 1-based joint index.
    axis_index: int
    # Timestamps bounding the sampling interval that produced the peak.
    window_start: float
    window_end: float
    # 1-based index of the data row that closes the window.
    row_number: int
    # Peak value expressed in the unit of the input file.
    metric_native: float
    # Peak value expressed in radian base units.
    metric_rad: float
    # Limit the peak is compared against.
    effective_limit_rad: float

    @property
    def ratio_vs_limit(self) -> float:
        """Return the peak magnitude divided by the effective limit."""
        magnitude = abs(self.metric_rad)
        return magnitude / self.effective_limit_rad
|
|
|
|
|
|
@dataclass(frozen=True)
class EffectiveLimits:
    """Container for the per-joint limits that apply to one analysis run."""

    # One entry per analysed joint, in joint-column order.
    joints: list[JointLimit]
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line of the analysis script.

    Returns:
        Namespace with ``joint_detail`` (Path), ``limit_csv`` (Path or None)
        and ``unit`` (one of "auto", "rad", "deg").
    """
    description = (
        "Calculate velocity / acceleration / jerk peaks from JointDetialTraj.txt "
        "and compare with built-in effective robot limits."
    )
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("joint_detail", type=Path, help="Path to JointDetialTraj.txt")

    # Optional flags are declared as data so each one reads as a single record.
    optional_arguments = (
        (
            "--limit-csv",
            {
                "type": Path,
                "default": None,
                "help": "Optional CSV file with columns: Joint,Velocity,Acceleration,Jerk . If omitted, use built-in 1/1 effective limits.",
            },
        ),
        (
            "--unit",
            {
                "choices": ("auto", "rad", "deg"),
                "default": "auto",
                "help": "Input joint-angle unit. Default: auto.",
            },
        ),
    )
    for flag, options in optional_arguments:
        parser.add_argument(flag, **options)

    return parser.parse_args()
|
|
|
|
|
|
def resolve_path(path: Path) -> Path:
    """Return ``path`` unchanged if absolute, else resolved against the CWD."""
    if path.is_absolute():
        return path
    anchored = Path.cwd() / path
    return anchored.resolve()
|
|
|
|
|
|
def read_joint_rows(path: Path) -> list[list[float]]:
    """Read a whitespace-separated trajectory file into rows of floats.

    Each non-blank line must hold ``time joint1 joint2 ... jointN``. Blank
    lines are skipped. A leading UTF-8 byte-order mark is tolerated.

    Args:
        path: Trajectory file to read.

    Returns:
        One list of floats per non-blank line, in file order.

    Raises:
        ValueError: If a token is not numeric, fewer than 4 rows are present,
            the first row has fewer than 3 columns, or a row's column count
            differs from the first row's. Line numbers in the messages are
            1-based physical line numbers of the file.
    """
    rows: list[list[float]] = []
    # Physical line number of every kept row, so that error messages stay
    # correct even when blank lines were skipped.
    line_numbers: list[int] = []

    # "utf-8-sig" decodes plain UTF-8 as well and drops a leading BOM, which
    # would otherwise make float() fail on the first token.
    text = path.read_text(encoding="utf-8-sig")
    for line_number, raw_line in enumerate(text.splitlines(), start=1):
        line = raw_line.strip()
        if not line:
            continue
        try:
            rows.append([float(part) for part in line.split()])
        except ValueError as error:
            raise ValueError(f"{path} line {line_number} contains a non-numeric value: {error}") from error
        line_numbers.append(line_number)

    # Jerk is a third-order backward difference and needs four samples.
    if len(rows) < 4:
        raise ValueError(f"{path} must contain at least 4 rows to calculate jerk.")

    width = len(rows[0])
    if width < 3:
        raise ValueError(f"{path} must contain time + at least 2 joint columns.")

    for line_number, row in zip(line_numbers, rows):
        if len(row) != width:
            raise ValueError(f"{path} line {line_number} has inconsistent column count.")

    return rows
|
|
|
|
|
|
def trim_rows_to_limit_count(rows: list[list[float]], limit_count: int) -> tuple[list[list[float]], str | None]:
    """Drop trailing joint columns that have no matching limit entry.

    Args:
        rows: Trajectory rows; column 0 is time, the rest are joints.
        limit_count: Number of joints for which limits are available.

    Returns:
        ``(rows, None)`` when the joint count already matches, otherwise a
        trimmed copy of the rows plus a note describing what was ignored.

    Raises:
        ValueError: If the file has fewer joint columns than limits.
    """
    available = len(rows[0]) - 1

    if available < limit_count:
        raise ValueError(f"Joint column count ({available}) is smaller than robot limit count ({limit_count}).")
    if available == limit_count:
        return rows, None

    # Keep the time column plus the first `limit_count` joint columns.
    keep = limit_count + 1
    surplus = available - limit_count
    note = (
        f"ignored_joint_columns={surplus} "
        f"(using first {limit_count} joints out of {available}; trailing columns treated as external axes/placeholders)"
    )
    return [row[:keep] for row in rows], note
|
|
|
|
|
|
def infer_unit(rows: Iterable[list[float]], requested_unit: str) -> str:
    """Decide which angle unit the joint columns use.

    Args:
        rows: Trajectory rows; column 0 is time and is ignored.
        requested_unit: An explicit unit, or "auto" to detect it.

    Returns:
        ``requested_unit`` unless it is "auto"; in auto mode "deg" when any
        joint magnitude exceeds ``AUTO_DEG_THRESHOLD``, else "rad".
    """
    if requested_unit == "auto":
        largest = max(abs(value) for row in rows for value in row[1:])
        if largest > AUTO_DEG_THRESHOLD:
            return "deg"
        return "rad"
    return requested_unit
|
|
|
|
|
|
def read_limit_csv(path: Path) -> list[JointLimit]:
    """Load per-joint limits from a ``Joint,Velocity,Acceleration,Jerk`` CSV.

    Blank lines are ignored and a leading UTF-8 byte-order mark is
    tolerated. Header names are matched case-insensitively.

    Args:
        path: CSV file to read.

    Returns:
        One ``JointLimit`` per data row, in file order.

    Raises:
        ValueError: If the file lacks a header plus one data row, the header
            is not the expected one, a data row does not have 4 columns, a
            limit is not numeric, or a limit is not a positive finite
            number. Line numbers in the messages are 1-based physical line
            numbers of the file.
    """
    # "utf-8-sig" also decodes plain UTF-8; it strips the BOM that would
    # otherwise be glued to the first header name and fail the header check.
    text = path.read_text(encoding="utf-8-sig")
    numbered_rows = [
        (line_number, line.strip())
        for line_number, line in enumerate(text.splitlines(), start=1)
        if line.strip()
    ]
    if len(numbered_rows) < 2:
        raise ValueError(f"{path} must contain a header and at least one data row.")

    header = [part.strip().lower() for part in numbered_rows[0][1].split(",")]
    expected = ["joint", "velocity", "acceleration", "jerk"]
    if header != expected:
        raise ValueError(f"{path} header must be: Joint,Velocity,Acceleration,Jerk")

    limits: list[JointLimit] = []
    for line_number, row in numbered_rows[1:]:
        parts = [part.strip() for part in row.split(",")]
        if len(parts) != 4:
            raise ValueError(f"{path} line {line_number} must contain 4 columns.")

        try:
            velocity, acceleration, jerk = (float(part) for part in parts[1:])
        except ValueError as error:
            raise ValueError(f"{path} line {line_number} contains a non-numeric limit: {error}") from error

        # Limits are used as divisors for the peak/limit ratio, so zero,
        # negative or non-finite values cannot produce a meaningful report.
        if not all(math.isfinite(value) and value > 0.0 for value in (velocity, acceleration, jerk)):
            raise ValueError(f"{path} line {line_number} limits must be positive finite numbers.")

        limits.append(
            JointLimit(
                name=parts[0],
                velocity=velocity,
                acceleration=acceleration,
                jerk=jerk,
            )
        )
    return limits
|
|
|
|
|
|
def load_effective_limits(limit_csv_path: Path | None) -> EffectiveLimits:
    """Build the limits for this run from a CSV or the built-in defaults.

    Args:
        limit_csv_path: CSV file to load, or None to use the built-in
            default limit tables.

    Returns:
        The selected limits wrapped in an ``EffectiveLimits``.
    """
    if limit_csv_path is None:
        # strict=True makes a length mismatch between the default tables an
        # immediate error instead of silently dropping joints.
        default_columns = zip(
            DEFAULT_JOINT_NAMES,
            DEFAULT_VELOCITY_LIMITS,
            DEFAULT_ACCELERATION_LIMITS,
            DEFAULT_JERK_LIMITS,
            strict=True,
        )
        joints = [
            JointLimit(name=name, velocity=velocity, acceleration=acceleration, jerk=jerk)
            for name, velocity, acceleration, jerk in default_columns
        ]
    else:
        joints = read_limit_csv(resolve_path(limit_csv_path))

    return EffectiveLimits(joints=joints)
|
|
|
|
|
|
def to_radians(value: float, unit: str) -> float:
    """Convert ``value`` to radians when ``unit`` is "deg"; otherwise pass it through."""
    if unit == "deg":
        return math.radians(value)
    return value
|
|
|
|
|
|
def to_native_from_rad(value: float, unit: str) -> float:
    """Convert a radian ``value`` to degrees when ``unit`` is "deg"; otherwise pass it through."""
    if unit == "deg":
        return math.degrees(value)
    return value
|
|
|
|
|
|
def calculate_velocity_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]:
    """Find each joint's largest-magnitude backward-difference velocity.

    Args:
        rows: Trajectory rows; column 0 is time, the rest are joint values.
        unit: Unit of the joint columns ("deg" values are converted to radians).
        limits: One limit entry per joint column.

    Returns:
        One ``PeakMetric`` per joint. On equal magnitudes the earliest
        sample is kept. Empty when ``rows`` has a single row.

    Raises:
        ValueError: If the joint column count differs from ``len(limits)``
            or a time step is not strictly positive.
    """
    joint_count = len(rows[0]) - 1
    if joint_count != len(limits):
        raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).")

    best: list[PeakMetric | None] = [None] * joint_count

    # Walk consecutive row pairs; row_number is the 1-based index of the
    # later row of each pair.
    for row_number, (earlier, later) in enumerate(zip(rows, rows[1:]), start=2):
        dt = later[0] - earlier[0]
        if dt <= 0.0:
            raise ValueError(f"Non-positive dt at line {row_number}: {dt}")

        for axis, limit in enumerate(limits):
            column = axis + 1
            velocity_rad = to_radians(later[column] - earlier[column], unit) / dt

            incumbent = best[axis]
            if incumbent is None or abs(velocity_rad) > abs(incumbent.metric_rad):
                best[axis] = PeakMetric(
                    joint_name=limit.name,
                    axis_index=column,
                    window_start=earlier[0],
                    window_end=later[0],
                    row_number=row_number,
                    metric_native=to_native_from_rad(velocity_rad, unit),
                    metric_rad=velocity_rad,
                    effective_limit_rad=limit.velocity,
                )

    return [peak for peak in best if peak is not None]
|
|
|
|
|
|
def calculate_acceleration_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]:
    """Find each joint's largest-magnitude backward-difference acceleration.

    Velocities are first computed per sampling interval; each acceleration
    is the change between two consecutive velocities divided by the
    duration of the later interval.

    Args:
        rows: Trajectory rows; column 0 is time, the rest are joint values.
        unit: Unit of the joint columns ("deg" values are converted to radians).
        limits: One limit entry per joint column.

    Returns:
        One ``PeakMetric`` per joint. On equal magnitudes the earliest
        sample is kept. Empty when fewer than three rows are given.

    Raises:
        ValueError: If the joint column count differs from ``len(limits)``
            or a time step is not strictly positive.
    """
    joint_count = len(rows[0]) - 1
    if joint_count != len(limits):
        raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).")

    # One record per sampling interval: (start, end, closing row number, velocities in rad).
    intervals: list[tuple[float, float, int, list[float]]] = []
    for row_number, (earlier, later) in enumerate(zip(rows, rows[1:]), start=2):
        dt = later[0] - earlier[0]
        if dt <= 0.0:
            raise ValueError(f"Non-positive dt at line {row_number}: {dt}")

        speeds = [
            to_radians(later[column] - earlier[column], unit) / dt
            for column in range(1, joint_count + 1)
        ]
        intervals.append((earlier[0], later[0], row_number, speeds))

    best: list[PeakMetric | None] = [None] * joint_count

    for (_, _, _, prior_speeds), (start, end, row_number, speeds) in zip(intervals, intervals[1:]):
        # The later interval's duration is the divisor.
        dt = end - start
        for axis, limit in enumerate(limits):
            acceleration_rad = (speeds[axis] - prior_speeds[axis]) / dt

            incumbent = best[axis]
            if incumbent is None or abs(acceleration_rad) > abs(incumbent.metric_rad):
                best[axis] = PeakMetric(
                    joint_name=limit.name,
                    axis_index=axis + 1,
                    window_start=start,
                    window_end=end,
                    row_number=row_number,
                    metric_native=to_native_from_rad(acceleration_rad, unit),
                    metric_rad=acceleration_rad,
                    effective_limit_rad=limit.acceleration,
                )

    return [peak for peak in best if peak is not None]
|
|
|
|
|
|
def calculate_jerk_peaks(rows: list[list[float]], unit: str, limits: list[JointLimit]) -> list[PeakMetric]:
    """Find each joint's largest-magnitude backward-difference jerk.

    Velocities are computed per sampling interval, then differenced into
    accelerations, then differenced again into jerk. Every differencing
    step divides by the duration of the later interval.

    Args:
        rows: Trajectory rows; column 0 is time, the rest are joint values.
        unit: Unit of the joint columns ("deg" values are converted to radians).
        limits: One limit entry per joint column.

    Returns:
        One ``PeakMetric`` per joint. On equal magnitudes the earliest
        sample is kept. Empty when fewer than four rows are given.

    Raises:
        ValueError: If the joint column count differs from ``len(limits)``
            or a time step is not strictly positive.
    """
    joint_count = len(rows[0]) - 1
    if joint_count != len(limits):
        raise ValueError(f"Joint column count ({joint_count}) does not match robot limit count ({len(limits)}).")

    def differentiate(
        samples: list[tuple[float, float, int, list[float]]],
    ) -> list[tuple[float, float, int, list[float]]]:
        # Difference consecutive samples; each result keeps the window and
        # row number of the later sample and divides by that window's length.
        derived = []
        for (_, _, _, before), (start, end, row_number, after) in zip(samples, samples[1:]):
            dt = end - start
            derived.append(
                (start, end, row_number, [(after[axis] - before[axis]) / dt for axis in range(joint_count)])
            )
        return derived

    # One record per sampling interval: (start, end, closing row number, velocities in rad).
    velocity_samples: list[tuple[float, float, int, list[float]]] = []
    for row_number, (earlier, later) in enumerate(zip(rows, rows[1:]), start=2):
        dt = later[0] - earlier[0]
        if dt <= 0.0:
            raise ValueError(f"Non-positive dt at line {row_number}: {dt}")

        speeds = [
            to_radians(later[column] - earlier[column], unit) / dt
            for column in range(1, joint_count + 1)
        ]
        velocity_samples.append((earlier[0], later[0], row_number, speeds))

    acceleration_samples = differentiate(velocity_samples)
    jerk_samples = differentiate(acceleration_samples)

    best: list[PeakMetric | None] = [None] * joint_count

    for start, end, row_number, jerks in jerk_samples:
        for axis, limit in enumerate(limits):
            jerk_rad = jerks[axis]

            incumbent = best[axis]
            if incumbent is None or abs(jerk_rad) > abs(incumbent.metric_rad):
                best[axis] = PeakMetric(
                    joint_name=limit.name,
                    axis_index=axis + 1,
                    window_start=start,
                    window_end=end,
                    row_number=row_number,
                    metric_native=to_native_from_rad(jerk_rad, unit),
                    metric_rad=jerk_rad,
                    effective_limit_rad=limit.jerk,
                )

    return [peak for peak in best if peak is not None]
|
|
|
|
|
|
def format_table(peaks: list[PeakMetric], native_unit: str, rad_unit: str, limit_header: str) -> str:
    """Render peak metrics as a fixed-width text table.

    Args:
        peaks: Rows of the table, one per joint.
        native_unit: Unit label for the native-unit peak column.
        rad_unit: Unit label for the radian peak column.
        limit_header: Column title for the limit column.

    Returns:
        Header line, a 108-character dash rule, then one line per peak,
        joined with newlines (no trailing newline).
    """
    native_title = "Peak(" + native_unit + ")"
    rad_title = "Peak(" + rad_unit + ")"

    # Every cell is padded to its column width; cells are separated by one space.
    header_cells = (
        f"{'Joint':<8}",
        f"{'Window(s)':<20}",
        f"{'Line':>6}",
        f"{native_title:>18}",
        f"{rad_title:>18}",
        f"{limit_header:>20}",
        f"{'Ratio':>10}",
    )
    table = [" ".join(header_cells), "-" * 108]

    table.extend(
        " ".join(
            (
                f"{peak.joint_name:<8}",
                f"{peak.window_start:>7.6f}->{peak.window_end:<10.6f}",
                f"{peak.row_number:>6}",
                f"{peak.metric_native:>18.6f}",
                f"{peak.metric_rad:>18.6f}",
                f"{peak.effective_limit_rad:>20.6f}",
                f"{peak.ratio_vs_limit:>10.4f}",
            )
        )
        for peak in peaks
    )
    return "\n".join(table)
|
|
|
|
|
|
def print_metric_section(title: str, peaks: list[PeakMetric], unit: str, native_suffix: str, rad_suffix: str, limit_header: str) -> None:
    """Print the section text produced by ``build_metric_section`` to stdout."""
    section_text = build_metric_section(
        title,
        peaks,
        unit,
        native_suffix,
        rad_suffix,
        limit_header,
    )
    print(section_text)
|
|
|
|
|
|
def build_metric_section(title: str, peaks: list[PeakMetric], unit: str, native_suffix: str, rad_suffix: str, limit_header: str) -> str:
    """Build one report section: title, peak table and a worst-joint summary.

    Args:
        title: Section heading; also used, lowercased with spaces replaced
            by underscores, as the key of the summary line.
        peaks: Per-joint peaks to tabulate; must not be empty.
        unit: Input unit; selects "deg" or "rad" for the native column label.
        native_suffix: Denominator of the native unit label (e.g. "s^2").
        rad_suffix: Denominator of the radian unit label.
        limit_header: Column title for the limit column.

    Returns:
        The section as newline-joined text without a trailing newline.
    """
    native_prefix = "deg" if unit == "deg" else "rad"
    native_unit = f"{native_prefix}/{native_suffix}"
    rad_unit = f"rad/{rad_suffix}"

    table_text = format_table(peaks, native_unit, rad_unit, limit_header)

    # The joint closest to (or furthest beyond) its limit drives the summary.
    worst = max(peaks, key=lambda item: item.ratio_vs_limit)
    metric_key = "_".join(title.lower().split(" "))
    summary = (
        f"worst_{metric_key}="
        f"{worst.joint_name}, window={worst.window_start:.6f}->{worst.window_end:.6f}, "
        f"peak_rad={worst.metric_rad:.6f}, limit_rad={worst.effective_limit_rad:.6f}, "
        f"ratio={worst.ratio_vs_limit:.4f}"
    )
    return "\n".join((title, table_text, summary))
|
|
|
|
|
|
def build_report_text(
    joint_detail_path: Path,
    rows: list[list[float]],
    unit: str,
    max_abs_joint: float,
    limit_source_text: str,
    trim_note: str | None,
    velocity_peaks: list[PeakMetric],
    acceleration_peaks: list[PeakMetric],
    jerk_peaks: list[PeakMetric],
) -> str:
    """Assemble the complete text report.

    The report starts with ``key=value`` summary lines, optionally followed
    by the trim note, then the velocity, acceleration and jerk sections,
    each preceded by a blank line.

    Returns:
        The report text, ending with a newline.
    """
    report = [
        f"joint_detail={joint_detail_path}",
        limit_source_text,
        f"row_count={len(rows)}",
        f"joint_count={len(rows[0]) - 1}",
        f"inferred_unit={unit}",
        f"max_abs_joint_value={max_abs_joint:.6f}",
    ]
    if trim_note is not None:
        report.append(trim_note)

    # (title, peaks, unit denominator, limit column title) for each section.
    sections = (
        ("Velocity Peaks", velocity_peaks, "s", "VelLimit(rad/s)"),
        ("Acceleration Peaks", acceleration_peaks, "s^2", "AccLimit(rad/s^2)"),
        ("Jerk Peaks", jerk_peaks, "s^3", "JerkLimit(rad/s^3)"),
    )
    for title, peaks, suffix, limit_header in sections:
        report.append("")
        report.append(build_metric_section(title, peaks, unit, suffix, suffix, limit_header))

    # A final empty entry makes the joined text end with a newline.
    report.append("")
    return "\n".join(report)
|
|
|
|
|
|
def main() -> int:
    """Run the analysis: read the trajectory, compute peaks, print and save the report.

    The report is printed to stdout and then written next to the input file
    with the suffix ``.analysis.txt``. Saving is best-effort: if the file
    cannot be written, the failure is printed and the run still succeeds.

    Returns:
        Process exit code, always 0 when no exception escapes.
    """
    args = parse_args()
    joint_detail_path = resolve_path(args.joint_detail)
    rows = read_joint_rows(joint_detail_path)
    limits_info = load_effective_limits(args.limit_csv)
    rows, trim_note = trim_rows_to_limit_count(rows, len(limits_info.joints))
    unit = infer_unit(rows, args.unit)

    velocity_peaks = calculate_velocity_peaks(rows, unit, limits_info.joints)
    acceleration_peaks = calculate_acceleration_peaks(rows, unit, limits_info.joints)
    jerk_peaks = calculate_jerk_peaks(rows, unit, limits_info.joints)

    # Computed on the trimmed rows, i.e. over the analysed joints only.
    max_abs_joint = max(abs(value) for row in rows for value in row[1:])

    if args.limit_csv is None:
        limit_source_text = "limit_source=built-in fixed effective limits (acc_limit=1, jerk_limit=1)"
    else:
        limit_source_text = f"limit_source_csv={resolve_path(args.limit_csv)}"

    report_text = build_report_text(
        joint_detail_path=joint_detail_path,
        rows=rows,
        unit=unit,
        max_abs_joint=max_abs_joint,
        limit_source_text=limit_source_text,
        trim_note=trim_note,
        velocity_peaks=velocity_peaks,
        acceleration_peaks=acceleration_peaks,
        jerk_peaks=jerk_peaks,
    )

    # The report text already ends with a newline.
    print(report_text, end="")

    output_path = joint_detail_path.with_suffix(".analysis.txt")
    try:
        output_path.write_text(report_text, encoding="utf-8")
    except OSError as error:
        # OSError covers PermissionError as well as other write failures
        # (read-only filesystem, disk full, ...). The report has already
        # been printed, so a failed save must not abort the run.
        print(f"save_report_failed={output_path}")
        print(f"save_report_error={error}")
    else:
        print(f"saved_report={output_path}")

    return 0
|
|
|
|
|
|
# Run the analysis only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|