- 添加工作缓冲区以支持带参数的格式化操作
- 重构挂载逻辑,在挂载成功后验证文件系统类型和簇大小是否符合目标格式(FAT32,簇大小32KB)
- 将格式化条件判断与执行分离,提高代码可读性
- 使用宏定义目标簇大小和扇区大小参数,便于维护
🔧 chore(scripts): 重命名脚本文件并添加压缩包
- 将`atem_parse.py`重命名为`ATEMParse.py`以遵循命名规范
- 添加`ATEMParse.7z`压缩包文件
686 lines
27 KiB
Python
686 lines
27 KiB
Python
import sys
|
||
import time
|
||
import numpy as np
|
||
import pandas as pd
|
||
import serial
|
||
import serial.tools.list_ports
|
||
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
|
||
QHBoxLayout, QPushButton, QFileDialog, QTableView,
|
||
QLabel, QRadioButton, QButtonGroup, QTabWidget,
|
||
QMessageBox, QHeaderView, QCheckBox, QComboBox, QSplitter)
|
||
from PyQt6.QtCore import Qt, QAbstractTableModel, QThread, pyqtSignal, QTimer
|
||
import pyqtgraph as pg
|
||
|
||
# ==========================================
|
||
# 1. 数据结构定义 (移除了 timestamp)
|
||
# ==========================================
|
||
RAW_DTYPE_V1 = np.dtype([
|
||
('start_byte', '<u4'),
|
||
('adc1', '<i4'), ('adc2', '<i4'), ('adc3', '<i4'),
|
||
('checksum', '<u2'), ('end_byte', '<u2')
|
||
])
|
||
CORRECTED_DTYPE_V1 = np.dtype([
|
||
('start_byte', '<u4'),
|
||
('corr_x', '<f4'), ('corr_y', '<f4'), ('corr_z', '<f4'),
|
||
('checksum', '<u2'), ('end_byte', '<u2')
|
||
])
|
||
|
||
# ⚠️ 注意: 移除了 timestamp,保留了 gps_altitude
|
||
RAW_DTYPE_V2 = np.dtype([
|
||
('start_byte', '<u4'),
|
||
('adc1', '<i4'), ('adc2', '<i4'), ('adc3', '<i4'),
|
||
('gps_time', '<u4'), ('gps_latitude', '<f4'), ('gps_longitude', '<f4'),
|
||
('gps_altitude', '<f4')
|
||
])
|
||
CORRECTED_DTYPE_V2 = np.dtype([
|
||
('start_byte', '<u4'),
|
||
('corr_x', '<f4'), ('corr_y', '<f4'), ('corr_z', '<f4'),
|
||
('gps_time', '<u4'), ('gps_latitude', '<f4'), ('gps_longitude', '<f4'),
|
||
('gps_altitude', '<f4')
|
||
])
|
||
|
||
# ==========================================
|
||
# 2. 高性能表格模型
|
||
# ==========================================
|
||
class BigDataModel(QAbstractTableModel):
|
||
def __init__(self, data):
|
||
super(BigDataModel, self).__init__()
|
||
self._data = data
|
||
|
||
def rowCount(self, parent=None):
|
||
return self._data.shape[0]
|
||
|
||
def columnCount(self, parent=None):
|
||
return self._data.shape[1]
|
||
|
||
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
|
||
if index.isValid() and role == Qt.ItemDataRole.DisplayRole:
|
||
val = self._data.iloc[index.row(), index.column()]
|
||
if isinstance(val, (float, np.floating)):
|
||
return f"{val:.6f}"
|
||
return str(val)
|
||
return None
|
||
|
||
def headerData(self, col, orientation, role):
|
||
if orientation == Qt.Orientation.Horizontal and role == Qt.ItemDataRole.DisplayRole:
|
||
return self._data.columns[col]
|
||
return None
|
||
|
||
# ==========================================
|
||
# 3. 后台线程:串口数据接收 (带异常安全关闭)
|
||
# ==========================================
|
||
class SerialReaderThread(QThread):
|
||
data_received = pyqtSignal(np.ndarray)
|
||
error_occurred = pyqtSignal(str)
|
||
|
||
def __init__(self, port, baudrate, dtype):
|
||
super().__init__()
|
||
self.port = port
|
||
self.baudrate = baudrate
|
||
self.dtype = dtype
|
||
self.is_running = False
|
||
self.serial_port = None
|
||
|
||
def run(self):
|
||
self.is_running = True
|
||
buffer = bytearray()
|
||
packet_size = self.dtype.itemsize
|
||
start_marker = b'\xff\xff\xff\xff'
|
||
|
||
try:
|
||
self.serial_port = serial.Serial(self.port, self.baudrate, timeout=1)
|
||
|
||
while self.is_running:
|
||
if self.serial_port and self.serial_port.in_waiting > 0:
|
||
data = self.serial_port.read(self.serial_port.in_waiting)
|
||
buffer.extend(data)
|
||
|
||
packets = []
|
||
while True:
|
||
idx = buffer.find(start_marker)
|
||
if idx == -1:
|
||
buffer = buffer[-3:] if len(buffer) >= 3 else buffer
|
||
break
|
||
|
||
if len(buffer) >= idx + packet_size:
|
||
packet_bytes = buffer[idx : idx + packet_size]
|
||
packets.append(packet_bytes)
|
||
buffer = buffer[idx + packet_size :]
|
||
else:
|
||
buffer = buffer[idx:]
|
||
break
|
||
|
||
if packets:
|
||
combined_bytes = b''.join(packets)
|
||
parsed_arr = np.frombuffer(combined_bytes, dtype=self.dtype)
|
||
self.data_received.emit(parsed_arr)
|
||
|
||
except Exception as e:
|
||
if self.is_running:
|
||
self.error_occurred.emit(str(e))
|
||
finally:
|
||
self.stop()
|
||
|
||
def stop(self):
|
||
self.is_running = False
|
||
if self.serial_port:
|
||
try:
|
||
self.serial_port.cancel_read()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if self.serial_port.is_open:
|
||
self.serial_port.close()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self.serial_port = None
|
||
|
||
# ==========================================
|
||
# 4. 后台线程:GPS 动态模拟输出 (10Hz)
|
||
# ==========================================
|
||
class GPSSimulatorThread(QThread):
|
||
error_occurred = pyqtSignal(str)
|
||
|
||
def __init__(self, port, baudrate):
|
||
super().__init__()
|
||
self.port = port
|
||
self.baudrate = baudrate
|
||
self.is_running = False
|
||
self.serial_port = None
|
||
|
||
def get_nmea_checksum(self, sentence):
|
||
calc_cksum = 0
|
||
for char in sentence:
|
||
calc_cksum ^= ord(char)
|
||
return f"{calc_cksum:02X}"
|
||
|
||
def run(self):
|
||
self.is_running = True
|
||
try:
|
||
self.serial_port = serial.Serial(self.port, self.baudrate, timeout=1)
|
||
|
||
current_lat = 39.9042
|
||
current_lon = 116.3972
|
||
current_alt = 43.0
|
||
|
||
lat_step = 0.00001
|
||
lon_step = 0.00001
|
||
alt_step = 0.05
|
||
|
||
while self.is_running:
|
||
start_time = time.perf_counter()
|
||
|
||
now = time.time()
|
||
gm_time = time.gmtime(now)
|
||
ms = int((now % 1) * 1000)
|
||
time_str = time.strftime("%H%M%S", gm_time) + f".{ms:03d}"
|
||
date_str = time.strftime("%d%m%y", gm_time)
|
||
|
||
current_lat += lat_step
|
||
current_lon += lon_step
|
||
current_alt += alt_step
|
||
|
||
lat_deg = int(abs(current_lat))
|
||
lat_min = (abs(current_lat) - lat_deg) * 60
|
||
lat_str = f"{lat_deg:02d}{lat_min:07.4f}"
|
||
lat_dir = "N" if current_lat >= 0 else "S"
|
||
|
||
lon_deg = int(abs(current_lon))
|
||
lon_min = (abs(current_lon) - lon_deg) * 60
|
||
lon_str = f"{lon_deg:03d}{lon_min:07.4f}"
|
||
lon_dir = "E" if current_lon >= 0 else "W"
|
||
|
||
speed_knots = "19.4"
|
||
course_true = "45.0"
|
||
|
||
gga_core = f"GPGGA,{time_str},{lat_str},{lat_dir},{lon_str},{lon_dir},1,08,1.0,{current_alt:.1f},M,0.0,M,,"
|
||
rmc_core = f"GPRMC,{time_str},A,{lat_str},{lat_dir},{lon_str},{lon_dir},{speed_knots},{course_true},{date_str},0.0,E"
|
||
|
||
gga_sentence = f"${gga_core}*{self.get_nmea_checksum(gga_core)}\r\n"
|
||
rmc_sentence = f"${rmc_core}*{self.get_nmea_checksum(rmc_core)}\r\n"
|
||
|
||
if self.serial_port and self.serial_port.is_open:
|
||
self.serial_port.write(gga_sentence.encode('ascii'))
|
||
self.serial_port.write(rmc_sentence.encode('ascii'))
|
||
|
||
elapsed = time.perf_counter() - start_time
|
||
sleep_time = 0.1 - elapsed
|
||
if sleep_time > 0:
|
||
time.sleep(sleep_time)
|
||
|
||
except Exception as e:
|
||
if self.is_running:
|
||
self.error_occurred.emit(str(e))
|
||
finally:
|
||
self.stop()
|
||
|
||
def stop(self):
|
||
self.is_running = False
|
||
if self.serial_port:
|
||
try:
|
||
self.serial_port.cancel_write()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
if self.serial_port.is_open:
|
||
self.serial_port.close()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self.serial_port = None
|
||
|
||
# ==========================================
|
||
# 5. 主窗口
|
||
# ==========================================
|
||
class DataAnalyzerUI(QMainWindow):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setWindowTitle("🚀 高性能数据分析工具 (含经纬度地图与海拔曲线)")
|
||
self.resize(1300, 900)
|
||
|
||
self.df = pd.DataFrame()
|
||
self.live_data_list = []
|
||
self.is_live_mode = False
|
||
self.curves = []
|
||
|
||
self.last_fps_time = 0
|
||
self.frame_count = 0
|
||
self.last_packet_count = 0
|
||
self.current_packet_count = 0
|
||
|
||
self.is_sim_running = False
|
||
self.sim_thread = None
|
||
|
||
pg.setConfigOptions(antialias=False)
|
||
self.init_ui()
|
||
|
||
self.plot_timer = QTimer()
|
||
self.plot_timer.timeout.connect(self.update_live_plot)
|
||
|
||
def init_ui(self):
|
||
main_widget = QWidget()
|
||
self.setCentralWidget(main_widget)
|
||
layout = QVBoxLayout(main_widget)
|
||
|
||
# --- 第一排工具栏 ---
|
||
top_bar1 = QHBoxLayout()
|
||
self.ver_group = QButtonGroup(self)
|
||
self.rb_v2 = QRadioButton("V2 (带GPS与海拔)")
|
||
self.rb_v2.setChecked(True)
|
||
self.ver_group.addButton(self.rb_v2)
|
||
|
||
self.mode_group = QButtonGroup(self)
|
||
self.rb_raw = QRadioButton("原始数据 (Raw)")
|
||
self.rb_corr = QRadioButton("校正数据 (Corr)")
|
||
self.rb_raw.setChecked(True)
|
||
self.mode_group.addButton(self.rb_raw)
|
||
self.mode_group.addButton(self.rb_corr)
|
||
|
||
btn_load = QPushButton("📂 打开文件")
|
||
btn_load.clicked.connect(self.load_file)
|
||
btn_export = QPushButton("💾 导出CSV")
|
||
btn_export.clicked.connect(self.export_csv)
|
||
|
||
self.chk_mouse_mode = QCheckBox("🔍 鼠标框选放大")
|
||
self.chk_mouse_mode.stateChanged.connect(self.toggle_mouse_mode)
|
||
|
||
btn_autoscale = QPushButton("⟲ 复位视图")
|
||
btn_autoscale.clicked.connect(self.reset_view)
|
||
|
||
top_bar1.addWidget(QLabel("版本:"))
|
||
top_bar1.addWidget(self.rb_v2)
|
||
top_bar1.addSpacing(15)
|
||
top_bar1.addWidget(QLabel("模式:"))
|
||
top_bar1.addWidget(self.rb_raw)
|
||
top_bar1.addWidget(self.rb_corr)
|
||
top_bar1.addSpacing(15)
|
||
top_bar1.addWidget(btn_load)
|
||
top_bar1.addWidget(btn_export)
|
||
top_bar1.addStretch()
|
||
top_bar1.addWidget(self.chk_mouse_mode)
|
||
top_bar1.addWidget(btn_autoscale)
|
||
|
||
# --- 第二排工具栏 ---
|
||
top_bar2 = QHBoxLayout()
|
||
self.cb_ports = QComboBox()
|
||
self.cb_baudrate = QComboBox()
|
||
self.cb_baudrate.addItems(["9600", "115200", "230400", "460800", "921600", "2000000"])
|
||
self.cb_baudrate.setCurrentText("115200")
|
||
btn_refresh_ports = QPushButton("🔄 刷新端口")
|
||
btn_refresh_ports.clicked.connect(self.refresh_ports)
|
||
self.btn_toggle_serial = QPushButton("▶ 打开接收串口")
|
||
self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;")
|
||
self.btn_toggle_serial.clicked.connect(self.toggle_serial)
|
||
|
||
top_bar2.addWidget(QLabel("🔌 接收串口:"))
|
||
top_bar2.addWidget(self.cb_ports)
|
||
top_bar2.addWidget(btn_refresh_ports)
|
||
top_bar2.addWidget(QLabel("波特率:"))
|
||
top_bar2.addWidget(self.cb_baudrate)
|
||
top_bar2.addWidget(self.btn_toggle_serial)
|
||
top_bar2.addStretch()
|
||
|
||
# --- 第三排工具栏 (GPS 模拟器) ---
|
||
top_bar3 = QHBoxLayout()
|
||
self.cb_sim_ports = QComboBox()
|
||
self.cb_sim_baudrate = QComboBox()
|
||
self.cb_sim_baudrate.addItems(["9600", "115200", "230400", "460800", "921600"])
|
||
self.cb_sim_baudrate.setCurrentText("115200")
|
||
self.btn_toggle_sim = QPushButton("🛰 开启GPS动态模拟 (10Hz)")
|
||
self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;")
|
||
self.btn_toggle_sim.clicked.connect(self.toggle_gps_sim)
|
||
|
||
top_bar3.addWidget(QLabel("📡 输出串口:"))
|
||
top_bar3.addWidget(self.cb_sim_ports)
|
||
top_bar3.addWidget(QLabel("波特率:"))
|
||
top_bar3.addWidget(self.cb_sim_baudrate)
|
||
top_bar3.addWidget(self.btn_toggle_sim)
|
||
top_bar3.addStretch()
|
||
|
||
layout.addLayout(top_bar1)
|
||
layout.addLayout(top_bar2)
|
||
layout.addLayout(top_bar3)
|
||
self.refresh_ports()
|
||
|
||
# --- 监控栏 ---
|
||
info_layout = QHBoxLayout()
|
||
self.lbl_info = QLabel("请加载文件或打开串口接收数据...")
|
||
self.lbl_info.setStyleSheet("color: blue; font-weight: bold;")
|
||
self.lbl_fps = QLabel("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
|
||
self.lbl_fps.setStyleSheet("color: #E91E63; font-weight: bold;")
|
||
info_layout.addWidget(self.lbl_info)
|
||
info_layout.addStretch()
|
||
info_layout.addWidget(self.lbl_fps)
|
||
layout.addLayout(info_layout)
|
||
|
||
# ==========================================
|
||
# 多标签内容区域设置
|
||
# ==========================================
|
||
self.tabs = QTabWidget()
|
||
layout.addWidget(self.tabs)
|
||
|
||
# [Tab 1] 主波形图
|
||
self.plot_widget = pg.PlotWidget()
|
||
self.plot_widget.setBackground('w')
|
||
self.plot_widget.showGrid(x=True, y=True, alpha=0.3)
|
||
self.plot_widget.addLegend()
|
||
self.plot_widget.setLabel('bottom', 'Data Points (Index)') # 更新了标签
|
||
self.plot_widget.setLabel('left', 'Value')
|
||
self.vb = self.plot_widget.plotItem.vb
|
||
self.tabs.addTab(self.plot_widget, "📈 波形图 (Plot)")
|
||
|
||
# [Tab 2] GPS 轨迹与海拔视图
|
||
traj_container = QWidget()
|
||
traj_layout = QVBoxLayout(traj_container)
|
||
traj_splitter = QSplitter(Qt.Orientation.Vertical)
|
||
|
||
self.traj_plot = pg.PlotWidget(title="🗺️ 实时轨迹 (经度 vs 纬度)")
|
||
self.traj_plot.setBackground('w')
|
||
self.traj_plot.showGrid(x=True, y=True, alpha=0.5)
|
||
self.traj_plot.setLabel('bottom', 'Longitude (经度)')
|
||
self.traj_plot.setLabel('left', 'Latitude (纬度)')
|
||
self.traj_curve = self.traj_plot.plot(pen=pg.mkPen('b', width=2), symbol='o', symbolSize=3, symbolBrush='b')
|
||
|
||
self.alt_plot = pg.PlotWidget(title="⛰️ 海拔高度 (Altitude)")
|
||
self.alt_plot.setBackground('w')
|
||
self.alt_plot.showGrid(x=True, y=True, alpha=0.5)
|
||
self.alt_plot.setLabel('bottom', 'Data Points (Index)') # 更新了标签
|
||
self.alt_plot.setLabel('left', 'Altitude (m)')
|
||
self.alt_curve = self.alt_plot.plot(pen=pg.mkPen('g', width=2))
|
||
|
||
traj_splitter.addWidget(self.traj_plot)
|
||
traj_splitter.addWidget(self.alt_plot)
|
||
traj_layout.addWidget(traj_splitter)
|
||
self.tabs.addTab(traj_container, "🗺️ 轨迹与海拔 (Trajectory)")
|
||
|
||
# [Tab 3] 数据表格
|
||
self.table_view = QTableView()
|
||
self.table_view.setAlternatingRowColors(True)
|
||
self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
|
||
self.tabs.addTab(self.table_view, "🔢 数据表 (Table)")
|
||
|
||
def get_current_dtype(self):
|
||
is_v2 = self.rb_v2.isChecked()
|
||
is_raw = self.rb_raw.isChecked()
|
||
if is_v2:
|
||
return RAW_DTYPE_V2 if is_raw else CORRECTED_DTYPE_V2
|
||
return RAW_DTYPE_V1 if is_raw else CORRECTED_DTYPE_V1
|
||
|
||
def get_column_config(self):
|
||
is_v2 = self.rb_v2.isChecked()
|
||
is_raw = self.rb_raw.isChecked()
|
||
|
||
# 这里移除了 timestamp
|
||
if is_raw:
|
||
base_cols = ['adc1', 'adc2', 'adc3']
|
||
labels = ['ADC 1', 'ADC 2', 'ADC 3']
|
||
data_cols = ['adc1', 'adc2', 'adc3']
|
||
else:
|
||
base_cols = ['corr_x', 'corr_y', 'corr_z']
|
||
labels = ['X Axis', 'Y Axis', 'Z Axis']
|
||
data_cols = ['corr_x', 'corr_y', 'corr_z']
|
||
|
||
cols = base_cols + ['gps_time', 'gps_latitude', 'gps_longitude', 'gps_altitude'] if is_v2 else base_cols + ['checksum']
|
||
return cols, data_cols, labels
|
||
|
||
def load_file(self):
|
||
if self.is_live_mode:
|
||
QMessageBox.warning(self, "警告", "请先关闭串口后再加载文件。")
|
||
return
|
||
|
||
file_name, _ = QFileDialog.getOpenFileName(self, "选择文件", "", "Data (*.dat);;All (*)")
|
||
if not file_name: return
|
||
|
||
try:
|
||
dtype = self.get_current_dtype()
|
||
raw_data = np.fromfile(file_name, dtype=dtype)
|
||
if len(raw_data) == 0: return
|
||
|
||
self.df = pd.DataFrame(raw_data)
|
||
cols, data_cols, labels = self.get_column_config()
|
||
|
||
self.lbl_info.setText(f"文件加载成功 | {len(self.df)} 行 | 版本: {'V2' if self.rb_v2.isChecked() else 'V1'}")
|
||
self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
|
||
self.refresh_table(cols)
|
||
self.plot_static_data(data_cols, labels)
|
||
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "解析错误", str(e))
|
||
|
||
def refresh_table(self, cols):
|
||
display_df = self.df[cols] if not self.df.empty else pd.DataFrame(columns=cols)
|
||
self.model = BigDataModel(display_df)
|
||
self.table_view.setModel(self.model)
|
||
self.table_view.resizeColumnsToContents()
|
||
|
||
def plot_static_data(self, data_cols, labels):
|
||
# 1. 主波形图静态渲染
|
||
self.plot_widget.clear()
|
||
self.curves.clear()
|
||
colors = ['#FF0000', '#00AA00', '#0000FF']
|
||
|
||
# 移除了 timestamp,改为使用数据点索引
|
||
x_data = np.arange(len(self.df))
|
||
|
||
for i, col in enumerate(data_cols):
|
||
y_data = self.df[col].values
|
||
curve = pg.PlotCurveItem(x=x_data, y=y_data, pen=pg.mkPen(color=colors[i], width=1.5),
|
||
name=labels[i], skipFiniteCheck=True, autoDownsample=True, clipToView=True)
|
||
self.plot_widget.addItem(curve)
|
||
|
||
self.plot_widget.setLabel('bottom', 'Data Points (Index)')
|
||
self.reset_view()
|
||
|
||
# 2. 轨迹和海拔静态渲染
|
||
if self.rb_v2.isChecked() and 'gps_longitude' in self.df.columns:
|
||
lons = self.df['gps_longitude'].values
|
||
lats = self.df['gps_latitude'].values
|
||
alts = self.df['gps_altitude'].values if 'gps_altitude' in self.df.columns else np.zeros_like(lons)
|
||
|
||
valid_idx = (lons != 0.0) & (lats != 0.0)
|
||
if np.any(valid_idx):
|
||
self.traj_curve.setData(lons[valid_idx], lats[valid_idx])
|
||
else:
|
||
self.traj_curve.setData(lons, lats)
|
||
|
||
self.alt_curve.setData(alts)
|
||
self.traj_plot.autoRange()
|
||
self.alt_plot.autoRange()
|
||
else:
|
||
self.traj_curve.setData([], [])
|
||
self.alt_curve.setData([])
|
||
|
||
def refresh_ports(self):
|
||
self.cb_ports.clear()
|
||
self.cb_sim_ports.clear()
|
||
ports = serial.tools.list_ports.comports()
|
||
for p in ports:
|
||
port_name = f"{p.device} - {p.description}"
|
||
self.cb_ports.addItem(port_name, p.device)
|
||
self.cb_sim_ports.addItem(port_name, p.device)
|
||
|
||
def toggle_serial(self):
|
||
if not self.is_live_mode:
|
||
port = self.cb_ports.currentData()
|
||
if not port:
|
||
QMessageBox.warning(self, "提示", "未找到有效串口")
|
||
return
|
||
|
||
baud = int(self.cb_baudrate.currentText())
|
||
dtype = self.get_current_dtype()
|
||
|
||
self.df = pd.DataFrame()
|
||
self.live_data_list = []
|
||
|
||
self.plot_widget.clear()
|
||
self.curves.clear()
|
||
self.traj_curve.setData([], [])
|
||
self.alt_curve.setData([])
|
||
|
||
self.plot_widget.setLabel('bottom', 'Latest Points (Index)')
|
||
|
||
colors = ['#FF0000', '#00AA00', '#0000FF']
|
||
_, _, labels = self.get_column_config()
|
||
for i in range(len(labels)):
|
||
curve = pg.PlotCurveItem(pen=pg.mkPen(color=colors[i], width=1.5), name=labels[i])
|
||
self.plot_widget.addItem(curve)
|
||
self.curves.append(curve)
|
||
|
||
self.current_packet_count = 0
|
||
self.last_packet_count = 0
|
||
self.frame_count = 0
|
||
self.last_fps_time = time.perf_counter()
|
||
self.lbl_fps.setText("📈 绘图帧率: 计算中... | 📥 接收率: 计算中...")
|
||
|
||
self.serial_thread = SerialReaderThread(port, baud, dtype)
|
||
self.serial_thread.data_received.connect(self.on_live_data_received)
|
||
self.serial_thread.error_occurred.connect(self.on_serial_error)
|
||
self.serial_thread.start()
|
||
|
||
self.plot_timer.start(50)
|
||
|
||
self.is_live_mode = True
|
||
self.btn_toggle_serial.setText("⏹ 关闭串口停止接收")
|
||
self.btn_toggle_serial.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;")
|
||
|
||
self.rb_v2.setEnabled(False)
|
||
self.rb_raw.setEnabled(False)
|
||
self.rb_corr.setEnabled(False)
|
||
|
||
else:
|
||
self.serial_thread.stop()
|
||
self.serial_thread.wait()
|
||
self.plot_timer.stop()
|
||
|
||
self.is_live_mode = False
|
||
self.btn_toggle_serial.setText("▶ 打开接收串口")
|
||
self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;")
|
||
self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒")
|
||
|
||
self.rb_v2.setEnabled(True)
|
||
self.rb_raw.setEnabled(True)
|
||
self.rb_corr.setEnabled(True)
|
||
|
||
if self.live_data_list:
|
||
full_array = np.concatenate(self.live_data_list)
|
||
self.df = pd.DataFrame(full_array)
|
||
cols, _, _ = self.get_column_config()
|
||
self.refresh_table(cols)
|
||
self.lbl_info.setText(f"串口采集完毕。总计收集 {len(self.df)} 行数据。")
|
||
|
||
def on_live_data_received(self, data_array):
|
||
self.live_data_list.append(data_array)
|
||
self.current_packet_count += len(data_array)
|
||
self.lbl_info.setText(f"🟢 正在接收数据... 已接收: {self.current_packet_count} 帧")
|
||
|
||
def update_live_plot(self):
|
||
self.frame_count += 1
|
||
current_time = time.perf_counter()
|
||
elapsed = current_time - self.last_fps_time
|
||
|
||
if elapsed >= 1.0:
|
||
fps = self.frame_count / elapsed
|
||
pps = (self.current_packet_count - self.last_packet_count) / elapsed
|
||
self.lbl_fps.setText(f"📈 绘图帧率: {fps:.1f} FPS | 📥 接收率: {pps:.0f} 包/秒")
|
||
|
||
self.last_fps_time = current_time
|
||
self.frame_count = 0
|
||
self.last_packet_count = self.current_packet_count
|
||
|
||
if not self.live_data_list:
|
||
return
|
||
|
||
MAX_POINTS = 5000
|
||
recent_chunks = []
|
||
point_count = 0
|
||
for arr in reversed(self.live_data_list):
|
||
recent_chunks.append(arr)
|
||
point_count += len(arr)
|
||
if point_count >= MAX_POINTS:
|
||
break
|
||
|
||
recent_data = np.concatenate(recent_chunks[::-1])
|
||
if len(recent_data) > MAX_POINTS:
|
||
recent_data = recent_data[-MAX_POINTS:]
|
||
|
||
# 1. 更新主波形图 (自动以接收点索引作为 X 轴)
|
||
_, data_cols, _ = self.get_column_config()
|
||
for i, col in enumerate(data_cols):
|
||
y_data = recent_data[col]# / 429496729.0
|
||
self.curves[i].setData(y_data)
|
||
|
||
# 2. 更新轨迹和海拔
|
||
if self.rb_v2.isChecked() and 'gps_longitude' in recent_data.dtype.names:
|
||
lons = recent_data['gps_longitude']
|
||
lats = recent_data['gps_latitude']
|
||
alts = recent_data['gps_altitude'] if 'gps_altitude' in recent_data.dtype.names else np.zeros_like(lons)
|
||
|
||
valid_idx = (lons != 0.0) & (lats != 0.0)
|
||
if np.any(valid_idx):
|
||
self.traj_curve.setData(lons[valid_idx], lats[valid_idx])
|
||
else:
|
||
self.traj_curve.setData(lons, lats)
|
||
|
||
self.alt_curve.setData(alts)
|
||
|
||
def on_serial_error(self, err_msg):
|
||
self.toggle_serial()
|
||
QMessageBox.critical(self, "接收串口错误", f"发生错误:\n{err_msg}")
|
||
|
||
def toggle_gps_sim(self):
|
||
if not self.is_sim_running:
|
||
port = self.cb_sim_ports.currentData()
|
||
if not port:
|
||
QMessageBox.warning(self, "提示", "未找到有效的输出串口")
|
||
return
|
||
|
||
if self.is_live_mode and port == self.cb_ports.currentData():
|
||
reply = QMessageBox.question(self, "警告",
|
||
"模拟输出端口与当前接收端口相同,可能会导致端口冲突。确定要继续吗?",
|
||
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
|
||
if reply == QMessageBox.StandardButton.No: return
|
||
|
||
baud = int(self.cb_sim_baudrate.currentText())
|
||
self.sim_thread = GPSSimulatorThread(port, baud)
|
||
self.sim_thread.error_occurred.connect(self.on_sim_error)
|
||
self.sim_thread.start()
|
||
|
||
self.is_sim_running = True
|
||
self.btn_toggle_sim.setText("⏹ 关闭GPS动态模拟")
|
||
self.btn_toggle_sim.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;")
|
||
else:
|
||
self.sim_thread.stop()
|
||
self.sim_thread.wait()
|
||
|
||
self.is_sim_running = False
|
||
self.btn_toggle_sim.setText("🛰 开启GPS动态模拟 (10Hz)")
|
||
self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;")
|
||
|
||
def on_sim_error(self, err_msg):
|
||
self.toggle_gps_sim()
|
||
QMessageBox.critical(self, "输出串口错误", f"模拟器串口发生错误:\n{err_msg}")
|
||
|
||
def toggle_mouse_mode(self, state):
|
||
mode = self.vb.RectMode if state == 2 else self.vb.PanMode
|
||
self.vb.setMouseMode(mode)
|
||
self.traj_plot.plotItem.vb.setMouseMode(mode)
|
||
self.alt_plot.plotItem.vb.setMouseMode(mode)
|
||
|
||
def reset_view(self):
|
||
self.plot_widget.autoRange()
|
||
self.traj_plot.autoRange()
|
||
self.alt_plot.autoRange()
|
||
|
||
def export_csv(self):
|
||
if self.df.empty: return
|
||
path, _ = QFileDialog.getSaveFileName(self, "保存", "export.csv", "CSV (*.csv)")
|
||
if path:
|
||
self.df.to_csv(path, index=False)
|
||
QMessageBox.information(self, "完成", "导出成功")
|
||
|
||
if __name__ == "__main__":
|
||
app = QApplication(sys.argv)
|
||
w = DataAnalyzerUI()
|
||
w.show()
|
||
sys.exit(app.exec()) |