import sys import time import numpy as np import pandas as pd import serial import serial.tools.list_ports from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QFileDialog, QTableView, QLabel, QRadioButton, QButtonGroup, QTabWidget, QMessageBox, QHeaderView, QCheckBox, QComboBox, QSplitter) from PyQt6.QtCore import Qt, QAbstractTableModel, QThread, pyqtSignal, QTimer import pyqtgraph as pg # ========================================== # 1. 数据结构定义 (移除了 timestamp) # ========================================== RAW_DTYPE_V1 = np.dtype([ ('start_byte', ' 0: data = self.serial_port.read(self.serial_port.in_waiting) buffer.extend(data) packets = [] while True: idx = buffer.find(start_marker) if idx == -1: buffer = buffer[-3:] if len(buffer) >= 3 else buffer break if len(buffer) >= idx + packet_size: packet_bytes = buffer[idx : idx + packet_size] packets.append(packet_bytes) buffer = buffer[idx + packet_size :] else: buffer = buffer[idx:] break if packets: combined_bytes = b''.join(packets) parsed_arr = np.frombuffer(combined_bytes, dtype=self.dtype) self.data_received.emit(parsed_arr) except Exception as e: if self.is_running: self.error_occurred.emit(str(e)) finally: self.stop() def stop(self): self.is_running = False if self.serial_port: try: self.serial_port.cancel_read() except Exception: pass try: if self.serial_port.is_open: self.serial_port.close() except Exception: pass finally: self.serial_port = None # ========================================== # 4. 后台线程:GPS 动态模拟输出 (10Hz) # ========================================== class GPSSimulatorThread(QThread): error_occurred = pyqtSignal(str) def __init__(self, port, baudrate): super().__init__() self.port = port self.baudrate = baudrate self.is_running = False self.serial_port = None def get_nmea_checksum(self, sentence): calc_cksum = 0 for char in sentence: calc_cksum ^= ord(char) return f"{calc_cksum:02X}" def run(self): self.is_running = True try: self.serial_port = serial.Serial(self.port, self.baudrate, timeout=1) current_lat = 39.9042 current_lon = 116.3972 current_alt = 43.0 lat_step = 0.00001 lon_step = 0.00001 alt_step = 0.05 while self.is_running: start_time = time.perf_counter() now = time.time() gm_time = time.gmtime(now) ms = int((now % 1) * 1000) time_str = time.strftime("%H%M%S", gm_time) + f".{ms:03d}" date_str = time.strftime("%d%m%y", gm_time) current_lat += lat_step current_lon += lon_step current_alt += alt_step lat_deg = int(abs(current_lat)) lat_min = (abs(current_lat) - lat_deg) * 60 lat_str = f"{lat_deg:02d}{lat_min:07.4f}" lat_dir = "N" if current_lat >= 0 else "S" lon_deg = int(abs(current_lon)) lon_min = (abs(current_lon) - lon_deg) * 60 lon_str = f"{lon_deg:03d}{lon_min:07.4f}" lon_dir = "E" if current_lon >= 0 else "W" speed_knots = "19.4" course_true = "45.0" gga_core = f"GPGGA,{time_str},{lat_str},{lat_dir},{lon_str},{lon_dir},1,08,1.0,{current_alt:.1f},M,0.0,M,," rmc_core = f"GPRMC,{time_str},A,{lat_str},{lat_dir},{lon_str},{lon_dir},{speed_knots},{course_true},{date_str},0.0,E" gga_sentence = f"${gga_core}*{self.get_nmea_checksum(gga_core)}\r\n" rmc_sentence = f"${rmc_core}*{self.get_nmea_checksum(rmc_core)}\r\n" if self.serial_port and self.serial_port.is_open: self.serial_port.write(gga_sentence.encode('ascii')) self.serial_port.write(rmc_sentence.encode('ascii')) elapsed = time.perf_counter() - start_time sleep_time = 0.1 - elapsed if sleep_time > 0: time.sleep(sleep_time) except Exception as e: if self.is_running: self.error_occurred.emit(str(e)) finally: self.stop() def stop(self): self.is_running = False if self.serial_port: try: self.serial_port.cancel_write() except Exception: pass try: if self.serial_port.is_open: self.serial_port.close() except Exception: pass finally: self.serial_port = None # ========================================== # 5. 主窗口 # ========================================== class DataAnalyzerUI(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("🚀 高性能数据分析工具 (含经纬度地图与海拔曲线)") self.resize(1300, 900) self.df = pd.DataFrame() self.live_data_list = [] self.is_live_mode = False self.curves = [] self.last_fps_time = 0 self.frame_count = 0 self.last_packet_count = 0 self.current_packet_count = 0 self.is_sim_running = False self.sim_thread = None pg.setConfigOptions(antialias=False) self.init_ui() self.plot_timer = QTimer() self.plot_timer.timeout.connect(self.update_live_plot) def init_ui(self): main_widget = QWidget() self.setCentralWidget(main_widget) layout = QVBoxLayout(main_widget) # --- 第一排工具栏 --- top_bar1 = QHBoxLayout() self.ver_group = QButtonGroup(self) self.rb_v2 = QRadioButton("V2 (带GPS与海拔)") self.rb_v2.setChecked(True) self.ver_group.addButton(self.rb_v2) self.mode_group = QButtonGroup(self) self.rb_raw = QRadioButton("原始数据 (Raw)") self.rb_corr = QRadioButton("校正数据 (Corr)") self.rb_raw.setChecked(True) self.mode_group.addButton(self.rb_raw) self.mode_group.addButton(self.rb_corr) btn_load = QPushButton("📂 打开文件") btn_load.clicked.connect(self.load_file) btn_export = QPushButton("💾 导出CSV") btn_export.clicked.connect(self.export_csv) self.chk_mouse_mode = QCheckBox("🔍 鼠标框选放大") self.chk_mouse_mode.stateChanged.connect(self.toggle_mouse_mode) btn_autoscale = QPushButton("⟲ 复位视图") btn_autoscale.clicked.connect(self.reset_view) top_bar1.addWidget(QLabel("版本:")) top_bar1.addWidget(self.rb_v2) top_bar1.addSpacing(15) top_bar1.addWidget(QLabel("模式:")) top_bar1.addWidget(self.rb_raw) top_bar1.addWidget(self.rb_corr) top_bar1.addSpacing(15) top_bar1.addWidget(btn_load) top_bar1.addWidget(btn_export) top_bar1.addStretch() top_bar1.addWidget(self.chk_mouse_mode) top_bar1.addWidget(btn_autoscale) # --- 第二排工具栏 --- top_bar2 = QHBoxLayout() self.cb_ports = QComboBox() self.cb_baudrate = QComboBox() self.cb_baudrate.addItems(["9600", "115200", "230400", "460800", "921600", "2000000"]) self.cb_baudrate.setCurrentText("115200") btn_refresh_ports = QPushButton("🔄 刷新端口") btn_refresh_ports.clicked.connect(self.refresh_ports) self.btn_toggle_serial = QPushButton("▶ 打开接收串口") self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;") self.btn_toggle_serial.clicked.connect(self.toggle_serial) top_bar2.addWidget(QLabel("🔌 接收串口:")) top_bar2.addWidget(self.cb_ports) top_bar2.addWidget(btn_refresh_ports) top_bar2.addWidget(QLabel("波特率:")) top_bar2.addWidget(self.cb_baudrate) top_bar2.addWidget(self.btn_toggle_serial) top_bar2.addStretch() # --- 第三排工具栏 (GPS 模拟器) --- top_bar3 = QHBoxLayout() self.cb_sim_ports = QComboBox() self.cb_sim_baudrate = QComboBox() self.cb_sim_baudrate.addItems(["9600", "115200", "230400", "460800", "921600"]) self.cb_sim_baudrate.setCurrentText("115200") self.btn_toggle_sim = QPushButton("🛰 开启GPS动态模拟 (10Hz)") self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;") self.btn_toggle_sim.clicked.connect(self.toggle_gps_sim) top_bar3.addWidget(QLabel("📡 输出串口:")) top_bar3.addWidget(self.cb_sim_ports) top_bar3.addWidget(QLabel("波特率:")) top_bar3.addWidget(self.cb_sim_baudrate) top_bar3.addWidget(self.btn_toggle_sim) top_bar3.addStretch() layout.addLayout(top_bar1) layout.addLayout(top_bar2) layout.addLayout(top_bar3) self.refresh_ports() # --- 监控栏 --- info_layout = QHBoxLayout() self.lbl_info = QLabel("请加载文件或打开串口接收数据...") self.lbl_info.setStyleSheet("color: blue; font-weight: bold;") self.lbl_fps = QLabel("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒") self.lbl_fps.setStyleSheet("color: #E91E63; font-weight: bold;") info_layout.addWidget(self.lbl_info) info_layout.addStretch() info_layout.addWidget(self.lbl_fps) layout.addLayout(info_layout) # ========================================== # 多标签内容区域设置 # ========================================== self.tabs = QTabWidget() layout.addWidget(self.tabs) # [Tab 1] 主波形图 self.plot_widget = pg.PlotWidget() self.plot_widget.setBackground('w') self.plot_widget.showGrid(x=True, y=True, alpha=0.3) self.plot_widget.addLegend() self.plot_widget.setLabel('bottom', 'Data Points (Index)') # 更新了标签 self.plot_widget.setLabel('left', 'Value') self.vb = self.plot_widget.plotItem.vb self.tabs.addTab(self.plot_widget, "📈 波形图 (Plot)") # [Tab 2] GPS 轨迹与海拔视图 traj_container = QWidget() traj_layout = QVBoxLayout(traj_container) traj_splitter = QSplitter(Qt.Orientation.Vertical) self.traj_plot = pg.PlotWidget(title="🗺️ 实时轨迹 (经度 vs 纬度)") self.traj_plot.setBackground('w') self.traj_plot.showGrid(x=True, y=True, alpha=0.5) self.traj_plot.setLabel('bottom', 'Longitude (经度)') self.traj_plot.setLabel('left', 'Latitude (纬度)') self.traj_curve = self.traj_plot.plot(pen=pg.mkPen('b', width=2), symbol='o', symbolSize=3, symbolBrush='b') self.alt_plot = pg.PlotWidget(title="⛰️ 海拔高度 (Altitude)") self.alt_plot.setBackground('w') self.alt_plot.showGrid(x=True, y=True, alpha=0.5) self.alt_plot.setLabel('bottom', 'Data Points (Index)') # 更新了标签 self.alt_plot.setLabel('left', 'Altitude (m)') self.alt_curve = self.alt_plot.plot(pen=pg.mkPen('g', width=2)) traj_splitter.addWidget(self.traj_plot) traj_splitter.addWidget(self.alt_plot) traj_layout.addWidget(traj_splitter) self.tabs.addTab(traj_container, "🗺️ 轨迹与海拔 (Trajectory)") # [Tab 3] 数据表格 self.table_view = QTableView() self.table_view.setAlternatingRowColors(True) self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive) self.tabs.addTab(self.table_view, "🔢 数据表 (Table)") def get_current_dtype(self): is_v2 = self.rb_v2.isChecked() is_raw = self.rb_raw.isChecked() if is_v2: return RAW_DTYPE_V2 if is_raw else CORRECTED_DTYPE_V2 return RAW_DTYPE_V1 if is_raw else CORRECTED_DTYPE_V1 def get_column_config(self): is_v2 = self.rb_v2.isChecked() is_raw = self.rb_raw.isChecked() # 这里移除了 timestamp if is_raw: base_cols = ['adc1', 'adc2', 'adc3'] labels = ['ADC 1', 'ADC 2', 'ADC 3'] data_cols = ['adc1', 'adc2', 'adc3'] else: base_cols = ['corr_x', 'corr_y', 'corr_z'] labels = ['X Axis', 'Y Axis', 'Z Axis'] data_cols = ['corr_x', 'corr_y', 'corr_z'] cols = base_cols + ['gps_time', 'gps_latitude', 'gps_longitude', 'gps_altitude'] if is_v2 else base_cols + ['checksum'] return cols, data_cols, labels def load_file(self): if self.is_live_mode: QMessageBox.warning(self, "警告", "请先关闭串口后再加载文件。") return file_name, _ = QFileDialog.getOpenFileName(self, "选择文件", "", "Data (*.dat);;All (*)") if not file_name: return try: dtype = self.get_current_dtype() raw_data = np.fromfile(file_name, dtype=dtype) if len(raw_data) == 0: return self.df = pd.DataFrame(raw_data) cols, data_cols, labels = self.get_column_config() self.lbl_info.setText(f"文件加载成功 | {len(self.df)} 行 | 版本: {'V2' if self.rb_v2.isChecked() else 'V1'}") self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒") self.refresh_table(cols) self.plot_static_data(data_cols, labels) except Exception as e: QMessageBox.critical(self, "解析错误", str(e)) def refresh_table(self, cols): display_df = self.df[cols] if not self.df.empty else pd.DataFrame(columns=cols) self.model = BigDataModel(display_df) self.table_view.setModel(self.model) self.table_view.resizeColumnsToContents() def plot_static_data(self, data_cols, labels): # 1. 主波形图静态渲染 self.plot_widget.clear() self.curves.clear() colors = ['#FF0000', '#00AA00', '#0000FF'] # 移除了 timestamp,改为使用数据点索引 x_data = np.arange(len(self.df)) for i, col in enumerate(data_cols): y_data = self.df[col].values curve = pg.PlotCurveItem(x=x_data, y=y_data, pen=pg.mkPen(color=colors[i], width=1.5), name=labels[i], skipFiniteCheck=True, autoDownsample=True, clipToView=True) self.plot_widget.addItem(curve) self.plot_widget.setLabel('bottom', 'Data Points (Index)') self.reset_view() # 2. 轨迹和海拔静态渲染 if self.rb_v2.isChecked() and 'gps_longitude' in self.df.columns: lons = self.df['gps_longitude'].values lats = self.df['gps_latitude'].values alts = self.df['gps_altitude'].values if 'gps_altitude' in self.df.columns else np.zeros_like(lons) valid_idx = (lons != 0.0) & (lats != 0.0) if np.any(valid_idx): self.traj_curve.setData(lons[valid_idx], lats[valid_idx]) else: self.traj_curve.setData(lons, lats) self.alt_curve.setData(alts) self.traj_plot.autoRange() self.alt_plot.autoRange() else: self.traj_curve.setData([], []) self.alt_curve.setData([]) def refresh_ports(self): self.cb_ports.clear() self.cb_sim_ports.clear() ports = serial.tools.list_ports.comports() for p in ports: port_name = f"{p.device} - {p.description}" self.cb_ports.addItem(port_name, p.device) self.cb_sim_ports.addItem(port_name, p.device) def toggle_serial(self): if not self.is_live_mode: port = self.cb_ports.currentData() if not port: QMessageBox.warning(self, "提示", "未找到有效串口") return baud = int(self.cb_baudrate.currentText()) dtype = self.get_current_dtype() self.df = pd.DataFrame() self.live_data_list = [] self.plot_widget.clear() self.curves.clear() self.traj_curve.setData([], []) self.alt_curve.setData([]) self.plot_widget.setLabel('bottom', 'Latest Points (Index)') colors = ['#FF0000', '#00AA00', '#0000FF'] _, _, labels = self.get_column_config() for i in range(len(labels)): curve = pg.PlotCurveItem(pen=pg.mkPen(color=colors[i], width=1.5), name=labels[i]) self.plot_widget.addItem(curve) self.curves.append(curve) self.current_packet_count = 0 self.last_packet_count = 0 self.frame_count = 0 self.last_fps_time = time.perf_counter() self.lbl_fps.setText("📈 绘图帧率: 计算中... | 📥 接收率: 计算中...") self.serial_thread = SerialReaderThread(port, baud, dtype) self.serial_thread.data_received.connect(self.on_live_data_received) self.serial_thread.error_occurred.connect(self.on_serial_error) self.serial_thread.start() self.plot_timer.start(50) self.is_live_mode = True self.btn_toggle_serial.setText("⏹ 关闭串口停止接收") self.btn_toggle_serial.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;") self.rb_v2.setEnabled(False) self.rb_raw.setEnabled(False) self.rb_corr.setEnabled(False) else: self.serial_thread.stop() self.serial_thread.wait() self.plot_timer.stop() self.is_live_mode = False self.btn_toggle_serial.setText("▶ 打开接收串口") self.btn_toggle_serial.setStyleSheet("background-color: #4CAF50; color: white; font-weight: bold;") self.lbl_fps.setText("📈 绘图帧率: -- FPS | 📥 接收率: -- 包/秒") self.rb_v2.setEnabled(True) self.rb_raw.setEnabled(True) self.rb_corr.setEnabled(True) if self.live_data_list: full_array = np.concatenate(self.live_data_list) self.df = pd.DataFrame(full_array) cols, _, _ = self.get_column_config() self.refresh_table(cols) self.lbl_info.setText(f"串口采集完毕。总计收集 {len(self.df)} 行数据。") def on_live_data_received(self, data_array): self.live_data_list.append(data_array) self.current_packet_count += len(data_array) self.lbl_info.setText(f"🟢 正在接收数据... 已接收: {self.current_packet_count} 帧") def update_live_plot(self): self.frame_count += 1 current_time = time.perf_counter() elapsed = current_time - self.last_fps_time if elapsed >= 1.0: fps = self.frame_count / elapsed pps = (self.current_packet_count - self.last_packet_count) / elapsed self.lbl_fps.setText(f"📈 绘图帧率: {fps:.1f} FPS | 📥 接收率: {pps:.0f} 包/秒") self.last_fps_time = current_time self.frame_count = 0 self.last_packet_count = self.current_packet_count if not self.live_data_list: return MAX_POINTS = 5000 recent_chunks = [] point_count = 0 for arr in reversed(self.live_data_list): recent_chunks.append(arr) point_count += len(arr) if point_count >= MAX_POINTS: break recent_data = np.concatenate(recent_chunks[::-1]) if len(recent_data) > MAX_POINTS: recent_data = recent_data[-MAX_POINTS:] # 1. 更新主波形图 (自动以接收点索引作为 X 轴) _, data_cols, _ = self.get_column_config() for i, col in enumerate(data_cols): y_data = recent_data[col]# / 429496729.0 self.curves[i].setData(y_data) # 2. 更新轨迹和海拔 if self.rb_v2.isChecked() and 'gps_longitude' in recent_data.dtype.names: lons = recent_data['gps_longitude'] lats = recent_data['gps_latitude'] alts = recent_data['gps_altitude'] if 'gps_altitude' in recent_data.dtype.names else np.zeros_like(lons) valid_idx = (lons != 0.0) & (lats != 0.0) if np.any(valid_idx): self.traj_curve.setData(lons[valid_idx], lats[valid_idx]) else: self.traj_curve.setData(lons, lats) self.alt_curve.setData(alts) def on_serial_error(self, err_msg): self.toggle_serial() QMessageBox.critical(self, "接收串口错误", f"发生错误:\n{err_msg}") def toggle_gps_sim(self): if not self.is_sim_running: port = self.cb_sim_ports.currentData() if not port: QMessageBox.warning(self, "提示", "未找到有效的输出串口") return if self.is_live_mode and port == self.cb_ports.currentData(): reply = QMessageBox.question(self, "警告", "模拟输出端口与当前接收端口相同,可能会导致端口冲突。确定要继续吗?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.No: return baud = int(self.cb_sim_baudrate.currentText()) self.sim_thread = GPSSimulatorThread(port, baud) self.sim_thread.error_occurred.connect(self.on_sim_error) self.sim_thread.start() self.is_sim_running = True self.btn_toggle_sim.setText("⏹ 关闭GPS动态模拟") self.btn_toggle_sim.setStyleSheet("background-color: #f44336; color: white; font-weight: bold;") else: self.sim_thread.stop() self.sim_thread.wait() self.is_sim_running = False self.btn_toggle_sim.setText("🛰 开启GPS动态模拟 (10Hz)") self.btn_toggle_sim.setStyleSheet("background-color: #FF9800; color: white; font-weight: bold;") def on_sim_error(self, err_msg): self.toggle_gps_sim() QMessageBox.critical(self, "输出串口错误", f"模拟器串口发生错误:\n{err_msg}") def toggle_mouse_mode(self, state): mode = self.vb.RectMode if state == 2 else self.vb.PanMode self.vb.setMouseMode(mode) self.traj_plot.plotItem.vb.setMouseMode(mode) self.alt_plot.plotItem.vb.setMouseMode(mode) def reset_view(self): self.plot_widget.autoRange() self.traj_plot.autoRange() self.alt_plot.autoRange() def export_csv(self): if self.df.empty: return path, _ = QFileDialog.getSaveFileName(self, "保存", "export.csv", "CSV (*.csv)") if path: self.df.to_csv(path, index=False) QMessageBox.information(self, "完成", "导出成功") if __name__ == "__main__": app = QApplication(sys.argv) w = DataAnalyzerUI() w.show() sys.exit(app.exec())