STM_ATEM/Scripts/ATEMParse.py
zhoujie 4f8feccc06 feat(filesystem): 增强SD卡文件系统挂载与格式化逻辑
- 添加工作缓冲区以支持带参数的格式化操作
- 重构挂载逻辑,在挂载成功后验证文件系统类型和簇大小是否符合目标格式(FAT32,簇大小32KB)
- 将格式化条件判断与执行分离,提高代码可读性
- 使用宏定义目标簇大小和扇区大小参数,便于维护

🔧 chore(scripts): 重命名脚本文件并添加压缩包

- 将`atem_parse.py`重命名为`ATEMParse.py`以遵循命名规范
- 添加`ATEMParse.7z`压缩包文件
2026-03-07 13:54:52 +08:00

686 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import time
import numpy as np
import pandas as pd
import serial
import serial.tools.list_ports
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QPushButton, QFileDialog, QTableView,
QLabel, QRadioButton, QButtonGroup, QTabWidget,
QMessageBox, QHeaderView, QCheckBox, QComboBox, QSplitter)
from PyQt6.QtCore import Qt, QAbstractTableModel, QThread, pyqtSignal, QTimer
import pyqtgraph as pg
# ==========================================
# 1. 数据结构定义 (移除了 timestamp)
# ==========================================
RAW_DTYPE_V1 = np.dtype([
('start_byte', '<u4'),
('adc1', '<i4'), ('adc2', '<i4'), ('adc3', '<i4'),
('checksum', '<u2'), ('end_byte', '<u2')
])
CORRECTED_DTYPE_V1 = np.dtype([
('start_byte', '<u4'),
('corr_x', '<f4'), ('corr_y', '<f4'), ('corr_z', '<f4'),
('checksum', '<u2'), ('end_byte', '<u2')
])
# ⚠️ 注意: 移除了 timestamp保留了 gps_altitude
RAW_DTYPE_V2 = np.dtype([
('start_byte', '<u4'),
('adc1', '<i4'), ('adc2', '<i4'), ('adc3', '<i4'),
('gps_time', '<u4'), ('gps_latitude', '<f4'), ('gps_longitude', '<f4'),
('gps_altitude', '<f4')
])
CORRECTED_DTYPE_V2 = np.dtype([
('start_byte', '<u4'),
('corr_x', '<f4'), ('corr_y', '<f4'), ('corr_z', '<f4'),
('gps_time', '<u4'), ('gps_latitude', '<f4'), ('gps_longitude', '<f4'),
('gps_altitude', '<f4')
])
# ==========================================
# 2. 高性能表格模型
# ==========================================
class BigDataModel(QAbstractTableModel):
def __init__(self, data):
super(BigDataModel, self).__init__()
self._data = data
def rowCount(self, parent=None):
return self._data.shape[0]
def columnCount(self, parent=None):
return self._data.shape[1]
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if index.isValid() and role == Qt.ItemDataRole.DisplayRole:
val = self._data.iloc[index.row(), index.column()]
if isinstance(val, (float, np.floating)):
return f"{val:.6f}"
return str(val)
return None
def headerData(self, col, orientation, role):
if orientation == Qt.Orientation.Horizontal and role == Qt.ItemDataRole.DisplayRole:
return self._data.columns[col]
return None
# ==========================================
# 3. 后台线程:串口数据接收 (带异常安全关闭)
# ==========================================
class SerialReaderThread(QThread):
data_received = pyqtSignal(np.ndarray)
error_occurred = pyqtSignal(str)
def __init__(self, port, baudrate, dtype):
super().__init__()
self.port = port
self.baudrate = baudrate
self.dtype = dtype
self.is_running = False
self.serial_port = None
def run(self):
self.is_running = True
buffer = bytearray()
packet_size = self.dtype.itemsize
start_marker = b'\xff\xff\xff\xff'
try:
self.serial_port = serial.Serial(self.port, self.baudrate, timeout=1)
while self.is_running:
if self.serial_port and self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
buffer.extend(data)
packets = []
while True:
idx = buffer.find(start_marker)
if idx == -1:
buffer = buffer[-3:] if len(buffer) >= 3 else buffer
break
if len(buffer) >= idx + packet_size:
packet_bytes = buffer[idx : idx + packet_size]
packets.append(packet_bytes)
buffer = buffer[idx + packet_size :]
else:
buffer = buffer[idx:]
break
if packets:
combined_bytes = b''.join(packets)
parsed_arr = np.frombuffer(combined_bytes, dtype=self.dtype)
self.data_received.emit(parsed_arr)
except Exception as e:
if self.is_running:
self.error_occurred.emit(str(e))
finally:
self.stop()
def stop(self):
self.is_running = False
if self.serial_port:
try:
self.serial_port.cancel_read()
except Exception:
pass
try:
if self.serial_port.is_open:
self.serial_port.close()
except Exception:
pass
finally:
self.serial_port = None
# ==========================================
# 4. 后台线程GPS 动态模拟输出 (10Hz)
# ==========================================
class GPSSimulatorThread(QThread):
error_occurred = pyqtSignal(str)
def __init__(self, port, baudrate):
super().__init__()
self.port = port
self.baudrate = baudrate
self.is_running = False
self.serial_port = None
def get_nmea_checksum(self, sentence):
calc_cksum = 0
for char in sentence:
calc_cksum ^= ord(char)
return f"{calc_cksum:02X}"
def run(self):
self.is_running = True
try:
self.serial_port = serial.Serial(self.port, self.baudrate, timeout=1)
current_lat = 39.9042
current_lon = 116.3972
current_alt = 43.0
lat_step = 0.00001
lon_step = 0.00001
alt_step = 0.05
while self.is_running:
start_time = time.perf_counter()
now = time.time()
gm_time = time.gmtime(now)
ms = int((now % 1) * 1000)
time_str = time.strftime("%H%M%S", gm_time) + f".{ms:03d}"
date_str = time.strftime("%d%m%y", gm_time)
current_lat += lat_step
current_lon += lon_step
current_alt += alt_step
lat_deg = int(abs(current_lat))
lat_min = (abs(current_lat) - lat_deg) * 60
lat_str = f"{lat_deg:02d}{lat_min:07.4f}"
lat_dir = "N" if current_lat >= 0 else "S"
lon_deg = int(abs(current_lon))
lon_min = (abs(current_lon) - lon_deg) * 60
lon_str = f"{lon_deg:03d}{lon_min:07.4f}"
lon_dir = "E" if current_lon >= 0 else "W"
speed_knots = "19.4"
course_true = "45.0"
gga_core = f"GPGGA,{time_str},{lat_str},{lat_dir},{lon_str},{lon_dir},1,08,1.0,{current_alt:.1f},M,0.0,M,,"
rmc_core = f"GPRMC,{time_str},A,{lat_str},{lat_dir},{lon_str},{lon_dir},{speed_knots},{course_true},{date_str},0.0,E"
gga_sentence = f"${gga_core}*{self.get_nmea_checksum(gga_core)}\r\n"
rmc_sentence = f"${rmc_core}*{self.get_nmea_checksum(rmc_core)}\r\n"
if self.serial_port and self.serial_port.is_open:
self.serial_port.write(gga_sentence.encode('ascii'))
self.serial_port.write(rmc_sentence.encode('ascii'))
elapsed = time.perf_counter() - start_time
sleep_time = 0.1 - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
if self.is_running:
self.error_occurred.emit(str(e))
finally:
self.stop()
def stop(self):
self.is_running = False
if self.serial_port:
try:
self.serial_port.cancel_write()
except Exception:
pass
try:
if self.serial_port.is_open:
self.serial_port.close()
except Exception:
pass
finally:
self.serial_port = None
# ==========================================
# 5. 主窗口
# ==========================================
class DataAnalyzerUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("🚀 高性能数据分析工具 (含经纬度地图与海拔曲线)")
self.resize(1300, 900)
self.df = pd.DataFrame()
self.live_data_list = []
self.is_live_mode = False
self.curves = []
self.last_fps_time = 0
self.frame_count = 0
self.last_packet_count = 0
self.current_packet_count = 0
self.is_sim_running = False
self.sim_thread = None
pg.setConfigOptions(antialias=False)
self.init_ui()
self.plot_timer = QTimer()
self.plot_timer.timeout.connect(self.update_live_plot)
def init_ui(self):
main_widget = QWidget()
self.setCentralWidget(main_widget)
layout = QVBoxLayout(main_widget)
# --- 第一排工具栏 ---
top_bar1 = QHBoxLayout()
self.ver_group = QButtonGroup(self)
self.rb_v2 = QRadioButton("V2 (带GPS与海拔)")
self.rb_v2.setChecked(True)
self.ver_group.addButton(self.rb_v2)
self.mode_group = QButtonGroup(self)
self.rb_raw = QRadioButton("原始数据 (Raw)")
self.rb_corr = QRadioButton("校正数据 (Corr)")
self.rb_raw.setChecked(True)
self.mode_group.addButton(self.rb_raw)
self.mode_group.addButton(self.rb_corr)
btn_load = QPushButton("📂 打开文件")
btn_load.clicked.connect(self.load_file)
btn_export = QPushButton("💾 导出CSV")
btn_export.clicked.connect(self.export_csv)
self.chk_mouse_mode = QCheckBox("🔍 鼠标框选放大")
self.chk_mouse_mode.stateChanged.connect(self.toggle_mouse_mode)
btn_autoscale = QPushButton("⟲ 复位视图")
btn_autoscale.clicked.connect(self.reset_view)
top_bar1.addWidget(QLabel("版本:"))
top_bar1.addWidget(self.rb_v2)
top_bar1.addSpacing(15)
top_bar1.addWidget(QLabel("模式:"))
top_bar1.addWidget(self.rb_raw)
top_bar1.addWidget(self.rb_corr)
top_bar1.addSpacing(15)
top_bar1.addWidget(btn_load)
top_bar1.addWidget(btn_export)
top_bar1.addStretch()
top_bar1.addWidget(self.chk_mouse_mode)
top_bar1.addWidget(btn_autoscale)
# --- 第二排工具栏 ---
top_bar2 = QHBoxLayout()
self.cb_ports = QComboBox()
self.cb_baudrate = QComboBox()
self.cb_baudrate.addItems(["9600", "115200", "230400", "460800", "921600", "2000000"])
self.cb_baudrate.setCurrentText("115200")
btn_refresh_ports = QPushButton("🔄 刷新端口")
btn_refresh_ports.clicked.connect(self.refresh_ports)
self.btn_toggle_serial = QPushButton("▶ 打开接收串口")
self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;")
self.btn_toggle_serial.clicked.connect(self.toggle_serial)
top_bar2.addWidget(QLabel("🔌 接收串口:"))
top_bar2.addWidget(self.cb_ports)
top_bar2.addWidget(btn_refresh_ports)
top_bar2.addWidget(QLabel("波特率:"))
top_bar2.addWidget(self.cb_baudrate)
top_bar2.addWidget(self.btn_toggle_serial)
top_bar2.addStretch()
# --- 第三排工具栏 (GPS 模拟器) ---
top_bar3 = QHBoxLayout()
self.cb_sim_ports = QComboBox()
self.cb_sim_baudrate = QComboBox()
self.cb_sim_baudrate.addItems(["9600", "115200", "230400", "460800", "921600"])
self.cb_sim_baudrate.setCurrentText("115200")
self.btn_toggle_sim = QPushButton("🛰 开启GPS动态模拟 (10Hz)")
self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;")
self.btn_toggle_sim.clicked.connect(self.toggle_gps_sim)
top_bar3.addWidget(QLabel("📡 输出串口:"))
top_bar3.addWidget(self.cb_sim_ports)
top_bar3.addWidget(QLabel("波特率:"))
top_bar3.addWidget(self.cb_sim_baudrate)
top_bar3.addWidget(self.btn_toggle_sim)
top_bar3.addStretch()
layout.addLayout(top_bar1)
layout.addLayout(top_bar2)
layout.addLayout(top_bar3)
self.refresh_ports()
# --- 监控栏 ---
info_layout = QHBoxLayout()
self.lbl_info = QLabel("请加载文件或打开串口接收数据...")
self.lbl_info.setStyleSheet("color: blue; font-weight: bold;")
self.lbl_fps = QLabel("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
self.lbl_fps.setStyleSheet("color: #E91E63; font-weight: bold;")
info_layout.addWidget(self.lbl_info)
info_layout.addStretch()
info_layout.addWidget(self.lbl_fps)
layout.addLayout(info_layout)
# ==========================================
# 多标签内容区域设置
# ==========================================
self.tabs = QTabWidget()
layout.addWidget(self.tabs)
# [Tab 1] 主波形图
self.plot_widget = pg.PlotWidget()
self.plot_widget.setBackground('w')
self.plot_widget.showGrid(x=True, y=True, alpha=0.3)
self.plot_widget.addLegend()
self.plot_widget.setLabel('bottom', 'Data Points (Index)') # 更新了标签
self.plot_widget.setLabel('left', 'Value')
self.vb = self.plot_widget.plotItem.vb
self.tabs.addTab(self.plot_widget, "📈 波形图 (Plot)")
# [Tab 2] GPS 轨迹与海拔视图
traj_container = QWidget()
traj_layout = QVBoxLayout(traj_container)
traj_splitter = QSplitter(Qt.Orientation.Vertical)
self.traj_plot = pg.PlotWidget(title="🗺️ 实时轨迹 (经度 vs 纬度)")
self.traj_plot.setBackground('w')
self.traj_plot.showGrid(x=True, y=True, alpha=0.5)
self.traj_plot.setLabel('bottom', 'Longitude (经度)')
self.traj_plot.setLabel('left', 'Latitude (纬度)')
self.traj_curve = self.traj_plot.plot(pen=pg.mkPen('b', width=2), symbol='o', symbolSize=3, symbolBrush='b')
self.alt_plot = pg.PlotWidget(title="⛰️ 海拔高度 (Altitude)")
self.alt_plot.setBackground('w')
self.alt_plot.showGrid(x=True, y=True, alpha=0.5)
self.alt_plot.setLabel('bottom', 'Data Points (Index)') # 更新了标签
self.alt_plot.setLabel('left', 'Altitude (m)')
self.alt_curve = self.alt_plot.plot(pen=pg.mkPen('g', width=2))
traj_splitter.addWidget(self.traj_plot)
traj_splitter.addWidget(self.alt_plot)
traj_layout.addWidget(traj_splitter)
self.tabs.addTab(traj_container, "🗺️ 轨迹与海拔 (Trajectory)")
# [Tab 3] 数据表格
self.table_view = QTableView()
self.table_view.setAlternatingRowColors(True)
self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
self.tabs.addTab(self.table_view, "🔢 数据表 (Table)")
def get_current_dtype(self):
is_v2 = self.rb_v2.isChecked()
is_raw = self.rb_raw.isChecked()
if is_v2:
return RAW_DTYPE_V2 if is_raw else CORRECTED_DTYPE_V2
return RAW_DTYPE_V1 if is_raw else CORRECTED_DTYPE_V1
def get_column_config(self):
is_v2 = self.rb_v2.isChecked()
is_raw = self.rb_raw.isChecked()
# 这里移除了 timestamp
if is_raw:
base_cols = ['adc1', 'adc2', 'adc3']
labels = ['ADC 1', 'ADC 2', 'ADC 3']
data_cols = ['adc1', 'adc2', 'adc3']
else:
base_cols = ['corr_x', 'corr_y', 'corr_z']
labels = ['X Axis', 'Y Axis', 'Z Axis']
data_cols = ['corr_x', 'corr_y', 'corr_z']
cols = base_cols + ['gps_time', 'gps_latitude', 'gps_longitude', 'gps_altitude'] if is_v2 else base_cols + ['checksum']
return cols, data_cols, labels
def load_file(self):
if self.is_live_mode:
QMessageBox.warning(self, "警告", "请先关闭串口后再加载文件。")
return
file_name, _ = QFileDialog.getOpenFileName(self, "选择文件", "", "Data (*.dat);;All (*)")
if not file_name: return
try:
dtype = self.get_current_dtype()
raw_data = np.fromfile(file_name, dtype=dtype)
if len(raw_data) == 0: return
self.df = pd.DataFrame(raw_data)
cols, data_cols, labels = self.get_column_config()
self.lbl_info.setText(f"文件加载成功 | {len(self.df)} 行 | 版本: {'V2' if self.rb_v2.isChecked() else 'V1'}")
self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
self.refresh_table(cols)
self.plot_static_data(data_cols, labels)
except Exception as e:
QMessageBox.critical(self, "解析错误", str(e))
def refresh_table(self, cols):
display_df = self.df[cols] if not self.df.empty else pd.DataFrame(columns=cols)
self.model = BigDataModel(display_df)
self.table_view.setModel(self.model)
self.table_view.resizeColumnsToContents()
def plot_static_data(self, data_cols, labels):
# 1. 主波形图静态渲染
self.plot_widget.clear()
self.curves.clear()
colors = ['#FF0000', '#00AA00', '#0000FF']
# 移除了 timestamp改为使用数据点索引
x_data = np.arange(len(self.df))
for i, col in enumerate(data_cols):
y_data = self.df[col].values
curve = pg.PlotCurveItem(x=x_data, y=y_data, pen=pg.mkPen(color=colors[i], width=1.5),
name=labels[i], skipFiniteCheck=True, autoDownsample=True, clipToView=True)
self.plot_widget.addItem(curve)
self.plot_widget.setLabel('bottom', 'Data Points (Index)')
self.reset_view()
# 2. 轨迹和海拔静态渲染
if self.rb_v2.isChecked() and 'gps_longitude' in self.df.columns:
lons = self.df['gps_longitude'].values
lats = self.df['gps_latitude'].values
alts = self.df['gps_altitude'].values if 'gps_altitude' in self.df.columns else np.zeros_like(lons)
valid_idx = (lons != 0.0) & (lats != 0.0)
if np.any(valid_idx):
self.traj_curve.setData(lons[valid_idx], lats[valid_idx])
else:
self.traj_curve.setData(lons, lats)
self.alt_curve.setData(alts)
self.traj_plot.autoRange()
self.alt_plot.autoRange()
else:
self.traj_curve.setData([], [])
self.alt_curve.setData([])
def refresh_ports(self):
self.cb_ports.clear()
self.cb_sim_ports.clear()
ports = serial.tools.list_ports.comports()
for p in ports:
port_name = f"{p.device} - {p.description}"
self.cb_ports.addItem(port_name, p.device)
self.cb_sim_ports.addItem(port_name, p.device)
def toggle_serial(self):
if not self.is_live_mode:
port = self.cb_ports.currentData()
if not port:
QMessageBox.warning(self, "提示", "未找到有效串口")
return
baud = int(self.cb_baudrate.currentText())
dtype = self.get_current_dtype()
self.df = pd.DataFrame()
self.live_data_list = []
self.plot_widget.clear()
self.curves.clear()
self.traj_curve.setData([], [])
self.alt_curve.setData([])
self.plot_widget.setLabel('bottom', 'Latest Points (Index)')
colors = ['#FF0000', '#00AA00', '#0000FF']
_, _, labels = self.get_column_config()
for i in range(len(labels)):
curve = pg.PlotCurveItem(pen=pg.mkPen(color=colors[i], width=1.5), name=labels[i])
self.plot_widget.addItem(curve)
self.curves.append(curve)
self.current_packet_count = 0
self.last_packet_count = 0
self.frame_count = 0
self.last_fps_time = time.perf_counter()
self.lbl_fps.setText("📈 绘图帧率: 计算中... | 📥 接收率: 计算中...")
self.serial_thread = SerialReaderThread(port, baud, dtype)
self.serial_thread.data_received.connect(self.on_live_data_received)
self.serial_thread.error_occurred.connect(self.on_serial_error)
self.serial_thread.start()
self.plot_timer.start(50)
self.is_live_mode = True
self.btn_toggle_serial.setText("⏹ 关闭串口停止接收")
self.btn_toggle_serial.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;")
self.rb_v2.setEnabled(False)
self.rb_raw.setEnabled(False)
self.rb_corr.setEnabled(False)
else:
self.serial_thread.stop()
self.serial_thread.wait()
self.plot_timer.stop()
self.is_live_mode = False
self.btn_toggle_serial.setText("▶ 打开接收串口")
self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;")
self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
self.rb_v2.setEnabled(True)
self.rb_raw.setEnabled(True)
self.rb_corr.setEnabled(True)
if self.live_data_list:
full_array = np.concatenate(self.live_data_list)
self.df = pd.DataFrame(full_array)
cols, _, _ = self.get_column_config()
self.refresh_table(cols)
self.lbl_info.setText(f"串口采集完毕。总计收集 {len(self.df)} 行数据。")
def on_live_data_received(self, data_array):
self.live_data_list.append(data_array)
self.current_packet_count += len(data_array)
self.lbl_info.setText(f"🟢 正在接收数据... 已接收: {self.current_packet_count}")
def update_live_plot(self):
self.frame_count += 1
current_time = time.perf_counter()
elapsed = current_time - self.last_fps_time
if elapsed >= 1.0:
fps = self.frame_count / elapsed
pps = (self.current_packet_count - self.last_packet_count) / elapsed
self.lbl_fps.setText(f"📈 绘图帧率: {fps:.1f} FPS | 📥 接收率: {pps:.0f} 包/秒")
self.last_fps_time = current_time
self.frame_count = 0
self.last_packet_count = self.current_packet_count
if not self.live_data_list:
return
MAX_POINTS = 5000
recent_chunks = []
point_count = 0
for arr in reversed(self.live_data_list):
recent_chunks.append(arr)
point_count += len(arr)
if point_count >= MAX_POINTS:
break
recent_data = np.concatenate(recent_chunks[::-1])
if len(recent_data) > MAX_POINTS:
recent_data = recent_data[-MAX_POINTS:]
# 1. 更新主波形图 (自动以接收点索引作为 X 轴)
_, data_cols, _ = self.get_column_config()
for i, col in enumerate(data_cols):
y_data = recent_data[col]# / 429496729.0
self.curves[i].setData(y_data)
# 2. 更新轨迹和海拔
if self.rb_v2.isChecked() and 'gps_longitude' in recent_data.dtype.names:
lons = recent_data['gps_longitude']
lats = recent_data['gps_latitude']
alts = recent_data['gps_altitude'] if 'gps_altitude' in recent_data.dtype.names else np.zeros_like(lons)
valid_idx = (lons != 0.0) & (lats != 0.0)
if np.any(valid_idx):
self.traj_curve.setData(lons[valid_idx], lats[valid_idx])
else:
self.traj_curve.setData(lons, lats)
self.alt_curve.setData(alts)
def on_serial_error(self, err_msg):
self.toggle_serial()
QMessageBox.critical(self, "接收串口错误", f"发生错误:\n{err_msg}")
def toggle_gps_sim(self):
if not self.is_sim_running:
port = self.cb_sim_ports.currentData()
if not port:
QMessageBox.warning(self, "提示", "未找到有效的输出串口")
return
if self.is_live_mode and port == self.cb_ports.currentData():
reply = QMessageBox.question(self, "警告",
"模拟输出端口与当前接收端口相同,可能会导致端口冲突。确定要继续吗?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.No: return
baud = int(self.cb_sim_baudrate.currentText())
self.sim_thread = GPSSimulatorThread(port, baud)
self.sim_thread.error_occurred.connect(self.on_sim_error)
self.sim_thread.start()
self.is_sim_running = True
self.btn_toggle_sim.setText("⏹ 关闭GPS动态模拟")
self.btn_toggle_sim.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;")
else:
self.sim_thread.stop()
self.sim_thread.wait()
self.is_sim_running = False
self.btn_toggle_sim.setText("🛰 开启GPS动态模拟 (10Hz)")
self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;")
def on_sim_error(self, err_msg):
self.toggle_gps_sim()
QMessageBox.critical(self, "输出串口错误", f"模拟器串口发生错误:\n{err_msg}")
def toggle_mouse_mode(self, state):
mode = self.vb.RectMode if state == 2 else self.vb.PanMode
self.vb.setMouseMode(mode)
self.traj_plot.plotItem.vb.setMouseMode(mode)
self.alt_plot.plotItem.vb.setMouseMode(mode)
def reset_view(self):
self.plot_widget.autoRange()
self.traj_plot.autoRange()
self.alt_plot.autoRange()
def export_csv(self):
if self.df.empty: return
path, _ = QFileDialog.getSaveFileName(self, "保存", "export.csv", "CSV (*.csv)")
if path:
self.df.to_csv(path, index=False)
QMessageBox.information(self, "完成", "导出成功")
if __name__ == "__main__":
app = QApplication(sys.argv)
w = DataAnalyzerUI()
w.show()
sys.exit(app.exec())