Source code for petal_app_manager.proxies.external

"""
petal_app_manager.proxies.external
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Thread-based proxies for long-running I/O back-ends (MAVLink, ROS 1, …).

Key changes vs. the first draft:
--------------------------------
* All per-key buffers are now :class:`collections.deque` with ``maxlen``.
  New data silently overwrites the oldest entry → bounded memory.
* Public API (``send``, ``register_handler``) is unchanged for petals.
* Docstrings preserved / expanded for clarity.
"""

from __future__ import annotations

import threading
import time
import socket
import errno
import struct
from abc import abstractmethod
from collections import defaultdict, deque
from typing import (
    Any, 
    Callable, 
    Deque, 
    Dict, 
    List, 
    Mapping, 
    Tuple, 
    Generator,
    Awaitable,
    Optional,
    Union
)
import contextlib
import logging
from pathlib import Path
import asyncio
import shutil
from pydantic import BaseModel, Field
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

from .base import BaseProxy
from .. import Config
from ..models.mavlink import (
    RebootStatusCode,
    RebootAutopilotResponse,
    FormatStorageResponse, 
    FormatStorageStatusCode
)
from pymavlink import mavutil, mavftp
from pymavlink.mavftp_op import FTP_OP
from pymavlink.dialects.v20 import all as mavlink_dialect

import os
# import rospy   # ← uncomment in ROS-enabled environments

import dotenv

_PARAM_TYPE_NAME_TO_ID = {
    "UINT8":  mavutil.mavlink.MAV_PARAM_TYPE_UINT8,
    "INT8":   mavutil.mavlink.MAV_PARAM_TYPE_INT8,
    "UINT16": mavutil.mavlink.MAV_PARAM_TYPE_UINT16,
    "INT16":  mavutil.mavlink.MAV_PARAM_TYPE_INT16,
    "UINT32": mavutil.mavlink.MAV_PARAM_TYPE_UINT32,
    "INT32":  mavutil.mavlink.MAV_PARAM_TYPE_INT32,
    "UINT64": mavutil.mavlink.MAV_PARAM_TYPE_UINT64,
    "INT64":  mavutil.mavlink.MAV_PARAM_TYPE_INT64,
    "REAL32": mavutil.mavlink.MAV_PARAM_TYPE_REAL32,
    "REAL64": mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
}

_INT_TYPES = {
    mavutil.mavlink.MAV_PARAM_TYPE_UINT8,
    mavutil.mavlink.MAV_PARAM_TYPE_INT8,
    mavutil.mavlink.MAV_PARAM_TYPE_UINT16,
    mavutil.mavlink.MAV_PARAM_TYPE_INT16,
    mavutil.mavlink.MAV_PARAM_TYPE_UINT32,
    mavutil.mavlink.MAV_PARAM_TYPE_INT32,
}

_FLOAT_TYPES = {
    mavutil.mavlink.MAV_PARAM_TYPE_REAL32,
    mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
}

ParamSpec = Union[
    Any,                            # value only
    Tuple[Any, Union[str, int]],    # (value, "UINT16") or (value, MAV_PARAM_TYPE_INT32)
    Dict[str, Any],                 # {"value": ..., "type": "INT16"}
]
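
# Example (illustrative sketch, not part of the module): the three ParamSpec
# forms accepted by parameter-setting helpers. The parameter names below are
# hypothetical PX4 parameters used only for illustration.
#
#   params = {
#       "MPC_XY_VEL_MAX": 12.0,                                # value only
#       "COM_RC_IN_MODE": (1, "INT32"),                        # (value, type)
#       "MIS_TAKEOFF_ALT": {"value": 2.5, "type": "REAL32"},   # dict form
#   }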

def _parse_param_type(ptype: Optional[Any]) -> Optional[int]:
    """
    Accept:
      - None
      - int (already a MAV_PARAM_TYPE_* value)
      - strings like "UINT16", "int32", "MAV_PARAM_TYPE_INT32"
    """
    if ptype is None:
        return None
    if isinstance(ptype, int):
        return ptype
    if isinstance(ptype, str):
        s = ptype.strip().upper()
        s = s.replace("MAV_PARAM_TYPE_", "")
        if s in _PARAM_TYPE_NAME_TO_ID:
            return _PARAM_TYPE_NAME_TO_ID[s]
    raise ValueError(f"Unsupported param type specifier: {ptype!r}")

def _check_int_range(v: int, bits: int, signed: bool) -> None:
    if signed:
        lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    else:
        lo, hi = 0, (1 << bits) - 1
    if not (lo <= v <= hi):
        raise ValueError(f"Value {v} out of range for {'INT' if signed else 'UINT'}{bits} [{lo}, {hi}]")

def _u32_to_f32_bits(u32: int) -> float:
    return struct.unpack("<f", struct.pack("<I", u32 & 0xFFFFFFFF))[0]

def _f32_bits_to_u32(f: float) -> int:
    return struct.unpack("<I", struct.pack("<f", float(f)))[0]

def _sign_extend(value: int, bits: int) -> int:
    sign_bit = 1 << (bits - 1)
    mask = (1 << bits) - 1
    value &= mask
    return (value ^ sign_bit) - sign_bit
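
# Example (illustrative): classic PARAM_* messages carry every value in a
# float32 field, so integer parameters travel as reinterpreted bits. The
# helpers above make that round-trip explicit:
#
#   _f32_bits_to_u32(_u32_to_f32_bits(42))   # -> 42 (exact bit round-trip)
#   _sign_extend(0xFFF6, 16)                 # -> -10 (16-bit two's complement)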

def setup_file_only_logger(name: str, log_file: str, level: str = "INFO") -> logging.Logger:
    """Set up a logger that only writes to files, not the console."""
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, level.upper()))

    # Clear any existing handlers to avoid console output
    logger.handlers.clear()

    # Create file handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(getattr(logging, level.upper()))

    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)

    # Add handler to logger
    logger.addHandler(file_handler)

    # Prevent propagation to the root logger (which might log to console)
    logger.propagate = False

    return logger
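
# Example (illustrative): creating a file-only logger. The path below is
# hypothetical.
#
#   log = setup_file_only_logger("MyPetal", "/tmp/my-petal.log", level="DEBUG")
#   log.info("written to the file only, never to the console")
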
# --------------------------------------------------------------------------- #
# Public dataclasses returned to petals / REST                                #
# --------------------------------------------------------------------------- #
class ULogInfo(BaseModel):
    """Metadata for a ULog that resides on the PX4 SD-card."""
    index      : int   # 0-based index in the list
    remote_path: str
    size_bytes : int
    utc        : int   # epoch seconds

# Progress callback signature used by download_ulog
ProgressCB = Callable[[float], Awaitable[None]]   # 0.0 - 1.0

class DownloadCancelledException(Exception):
    """Raised when a download is cancelled by the user."""
    pass

class FTPDeleteError(Exception):
    """Raised when a MAVFTP delete operation fails with a known FTP error code."""
    def __init__(self, path: str, ftp_code: int, message: str):
        self.path = path
        self.ftp_code = ftp_code
        super().__init__(message)

# ──────────────────────────────────────────────────────────────────────────────
class ExternalProxy(BaseProxy):
    """
    Base class for I/O drivers that must *poll* or *listen* continuously.

    A dedicated thread calls :py:meth:`_io_read_once` / :py:meth:`_io_write_once`
    in a tight loop while the FastAPI event-loop thread stays unblocked.

    * **Send buffers** - ``self._send[key]`` (deque, newest → right side)
      Outbound messages are enqueued here via :py:meth:`send`.  The I/O
      thread drains these buffers by calling :py:meth:`_io_write_once`
      with all pending messages.

    * **Handlers** - ``self._handlers[key]``
      Callbacks registered via :py:meth:`register_handler` are stored here.
      When new messages arrive via :py:meth:`_io_read_once`, they are
      processed directly in the I/O recv thread, which schedules async
      handlers on the event loop.
    """

    # ──────────────────────────────────────────────────────── public helpers ──
    def __init__(self, maxlen: int, sleep_time_ms: float = 1.0) -> None:
        """
        Parameters
        ----------
        maxlen :
            Maximum number of messages kept *per key* in both send/recv maps.
            A value of 0 or ``None`` means *unbounded* (not recommended).
        sleep_time_ms :
            Time (milliseconds) the I/O threads sleep between iterations.
            Negative values are clamped to 0.
        """
        self._maxlen = maxlen
        if sleep_time_ms < 0:
            sleep_time_ms = 0
        self._sleep_time_ms = sleep_time_ms
        self._sleep_time_reader_ms = sleep_time_ms
        self._send: Dict[str, Deque[Any]] = {}
        self._handlers: Dict[str, List[Callable[[Any], None]]] = (
            defaultdict(list)
        )
        self._handler_configs: Dict[str, Dict[Callable[[Any], None], Dict[str, Any]]] = (
            defaultdict(dict)
        )
        self._last_message_times: Dict[str, Dict[str, float]] = defaultdict(dict)

        # Thread management (I/O threads only)
        self._send_running = threading.Event()
        self._recv_running = threading.Event()
        self._io_thread_send: threading.Thread | None = None
        self._io_thread_recv: threading.Thread | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._log = logging.getLogger(self.__class__.__name__)

    def register_handler(self,
                         key: str,
                         fn: Callable[[Any], Awaitable[None]],
                         duplicate_filter_interval: Optional[float] = None,
                         queue_length: Optional[int] = None,
                         cpu_heavy: bool = False,
                         ) -> None:
        """
        Attach an async callback *fn* so it fires for **every** message
        appended to ``_recv[key]``.  The callback is scheduled on the main
        event loop for thread-safe execution.

        Parameters
        ----------
        key : str
            The key to register the handler for.
        fn : Callable[[Any], Awaitable[None]]
            The async handler function to call for each message.
        duplicate_filter_interval : Optional[float]
            If specified, duplicate messages received within this interval
            (in seconds) will be filtered out and the handler will not be
            called.  None disables filtering.
        queue_length : Optional[int]
            If specified, sets the maximum length of the message buffer for
            *key*.  New messages overwrite the oldest when the buffer is
            full.  If None, the default maxlen from proxy initialization is
            used.
        cpu_heavy : bool
            If True, the handler's CPU-bound work will be offloaded to a
            thread-pool executor managed by the proxy, preventing event-loop
            starvation.  The handler must still be ``async def``, but it may
            call ``await loop.run_in_executor(...)`` internally, or simply
            set this flag and the proxy will wrap the entire handler
            invocation in ``run_in_executor`` automatically.

        Raises
        ------
        TypeError
            If *fn* is not an async function (coroutine).
        """
        if not asyncio.iscoroutinefunction(fn):
            raise TypeError(
                f"Handler must be an async function (coroutine). "
                f"Got {type(fn).__name__} instead. "
                f"Define your handler with 'async def' instead of 'def'."
            )
        self._handlers[key].append(fn)
        self._handler_configs[key][fn] = {
            'duplicate_filter_interval': duplicate_filter_interval,
            'cpu_heavy': cpu_heavy,
        }
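
    # Example (illustrative): registering an async handler. ``proxy`` stands
    # for an already-started ExternalProxy subclass and is hypothetical here.
    #
    #   async def on_heartbeat(msg):
    #       print("heartbeat:", msg)
    #
    #   proxy.register_handler("HEARTBEAT", on_heartbeat,
    #                          duplicate_filter_interval=1.0)
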
    def unregister_handler(self, key: str, fn: Callable[[Any], Awaitable[None]]) -> None:
        """
        Remove the callback *fn* from the broadcast list attached to *key*.

        If *fn* was not registered, the call is silently ignored.  When the
        last callback for *key* is removed, the key itself is pruned to keep
        the dict size small.
        """
        callbacks = self._handlers.get(key)
        if not callbacks:
            return  # nothing registered under that key

        try:
            callbacks.remove(fn)
        except ValueError:
            self._log.warning(
                "Tried to unregister handler %s for key '%s' but it was not found.",
                fn, key
            )
            return  # fn was not in the list; ignore

        # Clean up handler config
        if key in self._handler_configs and fn in self._handler_configs[key]:
            del self._handler_configs[key][fn]

        if not callbacks:  # last handler -> prune everything for that key
            del self._handlers[key]
            if key in self._handler_configs:
                del self._handler_configs[key]

    def send(self, key: str, msg: Any, burst_count: Optional[int] = None,
             burst_interval: Optional[float] = None) -> None:
        """
        Enqueue *msg* for transmission.  The newest message is kept if the
        buffer is already full.

        Parameters
        ----------
        key : str
            The key to send the message on.
        msg : Any
            The message to send.
        burst_count : Optional[int]
            If specified, send the message this many times in a burst.
        burst_interval : Optional[float]
            If burst_count is specified, wait this many seconds between each
            message.  If None, all messages are sent immediately.
        """
        if burst_count is None or burst_count <= 1:
            # Single message send
            self._send.setdefault(key, deque(maxlen=self._maxlen)).append(msg)
        else:
            # Burst send
            if burst_interval is None or burst_interval <= 0:
                # Send all messages immediately
                send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
                for _ in range(burst_count):
                    send_queue.append(msg)
            else:
                # Schedule burst with intervals using a background task
                if self._loop is not None:
                    try:
                        # Check if we're in the same thread as the event loop
                        current_loop = None
                        try:
                            current_loop = asyncio.get_running_loop()
                        except RuntimeError:
                            current_loop = None

                        if current_loop is self._loop:
                            # We're in the event loop thread, create task directly
                            task = asyncio.create_task(
                                self._send_burst(key, msg, burst_count, burst_interval)
                            )
                            # Store the task reference to prevent garbage collection
                            if not hasattr(self, '_burst_tasks'):
                                self._burst_tasks = set()
                            self._burst_tasks.add(task)
                            task.add_done_callback(self._burst_tasks.discard)
                        else:
                            # We're in a different thread, schedule on proxy's loop
                            def schedule_burst():
                                try:
                                    task = asyncio.create_task(
                                        self._send_burst(key, msg, burst_count, burst_interval)
                                    )
                                    if not hasattr(self, '_burst_tasks'):
                                        self._burst_tasks = set()
                                    self._burst_tasks.add(task)
                                    task.add_done_callback(self._burst_tasks.discard)
                                except Exception as e:
                                    self._log.error(f"Failed to schedule burst task: {e}")

                            self._loop.call_soon_threadsafe(schedule_burst)
                    except Exception as e:
                        # If task creation fails, fall back to immediate send
                        self._log.warning(f"Failed to create burst task: {e}, sending immediately")
                        send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
                        for _ in range(burst_count):
                            send_queue.append(msg)
                else:
                    # If no loop is available, fall back to immediate send
                    self._log.warning("No event loop available for burst with interval, sending immediately")
                    send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
                    for _ in range(burst_count):
                        send_queue.append(msg)
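
    # Example (illustrative): enqueueing a message once, then as a burst of
    # five copies spaced 100 ms apart. ``proxy`` and ``msg`` are hypothetical;
    # the spaced burst requires a running event loop.
    #
    #   proxy.send("mav", msg)
    #   proxy.send("mav", msg, burst_count=5, burst_interval=0.1)
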
    async def _send_burst(self, key: str, msg: Any, count: int, interval: float) -> None:
        """Send a burst of messages with specified interval."""
        send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))

        # Send messages with proper intervals
        for i in range(count):
            send_queue.append(msg)
            self._log.debug(f"Burst message {i+1}/{count} queued for key '{key}'")
            if i < count - 1:  # Don't sleep after the last message
                await asyncio.sleep(interval)

    # ───────────────────────────────────────────── FastAPI life-cycle hooks ──
    async def start(self) -> None:
        """Create the I/O threads and begin polling/writing."""
        self._loop = asyncio.get_running_loop()
        self._burst_tasks = set()  # Initialize burst tasks tracking
        self._send_running.set()
        self._recv_running.set()

        # Start I/O threads for send/recv
        self._io_thread_recv = threading.Thread(target=self._recv_body, daemon=True,
                                                name=f"{self.__class__.__name__}-Recv")
        self._io_thread_send = threading.Thread(target=self._send_body, daemon=True,
                                                name=f"{self.__class__.__name__}-Send")
        self._io_thread_recv.start()
        self._io_thread_send.start()

    async def stop(self) -> None:
        """Ask the I/O threads to exit and join them (best-effort, 5 s timeout)."""
        self._send_running.clear()
        self._recv_running.clear()

        # Cancel any pending burst tasks
        if hasattr(self, '_burst_tasks'):
            for task in self._burst_tasks.copy():
                if not task.done():
                    task.cancel()
            # Wait for tasks to complete cancellation
            if self._burst_tasks:
                await asyncio.gather(*self._burst_tasks, return_exceptions=True)
            self._burst_tasks.clear()

        # Stop I/O send thread
        if self._io_thread_send and self._io_thread_send.is_alive():
            self._io_thread_send.join(timeout=5)
        self._io_thread_send = None

        # Stop I/O recv thread
        if self._io_thread_recv and self._io_thread_recv.is_alive():
            self._io_thread_recv.join(timeout=5)
        self._io_thread_recv = None

    # ─────────────────────────────────────────────── subclass responsibilities ─
    @abstractmethod
    def _io_read_once(self, timeout: float = 0.0) -> List[Tuple[str, Any]]:
        """
        Retrieve **zero or more** `(key, message)` tuples from the
        device / middleware *without blocking*.  Returning an empty list is
        perfectly fine.
        """

    @abstractmethod
    def _io_write_once(self, batches: Mapping[str, List[Any]]) -> None:
        """
        Push pending outbound messages to the device / middleware.
        ``batches`` maps *key* → list of messages drained from ``_send[key]``.
        """

    # ─────────────────────────────────────────── internal worker main-loop ──
    def _send_body(self) -> None:
        """I/O thread body - drains send queues."""
        while self._send_running.is_set():
            pending: Dict[str, List[Any]] = defaultdict(list)
            for key, dq in list(self._send.items()):
                while dq:
                    pending[key].append(dq.popleft())
            if pending:
                self._io_write_once(pending)
            else:
                # Sleep briefly if there's nothing to send to avoid busy-waiting
                time.sleep(self._sleep_time_ms / 1000.0)

    def _recv_body(self) -> None:
        """I/O thread body - polls recv and processes messages with handlers directly."""
        while self._recv_running.is_set():
            for key, msg in self._io_read_once(timeout=self._sleep_time_reader_ms / 1000.0):
                # Process directly in recv thread (handlers are async, dispatched to event loop)
                self._process_message_with_handlers(key, msg)

    # ─────────────────────────────────────────── message processing ──
    def _invoke_callback_safely(
        self,
        callback: Callable,
        key: str,
        msg: Any,
        *,
        cpu_heavy: bool = False,
    ):
        """Safely invoke an async callback by scheduling it on the main event loop.

        All callbacks are required to be async functions.  When *cpu_heavy*
        is ``True`` the coroutine is wrapped so that its body executes inside
        ``loop.run_in_executor`` (thread pool), keeping the event loop free
        for other work.
        """
        try:
            if self._loop and not self._loop.is_closed():
                if cpu_heavy:
                    async def _offloaded():
                        # Fall back to the default executor when the subclass
                        # does not provide a dedicated thread pool (_exe).
                        await self._loop.run_in_executor(
                            getattr(self, "_exe", None),
                            lambda: asyncio.run(callback(msg))
                        )
                    asyncio.run_coroutine_threadsafe(_offloaded(), self._loop)
                else:
                    asyncio.run_coroutine_threadsafe(callback(msg), self._loop)
            else:
                self._log.warning(
                    "[ExternalProxy] Cannot invoke async callback for key '%s': "
                    "event loop not available",
                    key,
                )
        except Exception as exc:
            self._log.error(
                "[ExternalProxy] Error invoking callback for key '%s': %s",
                key, exc,
            )

    def _process_message_with_handlers(self, key: str, msg: Any):
        """
        Process a single message by invoking all registered handlers for the key.
        This runs in the I/O recv thread and handles duplicate filtering.
        Handlers are async and scheduled on the event loop.
        """
        current_time = time.time()

        for cb in self._handlers.get(key, []):
            try:
                # Check if duplicate filtering is enabled for this handler
                handler_config = self._handler_configs.get(key, {}).get(cb, {})
                filter_interval = handler_config.get('duplicate_filter_interval')

                should_call_handler = True
                if filter_interval is not None:
                    # Convert message to string for comparison
                    msg_str = str(msg)
                    handler_key = f"{key}_{id(cb)}"

                    # Check if we've seen this exact message recently for this handler
                    if handler_key in self._last_message_times:
                        last_msg_str, last_time = self._last_message_times[handler_key]
                        if (msg_str == last_msg_str and
                                current_time - last_time < filter_interval):
                            should_call_handler = False
                            self._log.debug(
                                "[ExternalProxy] Filtered duplicate message for handler %s on key '%s'",
                                cb, key
                            )

                    # Update last message time for this handler
                    if should_call_handler:
                        self._last_message_times[handler_key] = (msg_str, current_time)

                if should_call_handler:
                    is_cpu_heavy = handler_config.get('cpu_heavy', False)
                    self._invoke_callback_safely(cb, key, msg, cpu_heavy=is_cpu_heavy)
                    self._log.debug(
                        "[ExternalProxy] handler %s called for key '%s': %s",
                        cb, key, msg
                    )
            except Exception as exc:  # never kill the loop
                self._log.error(
                    "[ExternalProxy] handler %s raised: %s",
                    cb, exc
                )

# ──────────────────────────────────────────────────────────────────────────────
class MavLinkExternalProxy(ExternalProxy):
    """
    Threaded MAVLink driver using `pymavlink`.

    Buffers used
    ------------
    * ``send["mav"]``  - outbound :class:`MAVLink_message`
    * ``recv["mav"]``  - any inbound message
    * ``recv[str(msg.get_msgId())]`` - by numeric ID
    * ``recv[msg.get_type()]``       - by string type
    """

    def __init__(
        self,
        endpoint: str,
        baud: int,
        source_system_id: int,
        source_component_id: int,
        maxlen: int,
        mavlink_worker_sleep_ms: float = 1.0,
        mavlink_heartbeat_send_frequency: float = 5.0,
        root_sd_path: str = 'fs/microsd/log',
    ):
        super().__init__(maxlen=maxlen, sleep_time_ms=mavlink_worker_sleep_ms)
        self.endpoint = endpoint
        self.baud = baud
        self.source_system_id = source_system_id
        self.source_component_id = source_component_id
        self.mavlink_heartbeat_send_frequency = mavlink_heartbeat_send_frequency
        self.root_sd_path = root_sd_path
        self.master: mavutil.mavfile | None = None

        # Set up file-only logging
        log_dir = Config.PETAL_LOG_DIR
        log_path = Path(log_dir, "app-mavlinkexternalproxy.log")
        self._log_msgs = setup_file_only_logger(
            name="MavLinkExternalProxyMsgs",
            log_file=log_path,
            level="INFO"
        )
        self._log = logging.getLogger("MavLinkExternalProxy")

        self._loop: asyncio.AbstractEventLoop | None = None
        self._exe = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MavLinkExternalProxy")

        self.connected = False
        self._last_heartbeat_time = time.time()
        self.leaf_fc_connected = False
        self._last_leaf_fc_heartbeat_time = time.time()
        self._connection_check_interval = 5.0   # Check connection every 5 seconds
        self._heartbeat_timeout = 10.0          # Consider disconnected if no heartbeat for 10 s
        self._leaf_fc_heartbeat_timeout = 5.0   # Consider Leaf FC disconnected if no heartbeat for 5 s
        self._reconnect_interval = 2.0          # Wait 2 s between reconnection attempts
        self._heartbeat_task = None
        self._connection_monitor_task = None
        self._reconnect_pending = False
        self._mav_lock = threading.Lock()
        self._download_lock = threading.Lock()  # Prevent concurrent downloads

        # Rate limiting for logging
        self._last_log_time = {}
        self._log_interval = {
            'HEARTBEAT': 10.0,        # Log heartbeats every 10 seconds max
            'MISSION_CURRENT': 5.0,   # Log mission current every 5 seconds max
            'ATTITUDE': 30.0,         # Log attitude every 30 seconds max
            'POSITION': 30.0,         # Log position every 30 seconds max
            'DEFAULT': 2.0            # Default interval for other messages
        }

        # Messages to suppress completely (only show at DEBUG level)
        self._suppress_messages = {
            'SERVO_OUTPUT_RAW',
            'ACTUATOR_MOTORS',
            'ATTITUDE_QUATERNION',
            'LOCAL_POSITION_NED',
            'GLOBAL_POSITION_INT'
        }

    def _norm_name(self, x):
        """Normalize parameter name by removing null padding."""
        try:
            return x.decode("ascii").rstrip("\x00")
        except AttributeError:
            return str(x).rstrip("\x00")

    def _decode_param_value(self, msg):
        """
        Decode a PARAM_VALUE message.

        If type==INT32, reinterpret the float bits as int32.
        If type==UINT32, reinterpret the float bits as uint32.

        Parameters
        ----------
        msg : MAVLink_param_value_message
            The PARAM_VALUE message to decode.

        Returns
        -------
        Tuple[str, Any]
            The parameter name and decoded value.
        """
        name = self._norm_name(msg.param_id)

        if msg.param_type in _INT_TYPES:
            raw_u32 = _f32_bits_to_u32(msg.param_value)
            if msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
                val = raw_u32 & 0xFF
            elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
                val = _sign_extend(raw_u32 & 0xFF, 8)
            elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
                val = raw_u32 & 0xFFFF
            elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16:
                val = _sign_extend(raw_u32 & 0xFFFF, 16)
            elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT32:
                val = raw_u32
            elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT32:
                val = _sign_extend(raw_u32, 32)
            else:
                # Shouldn't happen due to _INT_TYPES, but keep safe
                val = raw_u32
            return name, val

        # REAL32 (what PX4 mostly uses)
        if msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
            return name, float(msg.param_value)

        # Classic PARAM_* can't reliably carry REAL64/INT64/UINT64; if they
        # appear, just pass through the float field
        return name, float(msg.param_value)

    def _encode_param_value(self, value: Any, param_type: int) -> float:
        """
        Encode a parameter value for transmission.

        For INT32 types, encode the int32 bits as float32 for wire transmission.
        For UINT32 types, encode the uint32 bits as float32 for wire transmission.
        Returns the float value to put in param_value field.

        Parameters
        ----------
        value : Any
            The parameter value to encode.
        param_type : int
            The MAV_PARAM_TYPE_* type of the parameter.

        Returns
        -------
        float
            The encoded float value for transmission.
        """
        if param_type in _INT_TYPES:
            v = int(value)
            if param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
                _check_int_range(v, 8, signed=False)
                u32 = v
            elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
                _check_int_range(v, 8, signed=True)
                u32 = v & 0xFF
            elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
                _check_int_range(v, 16, signed=False)
                u32 = v
            elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16:
                _check_int_range(v, 16, signed=True)
                u32 = v & 0xFFFF
            elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT32:
                _check_int_range(v, 32, signed=False)
                u32 = v
            elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT32:
                _check_int_range(v, 32, signed=True)
                u32 = v & 0xFFFFFFFF
            else:
                u32 = v & 0xFFFFFFFF
            return _u32_to_f32_bits(u32)

        if param_type == mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
            return float(value)

        # Classic PARAM_SET can't carry 64-bit values; refuse explicitly
        if param_type in (
            mavutil.mavlink.MAV_PARAM_TYPE_UINT64,
            mavutil.mavlink.MAV_PARAM_TYPE_INT64,
            mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
        ):
            raise ValueError("64-bit PARAM_SET not supported; use PARAM_EXT_* for 64-bit/REAL64 parameters")

        # Fallback
        return float(value)

    def _should_log_message(self, msg_type: str) -> bool:
        """Determine if a message should be logged based on rate limiting."""
        current_time = time.time()

        # Suppress high-frequency messages completely at INFO level
        if msg_type in self._suppress_messages:
            return False

        # Get the appropriate interval for this message type
        interval = self._log_interval.get(msg_type, self._log_interval['DEFAULT'])

        # Check if enough time has passed since last log
        last_log = self._last_log_time.get(msg_type, 0.0)
        if current_time - last_log >= interval:
            self._last_log_time[msg_type] = current_time
            return True
        return False

    @property
    def target_system(self) -> int:
        """Return the target system ID of the MAVLink connection."""
        if self.master:
            return self.master.target_system
        return 0

    @property
    def target_component(self) -> int:
        """Return the target component ID of the MAVLink connection."""
        if self.master:
            return self.master.target_component
        return 0

    # ------------------------ life-cycle --------------------- #
    async def start(self):
        """Open the MAVLink connection then launch the worker thread."""
        self._loop = asyncio.get_running_loop()

        # Schedule initial connection attempt in background (non-blocking).
        # This allows the server to start immediately without waiting for MAVLink.
        asyncio.create_task(self._initial_connection_attempt())

        # Start the worker thread first
        await super().start()

        # Start connection monitoring and heartbeat tasks
        self._connection_monitor_task = asyncio.create_task(self._monitor_connection())

        # send heartbeat at configured frequency
        if self.mavlink_heartbeat_send_frequency is not None:
            try:
                frequency = float(self.mavlink_heartbeat_send_frequency)
                if frequency <= 0:
                    raise ValueError("Heartbeat frequency must be positive")
            except ValueError as exc:
                self._log.error(f"Invalid self.mavlink_heartbeat_send_frequency: {exc}")
                frequency = 5.0
            self._heartbeat_task = asyncio.create_task(
                self._send_heartbeat_periodically(frequency=frequency)
            )

    async def _initial_connection_attempt(self):
        """Attempt initial MAVLink connection in the background."""
        try:
            self._log.info("Attempting initial MAVLink connection to %s", self.endpoint)
            await self._establish_connection()
            if self.connected:
                self._log.info("Initial MAVLink connection successful")
            else:
                self._log.info("Initial MAVLink connection failed - will retry in background")
        except Exception as e:
            self._log.warning(f"Initial MAVLink connection attempt failed: {e} - will retry in background")

    async def _establish_connection(self):
        """Establish MAVLink connection and wait for heartbeat."""
        try:
            if self.master:
                # Check if any FTP operations are in progress before closing connection
                if self._download_lock.locked() or self._mav_lock.locked():
                    self._log.warning("Cannot establish new connection - FTP operation in progress")
                    return
                try:
                    self.master.close()
                except Exception:
                    pass  # Ignore errors when closing old connection

            # Run the blocking connection establishment in a separate thread
            self.master = await self._loop.run_in_executor(
                self._exe, self._create_mavlink_connection
            )

            # Try to get a heartbeat with timeout - also run in executor
            try:
                heartbeat_received = await self._loop.run_in_executor(
                    self._exe, self._wait_for_heartbeat
                )
                if heartbeat_received:
                    self.connected = True
                    self._last_heartbeat_time = time.time()
                    self._log.info("MAVLink connection established - Heartbeat from sys %s, comp %s",
                                   self.master.target_system, self.master.target_component)
                    # Register heartbeat handlers to track connection health
                    if self._on_heartbeat_received not in self._handlers.get(str(mavlink_dialect.MAVLINK_MSG_ID_HEARTBEAT), []):
                        self.register_handler(str(mavlink_dialect.MAVLINK_MSG_ID_HEARTBEAT), self._on_heartbeat_received)
                    if self._on_leaf_fc_heartbeat_received not in self._handlers.get(str(mavlink_dialect.MAVLINK_MSG_ID_LEAF_HEARTBEAT), []):
                        self.register_handler(str(mavlink_dialect.MAVLINK_MSG_ID_LEAF_HEARTBEAT), self._on_leaf_fc_heartbeat_received)
                else:
                    self.connected = False
                    self._log.warning("No heartbeat received from MAVLink endpoint %s", self.endpoint)
            except (OSError, socket.error) as e:
                self.connected = False
                self._log.warning(f"Socket error during heartbeat wait: {e}")
            except Exception as e:
                self.connected = False
                self._log.warning(f"Error waiting for heartbeat: {e}")
        except Exception as e:
            self.connected = False
            self._log.error(f"Error establishing MAVLink connection: {str(e)}")
            if self.master:
                try:
                    self.master.close()
                except Exception:
                    pass
                self.master = None

    def _create_mavlink_connection(self):
        """Create MAVLink connection in a separate thread."""
        return mavutil.mavlink_connection(
            self.endpoint,
            baud=self.baud,
            dialect="all",
            source_system=self.source_system_id,
            source_component=self.source_component_id
        )

    def _wait_for_heartbeat(self):
        """Wait for heartbeat in a separate thread."""
        if self.master:
            return self.master.wait_heartbeat(timeout=5)
        return False

    async def _on_heartbeat_received(self, msg):
        """Handler for incoming heartbeat messages to track connection health."""
        self._last_heartbeat_time = time.time()
        if not self.connected:
            self.connected = True
            self._log.info("MAVLink connection re-established")

    async def _on_leaf_fc_heartbeat_received(self, msg):
        """Handler for incoming Leaf FC heartbeat messages to track connection health."""
        self._last_leaf_fc_heartbeat_time = time.time()
        if not self.leaf_fc_connected:
            self.leaf_fc_connected = True
            self._log.info("Leaf FC connection re-established")

    async def _monitor_connection(self):
        """Monitor connection health and trigger reconnection if needed."""
        while self._recv_running.is_set():
            try:
                # Skip monitoring if a download is in progress (FTP operation)
                if self._download_lock.locked():
                    self._last_leaf_fc_heartbeat_time += self._connection_check_interval
                    self._last_heartbeat_time += self._connection_check_interval
                    self._log.debug("Skipping connection monitoring - FTP operation in progress")
                    await asyncio.sleep(self._connection_check_interval)
                    continue

                current_time = time.time()

                # Check if we haven't received a Leaf FC heartbeat recently
                if abs(current_time - self._last_leaf_fc_heartbeat_time) > self._leaf_fc_heartbeat_timeout:
                    if self.leaf_fc_connected:
                        self._log.warning("No Leaf FC heartbeat received for %.1fs - Leaf FC connection lost",
                                          current_time - self._last_leaf_fc_heartbeat_time)
                        self.leaf_fc_connected = False
                    else:
                        self._log.warning("No Leaf FC heartbeat received for %.1fs - still disconnected",
                                          current_time - self._last_leaf_fc_heartbeat_time)

                # Check if we haven't received a heartbeat recently
                if abs(current_time - self._last_heartbeat_time) > self._heartbeat_timeout:
                    if self.connected:
                        self._log.warning("No heartbeat received for %.1fs - connection lost",
                                          current_time - self._last_heartbeat_time)
                        self.connected = False
                    else:
                        self._log.warning("No heartbeat received for %.1fs - still disconnected",
                                          current_time - self._last_heartbeat_time)

                # Attempt reconnection if not connected - BUT only if no FTP operations are in progress
                if not self.connected and self._recv_running.is_set():
                    # Double-check that no FTP operations are running before attempting reconnection
                    if self._download_lock.locked() or self._mav_lock.locked():
                        self._log.debug("Delaying reconnection - FTP operation in progress")
                        await asyncio.sleep(self._connection_check_interval)
                        continue
                    self._log.info("Attempting to reconnect to MAVLink...")
                    await self._establish_connection()
                    if not self.connected:
                        await asyncio.sleep(self._reconnect_interval)

                # Check connection health periodically
                await asyncio.sleep(self._connection_check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                self._log.error(f"Error in connection monitor: {str(e)}")
                await asyncio.sleep(self._reconnect_interval)

    def _schedule_reconnect(self) -> None:
        """Called from the FTP thread when it detects a dead FD."""
        if not self._recv_running.is_set():
            return
        # avoid stampeding: only schedule once
        if getattr(self, "_reconnect_pending", False):
            return
        self._reconnect_pending = True

        async def _task():
            try:
                # Wait for any ongoing FTP operations to complete before reconnecting
                while self._download_lock.locked() or self._mav_lock.locked():
                    self._log.debug("Waiting for FTP operations to complete before reconnecting...")
                    await asyncio.sleep(0.5)
                await self._establish_connection()
            except Exception:
                # force a fresh BlockingParser next time only on failure
                self._parser = None
                raise
            finally:
                self._reconnect_pending = False

        asyncio.run_coroutine_threadsafe(_task(), self._loop)

    async def _send_heartbeat_periodically(self, frequency: float = 5.0):
        """Periodically send a MAVLink heartbeat message."""
        interval = 1.0 / frequency
        while self._send_running.is_set():
            try:
                if self.connected and self.master:
                    await self.send_heartbeat()
                else:
                    self._log_msgs.debug("Skipping heartbeat send - not connected")
            except Exception as exc:
                self._log_msgs.error(f"Failed to send heartbeat: {exc}")
                # Don't mark as disconnected just for heartbeat send failure
            await asyncio.sleep(interval)

    async def send_heartbeat(self):
        """Send a MAVLink heartbeat message."""
        if not self.master:
            raise RuntimeError("MAVLink master not initialized")
        if not self.connected:
            raise RuntimeError("MAVLink not connected")

        msg = self.master.mav.heartbeat_encode(
            mavutil.mavlink.MAV_TYPE_GCS,            # GCS type
            mavutil.mavlink.MAV_AUTOPILOT_INVALID,   # Autopilot type
            0,                                       # Base mode
            0,                                       # Custom mode
            mavutil.mavlink.MAV_STATE_ACTIVE         # System state
        )
        self.send("mav", msg)
        self._log_msgs.debug("Sent MAVLink heartbeat")

    async def stop(self):
        """Stop the worker and close the link."""
        # Cancel monitoring tasks
        if self._connection_monitor_task:
            self._connection_monitor_task.cancel()
            try:
                await self._connection_monitor_task
            except asyncio.CancelledError:
                pass

        if self._heartbeat_task:
            self._heartbeat_task.cancel()
            try:
                await self._heartbeat_task
            except asyncio.CancelledError:
                pass

        # Stop the worker thread
        await super().stop()

        # Close MAVLink connection
        if self.master:
            self.master.close()
            self.master = None
        self.connected = False

    # ------------------- I/O primitives --------------------- #
    def _io_read_once(self, timeout: float = 0.0) -> List[Tuple[str, Any]]:
        if not self.master or not self.connected:
            return []

        out: List[Tuple[str, Any]] = []
        try:
            while True:
                with self._mav_lock:
                    msg = self.master.recv_match(blocking=True, timeout=timeout)
                if msg is None:
                    break
                msg_type = msg.get_type()
                msg_id = msg.get_msgId()
                out.append(("mav", msg))
                out.append((str(msg_id), msg))
                out.append((msg_type, msg))
        except (OSError, socket.error) as e:
            # Handle connection errors gracefully
            if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED]:
                self._log_msgs.debug(f"MAVLink connection lost during read: {e}")
                # Don't mark as disconnected here, let the heartbeat monitor handle it
            else:
                self._log_msgs.error(f"Unexpected error reading MAVLink messages: {e}")
            time.sleep(timeout)
        except Exception as e:
            self._log_msgs.error(f"Error reading MAVLink messages: {e}")
            # Don't mark as disconnected here, let the heartbeat monitor handle it
            time.sleep(timeout)
        return out

    def _io_write_once(self, batches):
        """Send queued MAVLink messages."""
        if not self.master or not self.connected:
            return
        for key, msgs in batches.items():
            for msg in msgs:
                try:
                    msg_type = msg.get_type() if hasattr(msg, 'get_type') else 'UNKNOWN'
                    msg_id = msg.get_msgId() if hasattr(msg, 'get_msgId') else 'N/A'
                    # Only log if rate limiting allows
                    if self._should_log_message(msg_type):
                        self._log_msgs.info(f"📤 MAVLink TX: {msg_type} (ID: {msg_id}) - {msg}")
                    with self._mav_lock:
                        self.master.mav.send(msg)
                except (OSError, socket.error) as e:
                    if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]:
                        self._log_msgs.debug(f"MAVLink connection lost during write: {e}")
                        # Don't mark as disconnected here, let the heartbeat monitor handle it
                        break  # Stop trying to send more messages
                    else:
                        self._log_msgs.error(f"Unexpected error sending MAVLink message {key}: {e}")
                except Exception as exc:
                    self._log_msgs.error(
                        "Failed to send MAVLink message %s: %s", key, exc
                    )
                    # Don't mark as disconnected here, let the heartbeat monitor handle it

    # ------------------- helpers exposed to petals --------- #
    def build_req_msg_long(self, message_id: int) -> mavutil.mavlink.MAVLink_command_long_message:
        """
        Build a MAVLink command to request a specific message type.

        Parameters
        ----------
        message_id : int
            The numeric ID of the MAVLink message to request.

        Returns
        -------
        mavutil.mavlink.MAVLink_command_long_message
            The MAVLink command message to request the specified message.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        cmd = self.master.mav.command_long_encode(
            self.master.target_system,
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
            0,                   # confirmation
            float(message_id),   # param1: Message ID to be streamed
            0, 0, 0, 0, 0, 0
        )
        return cmd

    def build_req_msg_log_request(self, message_id: int) -> mavutil.mavlink.MAVLink_log_request_list_message:
        """
        Build a MAVLink LOG_REQUEST_LIST message requesting the full list of
        available logs (IDs 0 through 0xFFFF).

        Parameters
        ----------
        message_id : int
            Unused; kept for signature compatibility.

        Returns
        -------
        mavutil.mavlink.MAVLink_log_request_list_message
            The MAVLink message requesting the log list.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        cmd = self.master.mav.log_request_list_encode(
            self.master.target_system,
            self.master.target_component,
            0,        # start id
            0xFFFF    # end id
        )
        return cmd

    def build_req_msg_log_data(
        self,
        log_id: int,
        ofs: int = 0,
        count: int = 0xFFFFFFFF,
    ) -> mavutil.mavlink.MAVLink_log_request_data_message:
        """
        Build LOG_REQUEST_DATA for a given log.

        Parameters
        ----------
        log_id : int
            The log id from LOG_ENTRY.id
        ofs : int
            Offset into the log (usually 0 for first request)
        count : int
            Number of bytes requested. For PX4/ArduPilot it's common to use
            0xFFFFFFFF to say "send the whole log".
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        return self.master.mav.log_request_data_encode(
            self.master.target_system,
            self.master.target_component,
            log_id,
            ofs,
            count,
        )

    def build_param_request_read(self, name: str, index: int = -1):
        """
        Build MAVLink PARAM_REQUEST_READ for a named or indexed parameter.
        If index == -1, the 'name' is used; otherwise PX4 will ignore name.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        # pymavlink will pad/trim to 16 chars; PX4 expects ASCII
        return self.master.mav.param_request_read_encode(
            self.master.target_system,
            self.master.target_component,
            name.encode("ascii"),
            index
        )

    def build_param_request_list(self):
        """Build MAVLink PARAM_REQUEST_LIST to fetch the full table."""
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        return self.master.mav.param_request_list_encode(
            self.master.target_system,
            self.master.target_component
        )

    def build_param_set(self, name: str, value: Any, param_type: int):
        """
        Build MAVLink PARAM_SET for setting a parameter.
        Handles INT32 encoding where int32 values are encoded as float32 bits
        for wire transmission.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        # Use the encoding method for proper INT32 handling
        encoded_value = self._encode_param_value(value, param_type)

        return self.master.mav.param_set_encode(
            self.master.target_system,
            self.master.target_component,
            name.encode("ascii"),
            encoded_value,   # properly encoded value
            param_type       # mavutil.mavlink.MAV_PARAM_TYPE_*
        )
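
    # Example (illustrative): setting an INT32 parameter. The integer is
    # range-checked and reinterpreted as float32 bits for the wire (see
    # _encode_param_value). ``proxy`` is a hypothetical, connected instance.
    #
    #   msg = proxy.build_param_set("COM_RC_IN_MODE", 1,
    #                               mavutil.mavlink.MAV_PARAM_TYPE_INT32)
    #   proxy.send("mav", msg)
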
    def build_reboot_command(
        self,
        reboot_autopilot: bool = True,
        reboot_onboard_computer: bool = False,
    ) -> mavutil.mavlink.MAVLink_command_long_message:
        """
        Build a MAVLink command to reboot the autopilot and/or onboard computer.

        Parameters
        ----------
        reboot_autopilot : bool
            If True, reboot the autopilot (PX4/ArduPilot). Default is True.
        reboot_onboard_computer : bool
            If True, reboot the onboard computer. Default is False.

        Returns
        -------
        mavutil.mavlink.MAVLink_command_long_message
            The MAVLink COMMAND_LONG message for reboot.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        # param1: 1=reboot autopilot, 0=do nothing
        # param2: 1=reboot onboard computer, 0=do nothing
        param1 = 1.0 if reboot_autopilot else 0.0
        param2 = 1.0 if reboot_onboard_computer else 0.0

        return self.master.mav.command_long_encode(
            self.master.target_system,
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN,
            0,        # confirmation
            param1,   # param1: reboot autopilot
            param2,   # param2: reboot onboard computer
            0, 0, 0, 0, 0   # param3..param7 unused
        )

    def build_format_storage_command(
        self,
        storage_id: int = 0,
    ) -> mavutil.mavlink.MAVLink_command_long_message:
        """
        Build a MAVLink command to format the SD card (MAV_CMD_PREFLIGHT_STORAGE).

        Parameters
        ----------
        storage_id : int
            Storage device ID (0 = primary SD card). Default is 0.

        Returns
        -------
        mavutil.mavlink.MAVLink_command_long_message
            The MAVLink COMMAND_LONG message for format storage.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        return self.master.mav.command_long_encode(
            self.master.target_system,
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_PREFLIGHT_STORAGE,   # 245
            0,                   # confirmation
            2,                   # param1: 2 = format
            float(storage_id),   # param2: storage id / device
            0, 0, 0, 0, 0        # param3..param7 unused
        )

    async def format_sd_card(
        self,
        storage_id: int = 0,
        timeout: float = 10.0,
    ) -> "FormatStorageResponse":
        """
        Send a format storage command to the autopilot and wait for ACK.

        Parameters
        ----------
        storage_id : int
            Storage device ID (0 = primary SD card). Default is 0.
        timeout : float
            Maximum time to wait for acknowledgment. Default is 10.0 seconds
            (formatting can take a few seconds).

        Returns
        -------
        FormatStorageResponse
            Structured response indicating success/failure and reason.
        """
        if not self.connected:
            return FormatStorageResponse(
                success=False,
                status_code=FormatStorageStatusCode.FAIL_NOT_CONNECTED,
                reason="MAVLink connection not established.",
                ack_result=None,
            )

        cmd = self.build_format_storage_command(storage_id=storage_id)
        result = {"ack_received": False, "result": None}

        def _collector(pkt) -> bool:
            if pkt.get_type() != "COMMAND_ACK":
                return False
            if pkt.command == mavutil.mavlink.MAV_CMD_PREFLIGHT_STORAGE:
                result["ack_received"] = True
                result["result"] = pkt.result
                return True
            return False

        COMMAND_ACK_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK)

        _ACK_TO_STATUS = {
            mavutil.mavlink.MAV_RESULT_DENIED:               FormatStorageStatusCode.FAIL_ACK_DENIED,
            mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: FormatStorageStatusCode.FAIL_ACK_TEMPORARILY_REJECTED,
            mavutil.mavlink.MAV_RESULT_UNSUPPORTED:          FormatStorageStatusCode.FAIL_ACK_UNSUPPORTED,
            mavutil.mavlink.MAV_RESULT_FAILED:               FormatStorageStatusCode.FAIL_ACK_FAILED,
            mavutil.mavlink.MAV_RESULT_IN_PROGRESS:          FormatStorageStatusCode.FAIL_ACK_IN_PROGRESS,
            getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): FormatStorageStatusCode.FAIL_ACK_CANCELLED,
        }
        _ACK_NAME = {
            mavutil.mavlink.MAV_RESULT_ACCEPTED:             "ACCEPTED",
            mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: "TEMPORARILY_REJECTED",
            mavutil.mavlink.MAV_RESULT_DENIED:               "DENIED",
            mavutil.mavlink.MAV_RESULT_UNSUPPORTED:          "UNSUPPORTED",
            mavutil.mavlink.MAV_RESULT_FAILED:               "FAILED",
            mavutil.mavlink.MAV_RESULT_IN_PROGRESS:          "IN_PROGRESS",
            getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): "CANCELLED",
        }

        try:
            await self.send_and_wait(
                match_key=COMMAND_ACK_ID,
                request_msg=cmd,
                collector=_collector,
                timeout=timeout,
            )
        except TimeoutError:
            self._log.warning("Format storage command sent but no ACK received within timeout.")
            return FormatStorageResponse(
                success=False,
                status_code=FormatStorageStatusCode.FAIL_TIMEOUT,
                reason=f"No ACK received within {timeout}s timeout.",
                ack_result=None,
            )

        # ACK path
        if result["ack_received"]:
            ack_val = result["result"]
            ack_name = _ACK_NAME.get(ack_val, f"UNKNOWN({ack_val})")
            if ack_val == mavutil.mavlink.MAV_RESULT_ACCEPTED:
                self._log.info("Format storage command accepted by autopilot")
                return FormatStorageResponse(
                    success=True,
                    status_code=FormatStorageStatusCode.OK_ACK_ACCEPTED,
                    reason="Autopilot acknowledged the format storage command (ACCEPTED).",
                    ack_result=ack_val,
                )
            status = _ACK_TO_STATUS.get(ack_val, FormatStorageStatusCode.FAIL_ACK_UNKNOWN)
            self._log.warning(f"Format storage command rejected with result: {ack_val} ({ack_name})")
            return FormatStorageResponse(
                success=False,
                status_code=status,
                reason=f"Autopilot rejected the format storage command: {ack_name}.",
                ack_result=ack_val,
            )

        # Rare: send_and_wait returned without TimeoutError but collector never matched
        return FormatStorageResponse(
            success=False,
            status_code=FormatStorageStatusCode.FAIL_NO_ACK_MATCH,
            reason="send_and_wait returned but no matching COMMAND_ACK for format storage was observed.",
            ack_result=None,
        )
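
    # Example (illustrative): formatting the primary SD card from async code.
    # ``proxy`` is a hypothetical, connected instance.
    #
    #   resp = await proxy.format_sd_card(storage_id=0, timeout=15.0)
    #   if not resp.success:
    #       print("format failed:", resp.reason)
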
    def build_motor_value_command(
        self, motor_idx: int, motor_value: float, timeout: float
    ) -> mavutil.mavlink.MAVLink_command_long_message:
        """Build MAV_CMD_ACTUATOR_TEST command for a motor."""
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        # param1 = throttle value (0-1 or NaN)
        # param2 = timeout in seconds
        # param5 = motor mapping (110x where x is motor index)
        return self.master.mav.command_long_encode(
            1,   # TODO: investigate best practice
            1,   # TODO: investigate best practice
            mavutil.mavlink.MAV_CMD_ACTUATOR_TEST,
            0,                         # confirmation
            motor_value,               # param1: Motor value (0-1 or NaN)
            timeout,                   # param2: Timeout in seconds
            0,                         # Reserved
            0,                         # Reserved
            float(1100 + motor_idx),   # param5: Motor mapping (110x)
            0,                         # Reserved
            0                          # Reserved
        )

    def build_request_message_command(self) -> mavutil.mavlink.MAVLink_command_long_message:
        """
        Build a MAVLink command that requests the SYSTEM_TIME message once
        using MAV_CMD_REQUEST_MESSAGE (common.xml).

        Returns
        -------
        mavutil.mavlink.MAVLink_command_long_message
            The MAVLink COMMAND_LONG message requesting SYSTEM_TIME.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        return self.master.mav.command_long_encode(
            self.master.target_system,
            self.master.target_component,
            mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
            0,   # confirmation
            float(mavutil.mavlink.MAVLINK_MSG_ID_SYSTEM_TIME),   # param1: requested message id
            0, 0, 0, 0, 0, 0   # param2..param7 unused
        )

    def build_shell_serial_control_msgs(
        self,
        text: str,
        device: int = 10,        # PX4 mavlink_shell.py uses devnum=10
        respond: bool = True,
        exclusive: bool = True,
    ) -> list[mavutil.mavlink.MAVLink_serial_control_message]:
        """
        Build SERIAL_CONTROL messages that write `text` to the PX4 MAVLink shell.
        Splits into <=70 byte chunks (MAVLink SERIAL_CONTROL data field).

        Parameters
        ----------
        text : str
            The text to send to the MAVLink shell.
        device : int
            The device number to use (default 10 for PX4 mavlink_shell).
        respond : bool
            If True, set the RESPOND flag (default True).
        exclusive : bool
            If True, set the EXCLUSIVE flag (default True).
        """
        if not self.master or not self.connected:
            raise RuntimeError("MAVLink connection not established")

        flags = 0
        if respond:
            flags |= mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND
        if exclusive:
            flags |= mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE

        b = text.encode("utf-8")
        msgs = []
        # MAVLink shell implementation chunks at 70 bytes
        for i in range(0, len(b), 70):
            chunk = b[i:i+70]
            data = list(chunk) + [0] * (70 - len(chunk))
            msgs.append(
                self.master.mav.serial_control_encode(
                    device,        # device
                    flags,         # flags
                    0,             # timeout (PX4 ignores it)
                    0,             # baudrate (0 = no change)
                    len(chunk),    # count
                    data,          # data[70]
                )
            )
        return msgs

    async def send_shell_command(
        self,
        command: str,
        settle_time: float = 0.5,
        timeout: float = 3.0,
    ) -> str:
        """
        Send a shell command to PX4 via MAVLink SERIAL_CONTROL and wait for
        PX4 to acknowledge it.

        Uses :py:meth:`send_and_wait` so the command is transmitted through
        the normal I/O queue **and** we block until PX4 replies with at least
        one ``SERIAL_CONTROL`` packet (flag ``REPLY``), confirming the shell
        received the command.  A *settle_time* pause is added afterwards so
        PX4 has time to act on it (e.g. release file handles).

        Parameters
        ----------
        command : str
            The shell command to send (e.g. ``"logger stop\\n"``).
        settle_time : float
            Extra time (seconds) to wait **after** PX4 acknowledges the
            command, giving it time to act (e.g. close file handles).
        timeout : float
            Maximum time (seconds) to wait for a reply before raising
            ``TimeoutError``.

        Returns
        -------
        str
            The text content of the first SERIAL_CONTROL reply from PX4, or
            an empty string if no reply was received.
        """
        if not self.connected:
            raise RuntimeError("MAVLink connection not established")

        msgs = self.build_shell_serial_control_msgs(command)
        if not msgs:
            return ""

        # We only need to send_and_wait on the *last* chunk; PX4 will
        # reply once it has the full command (terminated by \n).
        # Send all chunks except the last one directly.
        for msg in msgs[:-1]:
            self.send("mav", msg)

        serial_control_id = str(mavutil.mavlink.MAVLINK_MSG_ID_SERIAL_CONTROL)
        reply_received = False
        reply_chunks: list[str] = []

        def _collector(pkt) -> bool:
            """Return True once we get any SERIAL_CONTROL reply from PX4."""
            nonlocal reply_received
            flags = getattr(pkt, "flags", 0)
            if flags & mavutil.mavlink.SERIAL_CONTROL_FLAG_REPLY:
                # Capture reply text for diagnostics
                count = getattr(pkt, "count", 0)
                data = getattr(pkt, "data", [])
                if count > 0 and data:
                    reply_chunks.append(
                        bytes(data[:count]).decode("utf-8", errors="replace")
                    )
                reply_received = True
                return True
            return False

        try:
            await self.send_and_wait(
                match_key=serial_control_id,
                request_msg=msgs[-1],
                collector=_collector,
                timeout=timeout,
            )
            reply_text = "".join(reply_chunks).strip()
            self._log.info(
                f"Shell command {command.strip()!r} acknowledged by PX4"
                + (f" — reply: {reply_text!r}" if reply_text else "")
            )
        except TimeoutError:
            reply_text = ""
            self._log.warning(
                f"No SERIAL_CONTROL reply for shell command "
                f"{command.strip()!r} within {timeout}s — "
                f"proceeding anyway (command may still have been received)"
            )

        # Give PX4 time to act on the command (e.g. release file handles)
        if settle_time > 0:
            await asyncio.sleep(settle_time)

        return reply_text
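
    # Example (illustrative): running a NuttX shell command and reading the
    # reply. ``proxy`` is a hypothetical, connected instance.
    #
    #   reply = await proxy.send_shell_command("ls /fs/microsd/log\n",
    #                                          timeout=5.0)
    #   print(reply)
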
    async def stop_px4_logging(self, settle_time: float = 2.0, timeout: float = 5.0) -> None:
        """
        Stop PX4's SD-card logger so that log files can be deleted.

        Sends ``logger stop\\n`` via the MAVLink shell and waits for PX4 to
        acknowledge.  The logger module releases its file handles, making
        files deletable via MAVFTP.
        """
        self._log.info("Stopping PX4 logger to allow file deletion…")
        await self.send_shell_command("logger stop\n", settle_time=settle_time, timeout=timeout)
        self._log.info("PX4 logger stopped.")

    async def start_px4_logging(self, settle_time: float = 1.0, timeout: float = 5.0) -> None:
        """
        Restart PX4's SD-card logger after file operations.

        Sends ``logger start\\n`` via the MAVLink shell.
        """
        self._log.info("Restarting PX4 logger…")
        await self.send_shell_command("logger start\n", settle_time=settle_time, timeout=timeout)
        self._log.info("PX4 logger restarted.")

    async def delete_file_via_shell(
        self,
        remote_path: str,
        timeout: float = 5.0,
        command: str = "rm",
    ) -> bool:
        """
        Delete a file (or empty directory) on PX4 using a NuttX shell command.

        This is a fallback for when MAVFTP ``cmd_rm`` fails with
        ``FileProtected`` (code 9).  The shell ``rm`` bypasses the MAVFTP
        server's protection checks and deletes the file directly via NuttX
        ``unlink()``.

        Parameters
        ----------
        remote_path : str
            Full path on the vehicle, e.g.
            ``"fs/microsd/log/2025-09-01/07_50_39.ulg"``.  The path must
            start without a leading ``/``.
        timeout : float
            Maximum seconds to wait for PX4 shell acknowledgment.
        command : str
            NuttX shell command to use.  ``"rm"`` for files, ``"rmdir"`` for
            empty directories.

        Returns
        -------
        bool
            ``True`` if the command completed (no error detected in the
            reply text), ``False`` otherwise.
        """
        # NuttX rm/rmdir expects an absolute path starting with /
        abs_path = remote_path if remote_path.startswith("/") else f"/{remote_path}"
        self._log.info(f"Shell {command} fallback: {command} {abs_path}")
        try:
            reply = await self.send_shell_command(
                f"{command} {abs_path}\n", settle_time=0.3, timeout=timeout
            )
            # NuttX rm prints nothing on success; on error it prints
            # something like "rm: /path: No such file or directory"
            lower = reply.lower()
            if reply and (
                "no such" in lower
                or "error" in lower
                or "failed" in lower
                or "invalid" in lower
                or "not empty" in lower
            ):
                self._log.warning(
                    f"Shell {command} for {remote_path} returned error: {reply!r}"
                )
                return False
            self._log.info(f"Shell {command} completed for {remote_path}")
            return True
        except Exception as e:
            self._log.error(f"Shell {command} failed for {remote_path}: {e}")
            return False
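
    # Example (illustrative): deleting a protected log file via the shell
    # fallback. ``proxy`` is a hypothetical, connected instance.
    #
    #   ok = await proxy.delete_file_via_shell(
    #       "fs/microsd/log/2025-09-01/07_50_39.ulg")
    #   if not ok:
    #       print("shell rm failed")
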
    async def reboot_autopilot(
        self,
        reboot_onboard_computer: bool = False,
        timeout: float = 3.0,
    ) -> RebootAutopilotResponse:
        """
        Send a reboot command to the autopilot (PX4/ArduPilot).

        This sends MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and waits for a
        COMMAND_ACK response.

        Parameters
        ----------
        reboot_onboard_computer : bool
            If True, also reboot the onboard computer. Default is False.
        timeout : float
            Maximum time to wait for acknowledgment. Default is 3.0 seconds.

        Returns
        -------
        RebootAutopilotResponse
            Structured response indicating success/failure and reason.

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        TimeoutError
            If no acknowledgment is received within the timeout.

        Notes
        -----
        After sending this command, the connection to the autopilot will be
        lost as it reboots.  The proxy will attempt to reconnect automatically.
        """
        if not self.connected:
            raise RuntimeError("MAVLink connection not established")

        cmd = self.build_reboot_command(
            reboot_autopilot=True,
            reboot_onboard_computer=reboot_onboard_computer,
        )
        result = {"ack_received": False, "result": None}

        def _collector(pkt) -> bool:
            if pkt.get_type() != "COMMAND_ACK":
                return False
            if pkt.command == mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN:
                result["ack_received"] = True
                result["result"] = pkt.result
                return True
            return False

        COMMAND_ACK_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK)

        # Map ACK result codes -> status codes (for failures)
        _ACK_TO_STATUS = {
            mavutil.mavlink.MAV_RESULT_DENIED:               RebootStatusCode.FAIL_ACK_DENIED,
            mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: RebootStatusCode.FAIL_ACK_TEMPORARILY_REJECTED,
            mavutil.mavlink.MAV_RESULT_UNSUPPORTED:          RebootStatusCode.FAIL_ACK_UNSUPPORTED,
            mavutil.mavlink.MAV_RESULT_FAILED:               RebootStatusCode.FAIL_ACK_FAILED,
            mavutil.mavlink.MAV_RESULT_IN_PROGRESS:          RebootStatusCode.FAIL_ACK_IN_PROGRESS,
            getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): RebootStatusCode.FAIL_ACK_CANCELLED,
        }
        _ACK_NAME = {
            mavutil.mavlink.MAV_RESULT_ACCEPTED:             "ACCEPTED",
            mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: "TEMPORARILY_REJECTED",
            mavutil.mavlink.MAV_RESULT_DENIED:               "DENIED",
            mavutil.mavlink.MAV_RESULT_UNSUPPORTED:          "UNSUPPORTED",
            mavutil.mavlink.MAV_RESULT_FAILED:               "FAILED",
            mavutil.mavlink.MAV_RESULT_IN_PROGRESS:          "IN_PROGRESS",
            getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): "CANCELLED",
        }

        try:
            await self.send_and_wait(
                match_key=COMMAND_ACK_ID,
                request_msg=cmd,
                collector=_collector,
                timeout=timeout,
            )
        except TimeoutError:
            # No ACK: verify reboot using heartbeat timestamps (populated by
            # the heartbeat handler)
            self._log.warning(
                "Reboot command sent but no ACK received; verifying via heartbeat drop/return..."
            )
            last_hb = getattr(self, "_last_heartbeat_time", None)
            if last_hb is None:
                self._log.warning(
                    "No heartbeat timestamp available (_last_heartbeat_time is None); reboot not confirmed."
                )
                return RebootAutopilotResponse(
                    success=False,
                    status_code=RebootStatusCode.FAIL_NO_HEARTBEAT_TRACKING,
                    reason="No ACK received and heartbeat tracking is unavailable (_last_heartbeat_time is None).",
                    ack_result=None,
                )

            async def _wait_for_heartbeat_drop(
                wait_window_s: float = 2.0,
                gap_s: float = 1.0,
                poll_s: float = 0.05,
            ) -> bool:
                deadline = time.time() + wait_window_s
                while time.time() < deadline:
                    last = getattr(self, "_last_heartbeat_time", None)
                    if last is not None and (time.time() - last) >= gap_s:
                        return True
                    await asyncio.sleep(poll_s)
                return False

            async def _wait_for_heartbeat_return(
                since_ts: float,
                wait_window_s: float = 30.0,
                poll_s: float = 0.05,
            ) -> bool:
                deadline = time.time() + wait_window_s
                while time.time() < deadline:
                    last = getattr(self, "_last_heartbeat_time", None)
                    if last is not None and last > since_ts:
                        return True
                    await asyncio.sleep(poll_s)
                return False

            dropped = await _wait_for_heartbeat_drop()
            if not dropped:
                self._log.warning("No ACK and no heartbeat drop observed; reboot not confirmed.")
                return RebootAutopilotResponse(
                    success=False,
                    status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_NO_HB_DROP,
                    reason="No ACK received and heartbeat did not drop within the expected window.",
                    ack_result=None,
                )

            drop_mark = getattr(self, "_last_heartbeat_time", last_hb) or last_hb
            returned = await _wait_for_heartbeat_return(since_ts=drop_mark)
            if not returned:
                self._log.warning("Heartbeat dropped but did not return within the reboot window; reboot not confirmed.")
                return RebootAutopilotResponse(
                    success=False,
                    status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_HB_NO_RETURN,
                    reason="Heartbeat drop observed but heartbeat did not return within the reboot window.",
                    ack_result=None,
                )

            self._log.info("Reboot confirmed via heartbeat drop + return.")
            return RebootAutopilotResponse(
                success=True,
                status_code=RebootStatusCode.OK_REBOOT_CONFIRMED_NO_ACK,
                reason="No ACK received, but reboot confirmed via heartbeat drop + return.",
                ack_result=None,
            )

        # ACK path
        if result["ack_received"]:
            ack_val = result["result"]
            ack_name = _ACK_NAME.get(ack_val, f"UNKNOWN({ack_val})")
            if ack_val == mavutil.mavlink.MAV_RESULT_ACCEPTED:
                self._log.info("Reboot command accepted by autopilot")
                return RebootAutopilotResponse(
                    success=True,
                    status_code=RebootStatusCode.OK_ACK_ACCEPTED,
                    reason="Autopilot acknowledged the reboot command (ACCEPTED).",
                    ack_result=ack_val,
                )
            status = _ACK_TO_STATUS.get(ack_val, RebootStatusCode.FAIL_ACK_UNKNOWN)
            self._log.warning(f"Reboot command rejected with result: {ack_val} ({ack_name})")
            return RebootAutopilotResponse(
                success=False,
                status_code=status,
                reason=f"Autopilot rejected the reboot command: {ack_name}.",
                ack_result=ack_val,
            )

        # Rare: send_and_wait returned without TimeoutError but collector never matched
        return RebootAutopilotResponse(
            success=False,
            status_code=RebootStatusCode.FAIL_NO_ACK_MATCH,
            reason="send_and_wait returned but no matching COMMAND_ACK for reboot command was observed.",
            ack_result=None,
        )
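
    # Example (illustrative): requesting a reboot and inspecting the
    # structured response. ``proxy`` is a hypothetical, connected instance.
    #
    #   resp = await proxy.reboot_autopilot(timeout=3.0)
    #   print(resp.success, resp.status_code, resp.reason)
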
[docs] async def reboot_autopilot_confirmed( self, reboot_onboard_computer: bool = False, ack_timeout: float = 3.0, hb_drop_window_s: float = 5.0, hb_drop_gap_s: float = 1.5, hb_return_window_s: float = 30.0, hb_poll_interval_s: float = 0.05, sitl_mode: bool = False, ) -> RebootAutopilotResponse: """ Send a reboot command and confirm success via heartbeat drop + return. Unlike :meth:`reboot_autopilot`, heartbeat confirmation is the **primary** verification method, not a fallback. The flow is: 1. Send MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and wait for COMMAND_ACK. - If the ACK is a rejection, return immediately with failure. - If the ACK is accepted *or* times out, proceed to step 2. 2. Wait for heartbeat to **drop** (gap ≥ ``hb_drop_gap_s`` within ``hb_drop_window_s``). 3. Wait for heartbeat to **return** (new heartbeat after the drop within ``hb_return_window_s``). 4. Return success only when heartbeat returns, confirming the autopilot has rebooted and is alive again. Parameters ---------- reboot_onboard_computer : bool If True, also reboot the onboard computer. ack_timeout : float Maximum time (s) to wait for COMMAND_ACK. hb_drop_window_s : float Time window (s) to detect heartbeat drop after command. hb_drop_gap_s : float Minimum gap (s) without a heartbeat to consider it dropped. hb_return_window_s : float Time window (s) to wait for heartbeat to return after drop. hb_poll_interval_s : float Polling interval (s) for heartbeat checks. sitl_mode : bool If True, treat a DENIED ACK as acceptable instead of failing immediately. This allows SITL testing where the user manually kills and restarts the ``px4_sitl`` process to trigger the heartbeat drop + return cycle. Returns ------- RebootAutopilotResponse """ if not self.connected: raise RuntimeError("MAVLink connection not established") # ── Step 1: Send reboot command and attempt to get ACK ──────────── cmd = self.build_reboot_command( reboot_autopilot=True, reboot_onboard_computer=reboot_onboard_computer, ) result = {"ack_received": False, "result": None} def _collector(pkt) -> bool: if pkt.get_type() != "COMMAND_ACK": return False if pkt.command == mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN: result["ack_received"] = True result["result"] = pkt.result return True return False COMMAND_ACK_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK) _ACK_TO_STATUS = { mavutil.mavlink.MAV_RESULT_DENIED: RebootStatusCode.FAIL_ACK_DENIED, mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: RebootStatusCode.FAIL_ACK_TEMPORARILY_REJECTED, mavutil.mavlink.MAV_RESULT_UNSUPPORTED: RebootStatusCode.FAIL_ACK_UNSUPPORTED, mavutil.mavlink.MAV_RESULT_FAILED: RebootStatusCode.FAIL_ACK_FAILED, mavutil.mavlink.MAV_RESULT_IN_PROGRESS: RebootStatusCode.FAIL_ACK_IN_PROGRESS, getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): RebootStatusCode.FAIL_ACK_CANCELLED, } _ACK_NAME = { mavutil.mavlink.MAV_RESULT_ACCEPTED: "ACCEPTED", mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: "TEMPORARILY_REJECTED", mavutil.mavlink.MAV_RESULT_DENIED: "DENIED", mavutil.mavlink.MAV_RESULT_UNSUPPORTED: "UNSUPPORTED", mavutil.mavlink.MAV_RESULT_FAILED: "FAILED", mavutil.mavlink.MAV_RESULT_IN_PROGRESS: "IN_PROGRESS", getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): "CANCELLED", } ack_received = False ack_val = None try: await self.send_and_wait( match_key=COMMAND_ACK_ID, request_msg=cmd, collector=_collector, timeout=ack_timeout, ) except TimeoutError: self._log.warning( "Reboot command sent but no ACK received; proceeding to heartbeat confirmation..." 
) if result["ack_received"]: ack_received = True ack_val = result["result"] ack_name = _ACK_NAME.get(ack_val, f"UNKNOWN({ack_val})") # If ACK is a rejection, fail immediately — no point waiting for heartbeat # Exception: in SITL mode, DENIED is expected (simulator cannot reboot) # so we skip the failure and proceed to heartbeat confirmation, # allowing the user to manually kill/restart px4_sitl. if ack_val != mavutil.mavlink.MAV_RESULT_ACCEPTED: if sitl_mode and ack_val == mavutil.mavlink.MAV_RESULT_DENIED: self._log.warning( f"SITL_MODE: Reboot ACK was DENIED (expected for SITL). " f"Proceeding to heartbeat confirmation — manually kill and " f"restart px4_sitl within {hb_drop_window_s}s to trigger " f"heartbeat drop..." ) else: status = _ACK_TO_STATUS.get(ack_val, RebootStatusCode.FAIL_ACK_UNKNOWN) self._log.warning(f"Reboot command rejected with result: {ack_val} ({ack_name})") return RebootAutopilotResponse( success=False, status_code=status, reason=f"Autopilot rejected the reboot command: {ack_name}.", ack_result=ack_val, ) else: self._log.info("Reboot command accepted by autopilot, proceeding to heartbeat confirmation...") # ── Step 2: Heartbeat drop confirmation (primary check) ─────────── last_hb = getattr(self, "_last_heartbeat_time", None) if last_hb is None: self._log.warning("No heartbeat timestamp available; cannot confirm reboot via heartbeat.") return RebootAutopilotResponse( success=False, status_code=RebootStatusCode.FAIL_NO_HEARTBEAT_TRACKING, reason="Heartbeat tracking is unavailable (_last_heartbeat_time is None). Cannot confirm reboot.", ack_result=ack_val, ) async def _wait_for_heartbeat_drop() -> bool: deadline = time.time() + hb_drop_window_s while time.time() < deadline: last = getattr(self, "_last_heartbeat_time", None) if last is not None and (time.time() - last) >= hb_drop_gap_s: return True await asyncio.sleep(hb_poll_interval_s) return False dropped = await _wait_for_heartbeat_drop() if not dropped: self._log.warning("Heartbeat did not drop within the expected window; reboot not confirmed.") return RebootAutopilotResponse( success=False, status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_NO_HB_DROP, reason=( f"Heartbeat did not drop within {hb_drop_window_s}s window " f"(gap threshold: {hb_drop_gap_s}s). Reboot not confirmed." ), ack_result=ack_val, ) drop_mark = getattr(self, "_last_heartbeat_time", last_hb) or last_hb self._log.info("Heartbeat drop detected, waiting for heartbeat to return...") # ── Step 3: Heartbeat return confirmation ───────────────────────── async def _wait_for_heartbeat_return(since_ts: float) -> bool: deadline = time.time() + hb_return_window_s while time.time() < deadline: last = getattr(self, "_last_heartbeat_time", None) if last is not None and last > since_ts: return True await asyncio.sleep(hb_poll_interval_s) return False returned = await _wait_for_heartbeat_return(since_ts=drop_mark) if not returned: self._log.warning("Heartbeat dropped but did not return; reboot not confirmed.") return RebootAutopilotResponse( success=False, status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_HB_NO_RETURN, reason=( f"Heartbeat drop observed but heartbeat did not return within " f"{hb_return_window_s}s. Autopilot may still be rebooting." 
), ack_result=ack_val, ) # ── Step 4: Confirmed ───────────────────────────────────────────── if ack_received: self._log.info("Reboot confirmed: ACK accepted + heartbeat drop + return.") return RebootAutopilotResponse( success=True, status_code=RebootStatusCode.OK_ACK_ACCEPTED_HB_CONFIRMED, reason="Reboot confirmed: COMMAND_ACK accepted and heartbeat drop + return observed.", ack_result=ack_val, ) else: self._log.info("Reboot confirmed via heartbeat drop + return (no ACK received).") return RebootAutopilotResponse( success=True, status_code=RebootStatusCode.OK_HB_CONFIRMED_NO_ACK, reason="Reboot confirmed via heartbeat drop + return (no COMMAND_ACK received).", ack_result=None, )
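# Usage sketch (illustrative): heartbeat-confirmed reboot against SITL, where a
# DENIED ACK is tolerated and the px4_sitl process is killed/restarted by hand.
# `proxy` is an already-started MavLinkExternalProxy; names are hypothetical.
async def _example_reboot_sitl(proxy) -> None:
    resp = await proxy.reboot_autopilot_confirmed(
        ack_timeout=3.0,
        hb_drop_window_s=30.0,   # generous window for a manual kill/restart
        hb_return_window_s=60.0,
        sitl_mode=True,
    )
    print(resp.status_code, resp.reason)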
[docs] async def get_param(self, name: str, timeout: float = 3.0) -> Dict[str, Any]:
        """
        Request a single PARAM_VALUE for `name` and return a dict:
            {"name": str, "value": Union[int, float], "raw": float,
             "type": int, "count": int, "index": int}

        Raises TimeoutError if no reply within timeout.
        """
        if not self.connected:
            raise RuntimeError("MAVLink connection not established")

        req = self.build_param_request_read(name, index=-1)
        result = {"got": False, "data": None}

        def _collector(pkt) -> bool:
            # Ensure we only process PARAM_VALUE
            if pkt.get_type() != "PARAM_VALUE":
                return False
            # Use the decoding method for proper INT32 handling
            pkt_name, decoded_value = self._decode_param_value(pkt)
            if pkt_name != name:
                return False
            result["got"] = True
            result["data"] = {
                "name": pkt_name,
                "value": decoded_value,
                "raw": float(pkt.param_value),
                "type": pkt.param_type,
                "count": pkt.param_count,
                "index": pkt.param_index,
            }
            return True

        # The proxy dispatches both by numeric msg-ID string and by type name;
        # matching on the type name keeps this readable.
        await self.send_and_wait(
            match_key="PARAM_VALUE",
            request_msg=req,
            collector=_collector,
            timeout=timeout,
        )
        if not result["got"]:
            raise TimeoutError(f"No PARAM_VALUE received for {name}")
        return result["data"]
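# Usage sketch (illustrative): read a single parameter. "MPC_XY_VEL_MAX" is an
# arbitrary PX4 parameter name used for demonstration only.
async def _example_get_param(proxy) -> None:
    meta = await proxy.get_param("MPC_XY_VEL_MAX", timeout=3.0)
    # meta keys: "name", "value", "raw", "type", "count", "index"
    print(f'{meta["name"]} = {meta["value"]} (type id {meta["type"]})')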
[docs] async def get_all_params(self, timeout: float = 10.0):
        """
        Request entire parameter list and return:
            {
              "<NAME>": {"value": int|float, "raw": float, "type": int,
                         "index": int, "count": int},
              ...
            }
        """
        if not self.connected:
            raise RuntimeError("MAVLink connection not established")

        req = self.build_param_request_list()
        params = {}
        seen = set()
        expected_total = {"val": None}

        def _collector(pkt) -> bool:
            if pkt.get_type() != "PARAM_VALUE":
                return False
            if expected_total["val"] is None:
                expected_total["val"] = pkt.param_count
            # Use the decoding method for proper INT32 handling
            name, decoded_value = self._decode_param_value(pkt)
            # Ignore duplicate frames; these can happen on lossy links
            if (name, pkt.param_index) not in seen:
                seen.add((name, pkt.param_index))
                params[name] = {
                    "value": decoded_value,
                    "raw": float(pkt.param_value),
                    "type": pkt.param_type,
                    "index": pkt.param_index,
                    "count": pkt.param_count,
                }
            # Stop when we've collected all expected params
            return (expected_total["val"] is not None) and (len(params) >= expected_total["val"])

        await self.send_and_wait(
            match_key="PARAM_VALUE",
            request_msg=req,
            collector=_collector,
            timeout=timeout,
        )
        return params
[docs] async def set_param(self, name: str, value: Any, ptype: Optional[int] = None, timeout: float = 3.0) -> Dict[str, Any]:
        """
        Set a parameter and confirm by reading back.

        `value` can be int or float. Returns the confirmed PARAM_VALUE dict
        (same shape as get_param()). Uses proper INT32 encoding where int32
        values are encoded as float32 bits for wire transmission.

        MAV_PARAM_TYPE reference::

            1  MAV_PARAM_TYPE_UINT8      6  MAV_PARAM_TYPE_INT32
            2  MAV_PARAM_TYPE_INT8       7  MAV_PARAM_TYPE_UINT64
            3  MAV_PARAM_TYPE_UINT16     8  MAV_PARAM_TYPE_INT64
            4  MAV_PARAM_TYPE_INT16      9  MAV_PARAM_TYPE_REAL32
            5  MAV_PARAM_TYPE_UINT32    10  MAV_PARAM_TYPE_REAL64
        """
        # Pick a MAV_PARAM_TYPE based on Python type (simple heuristic)
        if ptype is None:
            if isinstance(value, int):
                ptype = mavutil.mavlink.MAV_PARAM_TYPE_INT32
            elif isinstance(value, float):
                ptype = mavutil.mavlink.MAV_PARAM_TYPE_REAL32
            else:
                self._log.warning(f"Unsupported parameter type for {name}: {type(value)}")
                raise ValueError(f"Unsupported parameter type for {name}: {type(value)}")

        # Build the PARAM_SET message with proper encoding
        req = self.build_param_set(name, value, ptype)

        # Send the parameter set command
        self.send("mav", req)

        # Wait for confirmation by reading back the parameter. The autopilot
        # echoes a PARAM_VALUE after PARAM_SET; if that echo is lost, retry
        # the read once before giving up.
        try:
            return await self.get_param(name, timeout=timeout)
        except TimeoutError:
            return await self.get_param(name, timeout=timeout)
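# Usage sketch (illustrative): set then confirm a parameter. The explicit ptype
# forces REAL32 encoding instead of relying on the int -> INT32 heuristic.
# The parameter name is a placeholder.
async def _example_set_param(proxy) -> None:
    confirmed = await proxy.set_param(
        "MPC_XY_VEL_MAX", 8.0,
        ptype=mavutil.mavlink.MAV_PARAM_TYPE_REAL32,
        timeout=3.0,
    )
    print(f'confirmed {confirmed["name"]} = {confirmed["value"]}')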
[docs] async def send_and_wait( self, *, match_key: str, request_msg: mavutil.mavlink.MAVLink_message, collector: Callable[[mavutil.mavlink.MAVLink_message], bool], timeout: float = 3.0, queue_length: Optional[int] = None, cancel_event: Optional[threading.Event] = None, ) -> None: """ Transmit *request_msg*, register a handler on *match_key* and keep feeding incoming packets to *collector* until it returns **True** or *timeout* expires. Parameters ---------- match_key : The key used when the proxy dispatches inbound messages (numeric ID as string, e.g. `"147"`). request_msg : Encoded MAVLink message to send - COMMAND_LONG, LOG_REQUEST_LIST, ... collector : Callback that receives each matching packet. Must return **True** once the desired condition is satisfied; returning **False** keeps waiting. timeout : Maximum seconds to block. queue_length : Optional maximum queue length for the handler. If the queue exceeds this length, older packets will be dropped. If None, the default queue length is used. cancel_event : Optional threading.Event that can be set to cancel the wait. Raises ------ RuntimeError If MAVLink connection is not established. TimeoutError If no matching response is received within the timeout. """ if not self.connected: raise RuntimeError("MAVLink connection not established") loop = asyncio.get_running_loop() done = asyncio.Event() async def _handler(pkt): try: if collector(pkt): loop.call_soon_threadsafe(done.set) except Exception as exc: self._log.error(f"[collector] raised: {exc}") self.register_handler( key = match_key, fn = _handler, queue_length = queue_length ) # send the request message after registering the handler self.send("mav", request_msg) # create an asyncio task to monitor cancel_event if provided if cancel_event is not None: def _cancel_checker(): while not done.is_set(): if cancel_event.is_set(): loop.call_soon_threadsafe(done.set) break time.sleep(0.1) cancel_checker = threading.Thread(target=_cancel_checker, daemon=True, name="_mavlink_send_and_wait_cancel_checker") cancel_checker.start() try: await asyncio.wait_for(done.wait(), timeout=timeout) except asyncio.TimeoutError: raise TimeoutError(f"No reply/condition for {match_key} in {timeout}s") finally: if cancel_event is not None: cancel_checker.join(timeout=0.1) self.unregister_handler(match_key, _handler)
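# Usage sketch (illustrative): waiting for the COMMAND_ACK of an arbitrary
# command via send_and_wait(). `cmd_msg` is assumed to be a pre-encoded
# COMMAND_LONG; the collector returns False for non-matching packets so
# unrelated ACKs arriving on the same key are ignored.
async def _example_wait_ack(proxy, cmd_msg, cmd_id: int) -> int:
    ack = {}

    def _collector(pkt) -> bool:
        if pkt.get_type() == "COMMAND_ACK" and pkt.command == cmd_id:
            ack["result"] = pkt.result
            return True
        return False

    await proxy.send_and_wait(
        match_key=str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK),
        request_msg=cmd_msg,
        collector=_collector,
        timeout=3.0,
    )
    return ack["result"]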
[docs] async def get_log_entries( self, *, msg_id: str, request_msg: mavutil.mavlink.MAVLink_message, timeout: float = 8.0, stale_timeout: float = 2.0, ) -> Dict[int, Dict[str, int]]: """ Send LOG_REQUEST_LIST and gather all LOG_ENTRY packets. Handles two edge cases that PX4 firmware exhibits: 1. **No logs on the vehicle** — PX4 responds with a single ``LOG_ENTRY`` where ``num_logs == 0``. The MAVLink spec requires this sentinel packet; we detect it and return an empty dict immediately instead of waiting for the timeout. 2. **Corrupted / missing log entries** — PX4 may advertise ``num_logs = N`` but only deliver fewer than *N* ``LOG_ENTRY`` packets (corrupted slots are silently skipped). A staleness timer (``stale_timeout``) detects when no new entry has arrived for a while and returns whatever was collected so far. Parameters ---------- msg_id : str The MAVLink message ID string for LOG_ENTRY (e.g. ``"118"``). request_msg : mavutil.mavlink.MAVLink_message The encoded LOG_REQUEST_LIST message to send. timeout : float Hard upper-bound on how long to wait (seconds). stale_timeout : float If no new LOG_ENTRY arrives within this many seconds *after* the first one, collection stops early and whatever entries were gathered are returned. Only applies when ``num_logs > 0`` and fewer than ``num_logs`` entries have been received. Default 2.0 s. Returns ------- Dict[int, Dict[str, int]] ``{log_id: {"size": int, "utc": int}, …}`` """ entries: Dict[int, Dict[str, int]] = {} expected_total: Dict[str, Optional[int]] = {"val": None} last_entry_time: Dict[str, float] = {"t": time.time()} def _collector(pkt) -> bool: if expected_total["val"] is None: expected_total["val"] = pkt.num_logs # ── No logs on the vehicle ──────────────────────────────── # PX4 sends LOG_ENTRY(id=0, num_logs=0) as a sentinel. if expected_total["val"] == 0: self._log.info("Vehicle reports num_logs=0 — no logs on SD card.") return True # signal done immediately entries[pkt.id] = {"size": pkt.size, "utc": pkt.time_utc} last_entry_time["t"] = time.time() return len(entries) >= expected_total["val"] # ── Staleness monitor ───────────────────────────────────────── # If PX4 stops sending LOG_ENTRY packets before we reach the # advertised num_logs (e.g. corrupted logs), the collector will # never return True and we'd wait the full timeout. This task # detects the stall and signals completion early via cancel_event. cancel_evt = threading.Event() async def _stale_watchdog(): while not cancel_evt.is_set(): await asyncio.sleep(0.25) if expected_total["val"] is not None and expected_total["val"] > 0: elapsed = time.time() - last_entry_time["t"] if elapsed >= stale_timeout and len(entries) > 0: got = len(entries) want = expected_total["val"] self._log.warning( f"LOG_ENTRY stream stalled — received {got}/{want} entries " f"(no new entry for {elapsed:.1f}s). " f"Returning partial results; {want - got} entries may be corrupted/missing." ) cancel_evt.set() # unblocks send_and_wait watchdog_task = asyncio.create_task(_stale_watchdog()) try: await self.send_and_wait( match_key=msg_id, request_msg=request_msg, collector=_collector, timeout=timeout, cancel_event=cancel_evt, ) except TimeoutError: # If we collected *some* entries before the hard timeout, return # them instead of propagating the error. if entries: got = len(entries) want = expected_total.get("val") self._log.warning( f"Hard timeout ({timeout}s) reached while collecting LOG_ENTRY packets. " f"Returning {got}/{want} entries collected so far." 
) else: raise # genuinely got nothing — let caller handle it finally: cancel_evt.set() watchdog_task.cancel() try: await watchdog_task except asyncio.CancelledError: pass return entries
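# Usage sketch (illustrative): enumerate on-board logs.
# build_req_msg_log_request(message_id=...) is the helper this module uses
# elsewhere to build the LOG_REQUEST_LIST message.
async def _example_list_logs(proxy) -> None:
    msg_id = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_ENTRY)
    req = proxy.build_req_msg_log_request(message_id=msg_id)
    entries = await proxy.get_log_entries(msg_id=msg_id, request_msg=req, timeout=8.0)
    for log_id, info in sorted(entries.items()):
        print(f'log {log_id}: {info["size"]} bytes, utc={info["utc"]}')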
async def _request_chunk( self, log_id: int, ofs: int, chunk_size: int, max_retries: int = 3, timeout: float = 5.0, ) -> mavutil.mavlink.MAVLink_log_data_message: LOG_DATA_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_DATA) # Holder for this specific chunk holder_lock = threading.Lock() holder: dict[str, mavutil.mavlink.MAVLink_log_data_message] = {} def _collector(pkt) -> bool: # Only accept LOG_DATA for the correct log id if getattr(pkt, "id", None) != log_id: return False # Enforce the expected offset to avoid eating someone else's chunk if int(pkt.ofs) != ofs: return False with holder_lock: holder["pkt"] = pkt return True # Build LOG_REQUEST_DATA for *this* chunk req = self.master.mav.log_request_data_encode( self.master.target_system, self.master.target_component, log_id, ofs, chunk_size, ) # Try a few times per chunk attempt = 0 while True: try: await self.send_and_wait( match_key=LOG_DATA_ID, request_msg=req, collector=_collector, timeout=timeout, ) break # got the chunk except TimeoutError: attempt += 1 if attempt >= max_retries: raise TimeoutError( f"Timeout while waiting for LOG_DATA " f"log_id={log_id} ofs={ofs} after {max_retries} attempts" ) self._log.warning( f"Timeout for LOG_DATA (log_id={log_id}, ofs={ofs}), " f"retry {attempt}/{max_retries}" ) # re-send the same request and loop again return holder["pkt"] def _request_log_sync( self, *, log_id: int, completed_event: threading.Event, timeout: float = 60.0, size_bytes: Optional[int] = None, cancel_event: threading.Event | None = None, callback: Optional[Callable[[int], None]] = None, end_of_buffer_timeout: float = 3.0, ) -> bytes: """ Synchronous helper: request LOG_DATA for a given log_id and return raw bytes. Assembles data by offset so out-of-order / duplicate packets are tolerated. """ self._log.info(f"Attempting to download Log ID: {log_id}") # 1) Send LOG_REQUEST_DATA (from offset 0, full log) self.master.mav.log_request_data_send( self.master.target_system, self.master.target_component, log_id, 0, # offset 0xFFFFFFFF # count: all data from offset ) self._log.info(f"Requested data for Log ID {log_id}, waiting for data...") # Chunks keyed by offset received_chunks: dict[int, bytes] = {} max_end = 0 # highest offset+length we've seen total_unique_bytes = 0 # bytes from non-duplicate chunks completed_event.clear() start_time = time.time() last_data_time = start_time with self._mav_lock: while True: now = time.time() # Global timeout if timeout is not None and (now - start_time) > timeout: self._log.error("Timeout while downloading log data (global timeout).") raise TimeoutError("Log data download timed out.") # End-of-buffer timeout (only after we've actually received something) if ( end_of_buffer_timeout is not None and total_unique_bytes > 0 and (now - last_data_time) > end_of_buffer_timeout ): self._log.info( f"No new data for {end_of_buffer_timeout:.1f}s, " "assuming end of log." 
) break # Cancellation support if cancel_event is not None and cancel_event.is_set(): self._log.warning("Log download cancelled.") break # Use a small timeout so the loop can check the conditions above msg = self.master.recv_match( type="LOG_DATA", blocking=True, timeout=1.0, ) if msg is None: # No message in this 1-second interval; loop again continue if msg.id != log_id: # Not our log; ignore continue ofs = int(msg.ofs) count = int(msg.count) data_bytes = bytes(msg.data[:count]) end = ofs + count # Duplicate chunk (same starting offset): ignore if ofs in received_chunks: self._log.debug( f"Duplicate LOG_DATA chunk at ofs={ofs}, count={count}, ignoring." ) # We *could* update last_data_time here since link is still active, # but usually you care about new data only. continue # New chunk received_chunks[ofs] = data_bytes total_unique_bytes += len(data_bytes) last_data_time = time.time() if end > max_end: max_end = end # Progress callback (sync) if callback is not None: try: callback(total_unique_bytes) except Exception as e: self._log.warning(f"Callback raised exception: {e!r}") # If we know the size, we can stop once we have at least that range if size_bytes is not None and max_end >= size_bytes: self._log.info( f"Reached expected size ({size_bytes} bytes) for log {log_id}." ) break # --- Assemble the final buffer --- if size_bytes is not None: total_size = size_bytes else: total_size = max_end # best guess from largest end offset if total_size <= 0: self._log.warning( f"No LOG_DATA received for log {log_id}, returning empty bytes." ) completed_event.set() return b"" result = bytearray(total_size) sorted_ofs = sorted(received_chunks.keys()) current = 0 for ofs in sorted_ofs: chunk = received_chunks[ofs] end = ofs + len(chunk) # Detect gaps between chunks if ofs > current: self._log.warning( f"Gap detected in log {log_id}: {current} -> {ofs} " "(missing bytes in this range)." ) # Skip chunks fully beyond the expected size if ofs >= total_size: self._log.warning( f"Chunk at ofs={ofs} (len={len(chunk)}) is beyond total_size={total_size}, skipping." ) continue # Trim chunk if it extends past total_size if end > total_size: chunk = chunk[: total_size - ofs] end = total_size result[ofs:end] = chunk current = max(current, end) self._log.info( f"Total unique bytes received for log {log_id}: {total_unique_bytes}, " f"assembled length={len(result)}" ) completed_event.set() return bytes(result)
[docs] async def download_log(
        self,
        *,
        log_id: int,
        completed_event: threading.Event,
        timeout: float = 60.0,
        buffer: Optional[bytearray] = None,
        callback: Optional[Callable[[int], Awaitable[None]]] = None,
        end_of_buffer_timeout: float = 10.0,
        size_bytes: Optional[int] = None,
    ) -> bytes:
        """
        Download one log file via LOG_REQUEST_DATA / LOG_DATA.

        Parameters
        ----------
        log_id :
            The LOG_ENTRY.id of the log to download.
        completed_event :
            threading.Event that will be set when download completes.
        timeout :
            Total time allowed for the whole transfer.
        buffer :
            Optional bytearray to use as the download buffer.
        callback :
            Optional async function called as ``callback(received_bytes)``
            after each LOG_DATA packet is processed.
        end_of_buffer_timeout :
            Timeout in seconds to wait for new data before aborting.
        size_bytes :
            Optional total size of the log in bytes, if known.

        Returns
        -------
        bytes
            Raw log bytes (ULog/Dataflash).

        Raises
        ------
        RuntimeError
            If MAVLink connection is not established.
        TimeoutError
            If no complete log is received within the timeout.
        """
        if not self.connected:
            raise RuntimeError("MAVLink connection not established")

        cancel_event = threading.Event()

        # The sync worker can only invoke a plain function, so wrap the async
        # callback and schedule it on the event loop from the worker thread.
        loop = asyncio.get_running_loop()

        def sync_callback(received_bytes: int):
            if callback is not None:
                loop.call_soon_threadsafe(
                    asyncio.create_task,
                    callback(received_bytes),
                )

        sync_cb = sync_callback if callback is not None else None

        try:
            with self._download_lock:
                log_bytes: bytes = await asyncio.to_thread(
                    self._request_log_sync,
                    log_id=log_id,
                    completed_event=completed_event,
                    timeout=timeout,
                    size_bytes=size_bytes,
                    cancel_event=cancel_event,
                    callback=sync_cb,
                    end_of_buffer_timeout=end_of_buffer_timeout,
                )
        except TimeoutError as exc:
            self._log.error(f"Failed to download log {log_id}: {exc}")
            raise
        except Exception as exc:
            self._log.error(f"Error downloading log {log_id}: {exc}")
            raise

        # _request_log_sync already assembles the chunks by offset and tolerates
        # out-of-order / duplicate packets, so all that remains is to copy the
        # result into the caller-supplied buffer (if any).
        if buffer is None:
            buffer = bytearray(log_bytes)
        else:
            buffer[:] = log_bytes

        completed_event.set()
        return bytes(buffer)
[docs] async def download_log_buffered( self, *, log_id: int, completed_event: threading.Event, size_bytes: Optional[int] = None, timeout: float = 3.0, chunk_size: int = 90, max_retries: int = 3, cancel_event: threading.Event | None = None, buffer: Optional[bytearray] = None, callback: Optional[Callable[[int], Awaitable[None]]] = None ) -> bytes: """ Download one log file via repeated LOG_REQUEST_DATA / LOG_DATA exchanges. Strategy: * For ofs = 0, chunk_size, 2*chunk_size, ... * send LOG_REQUEST_DATA(log_id, ofs, chunk_size) * wait for LOG_DATA(log_id, ofs, ...) * append data[:count] * stop when: * count == 0 (end-of-log by spec), or * ofs + count >= size_bytes (if known) Parameters ---------- log_id : LOG_ENTRY.id of the log to download. completed_event : threading.Event set when download is fully complete. size_bytes : Optional total size of the log from LOG_ENTRY.size; if given, used as termination condition and sanity cap. timeout : Timeout per chunk request (seconds). chunk_size : Requested count per chunk. For MAVLink v1, LOG_DATA carries 90 bytes. max_retries : Number of retries per chunk on timeout. cancel_event : Optional threading.Event to cancel the download. buffer : Optional bytearray; if None, a new one is created. callback : Optional sync or async callback called as callback(total_bytes_received) after each successful chunk. Returns ------- bytes Raw log data. """ if not self.connected: raise RuntimeError("MAVLink connection not established") if buffer is None: buffer = bytearray() LOG_DATA_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_DATA) ofs = 0 while True: if cancel_event and cancel_event.is_set(): # Let caller decide what to do; could also raise here self._log.info(f"Download of log {log_id} cancelled at ofs={ofs}") break # If we know the size and we've already covered it, stop if size_bytes is not None and ofs >= size_bytes: break try: pkt = await self._request_chunk( log_id=log_id, ofs=ofs, chunk_size=chunk_size, max_retries=max_retries, timeout=timeout ) except TimeoutError as exc: self._log.error(f"Failed to download log {log_id} at ofs={ofs}: {exc}") raise count = int(pkt.count) if count < 0: count = 0 # Termination condition from spec: count == 0 => end-of-log if count == 0: break # Append this chunk into the buffer needed_len = ofs + count if len(buffer) < needed_len: buffer.extend(b"\x00" * (needed_len - len(buffer))) buffer[ofs:ofs + count] = bytes(pkt.data[:count]) # Progress callback (in bytes) if callback is not None: try: await callback(len(buffer)) except Exception as exc: self._log.warning(f"download_log callback raised: {exc}") ofs += count # If we know the total size and we've just covered it, stop if size_bytes is not None and ofs >= size_bytes: completed_event.set() break # If size_bytes is unknown and we received less than chunk_size, # it's probably the last chunk, so we can stop. if size_bytes is None and count < chunk_size: completed_event.set() break return bytes(buffer)
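# Usage sketch (illustrative): chunked download with an async progress callback
# and cooperative cancellation. In practice size_bytes would come from the
# matching LOG_ENTRY; here it is passed in by the caller.
async def _example_download(proxy, log_id: int, size_bytes: int) -> bytes:
    done = threading.Event()
    cancel = threading.Event()

    async def on_progress(total_bytes: int) -> None:
        print(f"{total_bytes}/{size_bytes} bytes")

    return await proxy.download_log_buffered(
        log_id=log_id,
        completed_event=done,
        size_bytes=size_bytes,
        chunk_size=90,          # LOG_DATA payload size
        cancel_event=cancel,
        callback=on_progress,
    )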
[docs] async def set_params_bulk_lossy( self, params_to_set: Dict[str, ParamSpec], *, timeout_total: float = 8.0, max_retries: int = 3, max_in_flight: int = 8, resend_interval: float = 0.8, inter_send_delay: float = 0.01, verify_ack_value: bool = True, ) -> Dict[str, Dict[str, Any]]: """ Lossy-link bulk PARAM_SET: - User can provide optional type per param as "UINT8", "INT16", "REAL32", etc. - If type omitted -> auto: int -> INT32, float -> REAL32 - Windowed sends + periodic resend + retry cap - Confirms via echoed PARAM_VALUE Returns: confirmed {name: meta_dict} """ if not self.connected: raise RuntimeError("MAVLink connection not established") loop = asyncio.get_running_loop() # Normalize input into: desired[name] = (value, ptype_int_or_None) desired: Dict[str, Tuple[Any, Optional[int]]] = {} for k, spec in params_to_set.items(): name = self._norm_name(k.encode("ascii")) if isinstance(spec, tuple) and len(spec) == 2: value, ptype = spec desired[name] = (value, _parse_param_type(ptype)) elif isinstance(spec, dict): if "value" not in spec: raise ValueError(f"Param '{k}' dict spec must include 'value'") value = spec["value"] ptype = spec.get("type", None) desired[name] = (value, _parse_param_type(ptype)) else: desired[name] = (spec, None) def infer_type(value: Any) -> int: if isinstance(value, int): return mavutil.mavlink.MAV_PARAM_TYPE_INT32 if isinstance(value, float): return mavutil.mavlink.MAV_PARAM_TYPE_REAL32 raise ValueError(f"Unsupported value type: {type(value)} (expected int or float)") def values_match(want: Any, got: Any) -> bool: if isinstance(want, int): try: return int(got) == int(want) except Exception: return False try: return abs(float(got) - float(want)) <= 1e-4 except Exception: return False pending = set(desired.keys()) attempts: Dict[str, int] = {n: 0 for n in pending} last_sent: Dict[str, float] = {n: 0.0 for n in pending} confirmed: Dict[str, Dict[str, Any]] = {} queue = list(desired.keys()) in_flight = 0 done_evt = asyncio.Event() async def _send_set(name: str) -> None: nonlocal in_flight value, ptype_opt = desired[name] ptype = ptype_opt if ptype_opt is not None else infer_type(value) msg = self.build_param_set(name, value, ptype) self.send("mav", msg) attempts[name] += 1 last_sent[name] = time.monotonic() in_flight += 1 if inter_send_delay > 0: await asyncio.sleep(inter_send_delay) async def _fill_window_from_queue() -> None: nonlocal in_flight while in_flight < max_in_flight and queue: n = queue.pop(0) if n in pending and attempts[n] < max_retries: await _send_set(n) async def _resender_loop() -> None: nonlocal in_flight while not done_evt.is_set(): now = time.monotonic() # initial sends await _fill_window_from_queue() # resend timed-out pendings if in_flight < max_in_flight: for n in list(pending): if attempts[n] >= max_retries: continue if (now - last_sent[n]) >= resend_interval and in_flight < max_in_flight: await _send_set(n) await asyncio.sleep(0.05) async def _handler(pkt): try: if pkt.get_type() != "PARAM_VALUE": return pname, decoded_value = self._decode_param_value(pkt) if pname not in pending: return def _apply(): nonlocal in_flight if pname not in pending: return want_value, _want_ptype = desired[pname] if verify_ack_value and not values_match(want_value, decoded_value): # not confirmed; free one slot and let retry logic resend in_flight = max(0, in_flight - 1) return confirmed[pname] = { "name": pname, "value": decoded_value, "raw": float(pkt.param_value), "type": pkt.param_type, "count": pkt.param_count, "index": pkt.param_index, } 
pending.remove(pname) in_flight = max(0, in_flight - 1) if not pending: done_evt.set() loop.call_soon_threadsafe(_apply) except Exception as exc: self._log.error(f"[bulk_set handler] raised: {exc}") self.register_handler(key="PARAM_VALUE", fn=_handler) try: # Start sending + resending await _fill_window_from_queue() resender_task = asyncio.create_task(_resender_loop()) try: await asyncio.wait_for(done_evt.wait(), timeout=timeout_total) except asyncio.TimeoutError: pass finally: resender_task.cancel() with contextlib.suppress(asyncio.CancelledError): await resender_task return confirmed finally: self.unregister_handler("PARAM_VALUE", _handler)
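# Usage sketch (illustrative): the three accepted ParamSpec forms, bare value,
# (value, type) tuple, and {"value": ..., "type": ...} dict. The parameter
# names are placeholders.
async def _example_bulk_set(proxy) -> None:
    confirmed = await proxy.set_params_bulk_lossy(
        {
            "SOME_INT_PARAM": 42,                            # auto -> INT32
            "SOME_FLOAT_PARAM": (1.5, "REAL32"),             # explicit type string
            "SOME_U8_PARAM": {"value": 1, "type": "UINT8"},  # dict form
        },
        timeout_total=8.0,
        max_retries=3,
    )
    missing = {"SOME_INT_PARAM", "SOME_FLOAT_PARAM", "SOME_U8_PARAM"} - set(confirmed)
    if missing:
        print(f"unconfirmed after retries: {missing}")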
[docs] async def get_params_bulk_lossy( self, names: Iterable[str], *, timeout_total: float = 6.0, max_retries: int = 3, max_in_flight: int = 10, resend_interval: float = 0.7, inter_send_delay: float = 0.01, ) -> Dict[str, Dict[str, Any]]: """ Lossy-link bulk GET using PARAM_REQUEST_READ by name. Strategy: - Register ONE PARAM_VALUE handler. - Send read requests in a window (max_in_flight). - Periodically resend still-pending names (resend_interval) up to max_retries. - Stop when all received or timeout_total. Returns: { name: {"name","value","raw","type","count","index"}, ... } (Partial results if timeout_total hits.) """ if not self.connected: raise RuntimeError("MAVLink connection not established") loop = asyncio.get_running_loop() # Normalize requested names (same normalization used by _decode_param_value) names_list = [self._norm_name(n.encode("ascii")) for n in names] pending = set(names_list) attempts: Dict[str, int] = {n: 0 for n in names_list} last_sent: Dict[str, float] = {n: 0.0 for n in names_list} results: Dict[str, Dict[str, Any]] = {} done_evt = asyncio.Event() queue = list(names_list) in_flight = 0 async def _send_read(name: str) -> None: nonlocal in_flight msg = self.build_param_request_read(name, index=-1) self.send("mav", msg) attempts[name] += 1 last_sent[name] = time.monotonic() in_flight += 1 if inter_send_delay > 0: await asyncio.sleep(inter_send_delay) async def _fill_window_from_queue() -> None: nonlocal in_flight while in_flight < max_in_flight and queue: n = queue.pop(0) if n in pending and attempts[n] < max_retries: await _send_read(n) async def _resender_loop() -> None: nonlocal in_flight while not done_evt.is_set(): now = time.monotonic() # initial sends await _fill_window_from_queue() # resend timed-out pending items if in_flight < max_in_flight: for n in list(pending): if attempts[n] >= max_retries: continue if (now - last_sent[n]) >= resend_interval and in_flight < max_in_flight: await _send_read(n) await asyncio.sleep(0.05) async def _handler(pkt): try: if pkt.get_type() != "PARAM_VALUE": return pname, decoded_value = self._decode_param_value(pkt) if pname not in pending: return def _apply(): nonlocal in_flight if pname not in pending: return results[pname] = { "name": pname, "value": decoded_value, "raw": float(pkt.param_value), "type": pkt.param_type, "count": pkt.param_count, "index": pkt.param_index, } pending.remove(pname) # Free one in-flight slot (treat this response as completing one request) in_flight = max(0, in_flight - 1) if not pending: done_evt.set() loop.call_soon_threadsafe(_apply) except Exception as exc: self._log.error(f"[bulk_get handler] raised: {exc}") self.register_handler(key="PARAM_VALUE", fn=_handler) try: # Start sending + resending await _fill_window_from_queue() resender_task = asyncio.create_task(_resender_loop()) try: await asyncio.wait_for(done_evt.wait(), timeout=timeout_total) except asyncio.TimeoutError: # return partial results pass finally: resender_task.cancel() with contextlib.suppress(asyncio.CancelledError): await resender_task return results finally: self.unregister_handler("PARAM_VALUE", _handler)
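# Usage sketch (illustrative): bulk read over a lossy link; partial results are
# returned if timeout_total expires before every name is confirmed.
async def _example_bulk_get(proxy, names) -> None:
    results = await proxy.get_params_bulk_lossy(names, timeout_total=6.0)
    for name in names:
        meta = results.get(name)
        print(name, meta["value"] if meta else "<no reply>")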
[docs] class MavLinkFTPProxy(BaseProxy): """ Threaded MAVLink FTP driver using `pymavlink`. """ def __init__( self, mavlink_proxy: MavLinkExternalProxy, ): self._log = logging.getLogger("MavLinkFTPProxy") self._loop: asyncio.AbstractEventLoop | None = None self._exe = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MavLinkFTPProxyWorker") self.mavlink_proxy: MavLinkExternalProxy = mavlink_proxy # ------------------------ life-cycle --------------------- #
[docs] async def start(self): """Open the MAVLink connection then launch the worker thread.""" self._loop = asyncio.get_running_loop() # Start the worker thread first await super().start()
# # Initialize parser if connection was successful # if self.mavlink_proxy.master: # await self._init_parser()
[docs] async def stop(self):
        """Stop the worker and close the link."""
        await asyncio.sleep(0.1)  # Ensure any pending writes are flushed
        self._exe.shutdown(wait=False)  # release the dedicated MAVFTP worker thread
# ------------------- I/O primitives --------------------- # async def _init_parser(self): """Initialize the blocking parser.""" def create_parser(): return _BlockingParser( self._log, self.mavlink_proxy.master, self.mavlink_proxy, 0 ) self._parser = await self._loop.run_in_executor( self._exe, create_parser ) # ------------------- deletion helper with fallback ------------- # async def _delete_file_with_fallback(self, path: str) -> bool: """Delete a file on the vehicle, trying shell rm first then MAVFTP. On real NuttX hardware the shell ``rm`` command works and bypasses PX4's MAVFTP ``FileProtected`` restriction. In SITL builds the NuttX shell is not available (``Invalid command: rm``), so we fall back to MAVFTP ``cmd_rm`` which works there. The caller is responsible for stopping the PX4 logger before invoking this method. Returns ``True`` on success, ``False`` on failure. """ # 1) Try the NuttX shell rm (works on real hardware) ok = await self.mavlink_proxy.delete_file_via_shell(path) if ok: return True # 2) Fall back to MAVFTP cmd_rm (works on SITL) self._log.info(f"Shell rm failed for {path}, falling back to MAVFTP delete") if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() def _ftp_delete(): try: self._parser._delete(path) return True except Exception as exc: self._log.warning(f"MAVFTP delete also failed for {path}: {exc}") return False return await self._loop.run_in_executor(self._exe, _ftp_delete) # ------------------- exposing blocking parser methods --------- #
[docs] async def list_ulogs(self, base: str = None, connection_timeout: float = 3.0) -> List[ULogInfo]: """Return metadata for every .ulg file on the vehicle.""" # Check connection and attempt to establish if needed if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("FTP connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) # brief wait before re-checking if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink FTP connection") raise RuntimeError("MAVLink FTP connection could not be established") if base is None: base = self.mavlink_proxy.root_sd_path # Initialize parser if not already done (e.g., after reconnection) if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() # Try to get log entries from the vehicle, but handle timeout gracefully entries = {} try: msg_id = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_ENTRY) msg = self.mavlink_proxy.build_req_msg_log_request(message_id=msg_id) entries = await self.mavlink_proxy.get_log_entries( msg_id=msg_id, request_msg=msg, timeout=5.0 ) except (TimeoutError, RuntimeError) as e: self._log.warning(f"Failed to get log entries from vehicle: {e}") self._log.info("Attempting to list files directly via FTP without log entries...") entries = {} # Attempt to list files via FTP try: raw = await self._loop.run_in_executor(self._exe, self._parser.list_ulogs, entries, base) return [ULogInfo(**item) for item in raw] except Exception as e: self._log.warning(f"Failed to list files via FTP: {e}") return []
[docs] async def download_ulog( self, remote_path: str, local_path: Path, completed_event: threading.Event, on_progress: ProgressCB | None = None, cancel_event: threading.Event | None = None, connection_timeout: float = 3.0, n_attempts: int = 3, ) -> Path: """ Fetch *remote_path* from the vehicle into *local_path*. Returns the Path actually written on success or None if cancelled. """ # Check connection and attempt to establish if needed last_exception = None for attempt in range(n_attempts): if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("FTP connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) # brief wait before re-checking if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink FTP connection") raise RuntimeError("MAVLink FTP connection could not be established") # Initialize parser if not already done (e.g., after reconnection) if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() try: result = await self._loop.run_in_executor( self._exe, self._parser.download_ulog, remote_path, local_path, on_progress, cancel_event ) if result: completed_event.set() return local_path else: raise RuntimeError("Download was cancelled or failed without exception") except Exception as e: self._log.error(f"Failed to download ulog via FTP on attempt {attempt + 1}/{n_attempts}: {e}") last_exception = e if last_exception is not None: self._log.error(f"All {n_attempts} attempts to download ulog failed.") raise last_exception
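# Usage sketch (illustrative): fetch one ULog over MAVLink FTP with fractional
# progress reporting. `ftp` stands for a started MavLinkFTPProxy; both paths
# are placeholders.
async def _example_fetch_ulog(ftp) -> None:
    done = threading.Event()

    async def on_progress(frac: float) -> None:
        print(f"{frac * 100:.0f}%")

    local = await ftp.download_ulog(
        remote_path="fs/microsd/log/2024-01-01/12_00_00.ulg",
        local_path=Path("/tmp/12_00_00.ulg"),
        completed_event=done,
        on_progress=on_progress,
    )
    print("saved to", local)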
[docs] async def delete_file(self, remote_path: str, connection_timeout: float = 3.0, stop_logger: bool = True) -> None: """ Delete a single file at *remote_path* on the vehicle. Uses the NuttX shell ``rm`` command via MAVLink SERIAL_CONTROL, which bypasses PX4's MAVFTP ``FileProtected`` restriction. Parameters ---------- remote_path : str The path on the vehicle to delete. connection_timeout : float Maximum time to wait for MAVLink connection. stop_logger : bool If True, stop PX4 logging before deletion and restart it afterwards. Pass ``False`` when the caller has already stopped the logger (e.g. bulk-delete loop). """ if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("MAVLink connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink connection") raise RuntimeError("MAVLink connection could not be established") if stop_logger: await self.mavlink_proxy.stop_px4_logging() try: ok = await self._delete_file_with_fallback(remote_path) if not ok: raise RuntimeError(f"Delete failed for {remote_path} (shell rm and MAVFTP both failed)") finally: if stop_logger: await self.mavlink_proxy.start_px4_logging()
[docs] async def delete_all_logs( self, base: str = None, connection_timeout: float = 3.0, progress_callback: "Callable[[int, int], Any] | None" = None, ) -> Dict[str, Any]: """ Recursively delete every ``.ulg`` file under *base* on the vehicle. Uses MAVFTP only for *listing* directories, then deletes each file via the NuttX shell ``rm`` command (MAVLink SERIAL_CONTROL). This bypasses PX4's MAVFTP ``FileProtected`` (code 9) restriction which blocks ``cmd_rm`` on all log files. Stops the PX4 logger before deletion and restarts it afterwards. Args: base: Root directory to scan for .ulg files. connection_timeout: Seconds to wait for a MAVLink connection. progress_callback: Optional ``callback(current_index, total)`` invoked after each file deletion attempt. May be a regular function or an ``async`` coroutine. Returns a summary dict with deleted/failed counts. """ if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("MAVLink connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink connection") raise RuntimeError("MAVLink connection could not be established") if base is None: base = self.mavlink_proxy.root_sd_path if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() # ── Discover .ulg files AND date directories in one pass ── def _scan(base_dir): """Return (file_list, date_dir_list).""" files = [] dirs = [] try: base_entries = self._parser._ls(base_dir, retries=2) except Exception as exc: # Empty directory → MAVFTP returns code 73 (timeout). # Treat as "nothing to delete". self._log.info( f"Base directory listing failed ({exc}) — " f"assuming empty, nothing to delete" ) return files, dirs for date, _, is_dir in base_entries: if not is_dir: continue sub = f"{base_dir}/{date}" dirs.append(sub) try: entries = self._parser._ls(sub, retries=1) except Exception as exc: self._log.warning(f"Skipping {sub}: {exc}") continue for name, size, child_is_dir in entries: if not child_is_dir and name.endswith(".ulg"): files.append((f"{sub}/{name}", size)) return files, dirs ulog_files, date_dirs = await self._loop.run_in_executor( self._exe, lambda: _scan(base) ) self._log.info( f"Found {len(ulog_files)} ULog file(s) under {base} — " f"deleting via shell rm" ) # Stop PX4 logger before deletion to release file handles await self.mavlink_proxy.stop_px4_logging() try: results: List[Dict[str, Any]] = [] deleted = 0 failed = 0 total = len(ulog_files) for idx, (remote_path, size) in enumerate(ulog_files, 1): ok = await self._delete_file_with_fallback(remote_path) if ok: results.append({ "path": remote_path, "size_bytes": size, "status": "deleted", }) deleted += 1 else: results.append({ "path": remote_path, "size_bytes": size, "status": "failed", "error": "Delete failed (shell rm and MAVFTP)", }) failed += 1 # Report per-file progress if progress_callback is not None: try: ret = progress_callback(idx, total) if asyncio.iscoroutine(ret): await ret except Exception: pass # never let a callback error abort deletion # ── Clean up ALL discovered date directories ───────── # Remove every date dir we found during listing, not # just those that had deleted files. This ensures dirs # left empty by a previous run are also removed. 
for d in sorted(date_dirs): try: ok = await self.mavlink_proxy.delete_file_via_shell( d, command="rmdir" ) if ok: self._log.info(f"Removed empty directory {d}") else: self._log.debug(f"rmdir {d} skipped (may not be empty)") except Exception as exc: self._log.debug(f"rmdir {d} failed: {exc}") summary = { "total": deleted + failed, "deleted": deleted, "failed": failed, "files": results, } self._log.info( f"Delete all logs complete: {deleted} deleted, " f"{failed} failed out of {deleted + failed} total" ) return summary finally: # Always restart the logger, even if deletion failed await self.mavlink_proxy.start_px4_logging()
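# Usage sketch (illustrative): wipe all ULogs with per-file progress. The
# callback may be a plain function or a coroutine; a coroutine is awaited
# by the proxy.
async def _example_wipe_logs(ftp) -> None:
    async def on_file(idx: int, total: int) -> None:
        print(f"deleted {idx}/{total}")

    summary = await ftp.delete_all_logs(progress_callback=on_file)
    print(f'{summary["deleted"]} deleted, {summary["failed"]} failed')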
[docs] async def detect_error_logs(self, remote_path: str, connection_timeout: float = 3.0) -> Dict[str, Any]: """ Detect ``fail_*.log`` error log files under *remote_path* on the vehicle. Lists files via MAVFTP without deleting anything. Returns a summary dict ``{total, files}`` where *files* is a list of ``{path, size_bytes}`` dicts. """ # Check connection and attempt to establish if needed if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("FTP connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink FTP connection") raise RuntimeError("MAVLink FTP connection could not be established") # Initialize parser if not already done if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() # List fail_*.log files via MAVFTP def _list_fail_logs(): try: entries = self._parser._ls(remote_path, retries=2) return [ (f"{remote_path}/{name}", size) for name, size, is_dir in entries if not is_dir and name.startswith("fail_") and name.endswith(".log") ] except Exception as exc: self._log.warning(f"Failed to list error logs in {remote_path}: {exc}") return [] fail_logs = await self._loop.run_in_executor(self._exe, _list_fail_logs) files_info = [ {"path": path, "size_bytes": size} for path, size in fail_logs ] self._log.info( f"Detected {len(files_info)} fail_*.log file(s) under {remote_path}" ) return {"total": len(files_info), "files": files_info}
[docs] async def clear_error_logs(self, remote_path: str, connection_timeout: float = 3.0) -> Dict[str, Any]: """ Clear ``fail_*.log`` error logs under *remote_path* from the vehicle. Uses MAVFTP only for *listing*, then deletes each file via the NuttX shell ``rm`` command (same approach as ``delete_all_logs``). Returns a summary dict ``{total, deleted, failed}``. """ # Check connection and attempt to establish if needed if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("FTP connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink FTP connection") raise RuntimeError("MAVLink FTP connection could not be established") # Initialize parser if not already done if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() # List fail_*.log files via MAVFTP def _list_fail_logs(): try: entries = self._parser._ls(remote_path, retries=2) return [ (f"{remote_path}/{name}", size) for name, size, is_dir in entries if not is_dir and name.startswith("fail_") and name.endswith(".log") ] except Exception as exc: self._log.warning(f"Failed to list error logs in {remote_path}: {exc}") return [] fail_logs = await self._loop.run_in_executor(self._exe, _list_fail_logs) if not fail_logs: self._log.info(f"No fail_*.log files found under {remote_path}") return {"total": 0, "deleted": 0, "failed": 0} self._log.info(f"Found {len(fail_logs)} fail_*.log file(s) under {remote_path}") deleted = 0 failed = 0 # Stop logger, delete via shell rm (with MAVFTP fallback), restart logger await self.mavlink_proxy.stop_px4_logging() try: for path, _size in fail_logs: try: ok = await self._delete_file_with_fallback(path) if ok: self._log.info(f"Deleted error log {path}") deleted += 1 else: self._log.warning(f"Delete failed for {path} (shell rm and MAVFTP)") failed += 1 except Exception as exc: self._log.warning(f"Error deleting {path}: {exc}") failed += 1 finally: await self.mavlink_proxy.start_px4_logging() self._log.info( f"Clear error logs complete: {deleted} deleted, " f"{failed} failed out of {deleted + failed} total" ) return {"total": deleted + failed, "deleted": deleted, "failed": failed}
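# Usage sketch (illustrative): inspect fail_*.log files before clearing them.
# The directory path is a placeholder.
async def _example_error_logs(ftp) -> None:
    report = await ftp.detect_error_logs("fs/microsd/log")
    if report["total"]:
        summary = await ftp.clear_error_logs("fs/microsd/log")
        print(f'{summary["deleted"]}/{summary["total"]} error logs cleared')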
[docs] async def list_directory(self, base: str = None, connection_timeout: float = 3.0) -> List[str]: """ List all files and directories under *base* on the vehicle. """ # Check connection and attempt to establish if needed if not self.mavlink_proxy.master or not self.mavlink_proxy.connected: self._log.warning("FTP connection not established, attempting to connect...") t_start = time.time() while True: await asyncio.sleep(1.0) # brief wait before re-checking if self.mavlink_proxy.master and self.mavlink_proxy.connected: break if time.time() - t_start > connection_timeout: self._log.error("Timeout waiting for MAVLink FTP connection") raise RuntimeError("MAVLink FTP connection could not be established") if base is None: base = self.mavlink_proxy.root_sd_path # Initialize parser if not already done (e.g., after reconnection) if not hasattr(self, '_parser') or self._parser is None: await self._init_parser() try: listing = await self._loop.run_in_executor(self._exe, self._parser.list_directory, base) return listing except Exception as e: self._log.warning(f"Failed to list directory via FTP: {e}") return []
# --------------------------------------------------------------------------- #
#  helper functions                                                            #
# --------------------------------------------------------------------------- #
def _match_ls_to_entries(
    ls_list: List[Tuple[str, int]],
    entry_dict: Dict[int, Dict[str, int]],
    threshold_size: int = 4096,
) -> Dict[int, Tuple[str, int, int]]:
    """Match FTP ``ls`` results to LOG_ENTRY records by file size."""
    files = sorted([(n, s) for n, s in ls_list], key=lambda x: x[1], reverse=True)
    entries = sorted(entry_dict.items())
    if len(files) != len(entries):
        raise ValueError("ls and entry counts differ; can't match safely")
    mapping = {}
    for log_id, info in entries:
        for i, (name, sz) in enumerate(files):
            if abs(sz - info['size']) <= threshold_size:
                files.pop(i)
                mapping[log_id] = (name, sz, info['utc'])
                break
    return mapping

class _BlockingParser:
    """
    Thin wrapper around pymavlink / MAVFTP - runs in a dedicated thread.

    All methods are synchronous and blocking; the proxy wraps them in
    run_in_executor so the event-loop stays responsive.
    """

    # ---------- life-cycle -------------------------------------------------- #
    def __init__(
        self,
        logger: logging.Logger,
        master: mavutil.mavserial,
        mavlink_proxy: MavLinkExternalProxy,
        debug: int = 0
    ):
        self._log = logger.getChild("BlockingParser")
        self.master = master
        self.proxy = mavlink_proxy
        self.root_sd_path = self.proxy.root_sd_path

        # try three times to init MAVFTP
        try:
            for _ in range(3):
                try:
                    if self.master is None or not self.proxy.connected:
                        raise RuntimeError("MAVLink master not initialized; cannot initialize MAVFTP")
                    with self.proxy._mav_lock:
                        self.ftp = mavftp.MAVFTP(
                            self.master,
                            self.master.target_system,
                            self.master.target_component
                        )
                    break
                except Exception as e:
                    self._log.warning(f"MAVFTP init attempt failed: {e}")
                    time.sleep(1)
            else:
                raise RuntimeError("MAVFTP init failed after 3 attempts")

            self._log.info("MAVFTP initialized successfully")
            self.ftp.ftp_settings.debug = debug
            self.ftp.ftp_settings.retry_time = 0.2  # 200 ms instead of 1 s
            self.ftp.ftp_settings.burst_read_size = 239
            self.ftp.burst_size = 239
        except Exception as e:
            self._log.error(f"Failed to initialize MAVFTP: {e}")

    @property
    def system_id(self):
        # convenience for log message in proxy.start()
        return self.master.target_system

    def close(self):
        self.master.close()

    # ---------- public helpers (blocking) ----------------------------------- #

    # 1) list_ulogs ---------------------------------------------------------- #
    def list_ulogs(self, entries: Dict[int, Dict[str, int]], base: str) -> List[ULogInfo]:
        """
        Enumerate *.ulg under the SD-card and return a list of dicts
        that can be fed directly into ULogInfo(**dict).
""" ulog_files = list(self._walk_ulogs(base)) if not ulog_files: return [] # If we have log entries from the vehicle, try to match them with files if entries: try: mapping = _match_ls_to_entries(ulog_files, entries) # sort the mapping by utc descending mapping = sorted( mapping.values(), key=lambda x: x[2], # sort by utc (index 2) reverse=True ) result = [] for i, (name, size, utc) in enumerate(mapping): result.append( dict(index=i, remote_path=name, size_bytes=size, utc=utc) ) return result except ValueError as e: self._log.warning(f"Failed to match files with log entries: {e}") # Fall through to basic file listing # If no entries or matching failed, return basic file info without UTC timestamps self._log.info("Returning basic file listing without log entry metadata") result = [] for i, (name, size) in enumerate(ulog_files): result.append( dict(index=i, remote_path=name, size_bytes=size, utc=0) # UTC=0 when unknown ) return result # 2) download_ulog ------------------------------------------------------- # def download_ulog( self, remote_path: str, local_path: Path, on_progress: ProgressCB | None = None, cancel_event: threading.Event | None = None, ): """Blocking download with retry + tmp-file recovery with cancellation support.""" # ------------------------------------------------------------------ # def _progress_cb(frac: float | None): if frac is None or on_progress is None: return # Check for cancellation if cancel_event and cancel_event.is_set(): # Use our custom exception to signal cancellation raise DownloadCancelledException("Download cancelled by user") asyncio.run_coroutine_threadsafe( on_progress(frac), loop=self.proxy._loop ) # ------------------------------------------------------------------ # try: self._log.info("Downloading %s%s", remote_path, local_path) local_path.parent.mkdir(parents=True, exist_ok=True) with self.proxy._mav_lock: with self.proxy._download_lock: ret = self.ftp.cmd_get( [remote_path, str(local_path.absolute())], progress_callback=lambda x: _progress_cb(x) ) if ret.return_code != 0: self._log.error(f"OpenFileRO failed: download failed with code {ret.return_code}") self._reset_ftp_state() return None # Check for cancellation before processing reply if cancel_event and cancel_event.is_set(): self._reset_ftp_state() if local_path.exists(): local_path.unlink() return None # Process the reply with a try-except to handle potential issues try: result = self.ftp.process_ftp_reply(ret.operation_name, timeout=0) if result.return_code != 0: self._log.error(f"OpenFileRO Download failed with code {result.return_code}") self._reset_ftp_state() if local_path.exists(): local_path.unlink() return None except DownloadCancelledException: # Handle cancellation gracefully self._log.info("Download cancelled by user") self._reset_ftp_state() if local_path.exists(): local_path.unlink() return None except (OSError, socket.error) as e: self._log.error(f"FTP error during download: {str(e)}") return None except Exception as e: self._log.error(f"Error processing FTP reply: {str(e)}") self._reset_ftp_state() return None if not local_path.exists(): # handle temp-file move failure tmp = Path(self.ftp.temp_filename) if tmp.exists(): shutil.move(tmp, local_path) self._log.warning("Temp file recovered to %s", local_path) self._reset_ftp_state() # for next download if not local_path.exists(): self._log.error("Failed to recover temp file to %s", local_path) return None self._log.info("Saved %s (%.1f KiB)", local_path.name, local_path.stat().st_size / 1024) return str(local_path) except 
        except DownloadCancelledException:
            # Handle cancellation gracefully at the outer level too.
            self._log.info("Download cancelled by user")
            with self.proxy._mav_lock:
                self._reset_ftp_state()
            if local_path.exists():
                local_path.unlink()
            raise
        except (OSError, socket.error) as e:
            # Handle connection errors (including "Bad file descriptor").
            self._log.error(f"Download error: {e}")
            with self.proxy._mav_lock:
                with self.proxy._download_lock:
                    self._reset_ftp_state()
            # Clean up partial file.
            if local_path.exists():
                local_path.unlink()
            # Re-raise the original exception.
            raise
        except Exception as e:
            # Covers RuntimeError and everything else: always reset FTP state,
            # clean up the partial file, and re-raise.
            self._log.error(f"Download error: {e}")
            with self.proxy._mav_lock:
                with self.proxy._download_lock:
                    self._reset_ftp_state()
            if local_path.exists():
                local_path.unlink()
            raise

    # 3) clear error logs ---------------------------------------------------- #
    def clear_error_logs(self, base: str = "fs/microsd") -> None:
        fail_logs = self._list_fail_logs(base)
        for log in fail_logs:
            try:
                self._log.info(f"Deleting error log {log.remote_path}")
                # Check if the connection is still valid before attempting the operation.
                if not self.proxy.master or not self.proxy.connected:
                    self._log.warning(f"Connection lost, skipping delete for {log.remote_path}")
                    return
                self._delete(log.remote_path)
                time.sleep(0.1)   # give the delete operation time to complete
            except (OSError, socket.error) as e:
                # Handle connection errors gracefully.
                if e.errno in (errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE):
                    self._log.warning(f"Connection lost during delete operation: {e}")
                else:
                    self._log.error(f"Unexpected error deleting log {log.remote_path}: {e}")
            except Exception as e:
                self._log.error(f"Error deleting log {log.remote_path}: {e}")
        self._log.info("Cleared all error logs")

    # 4) delete_file --------------------------------------------------------- #
    def delete_file(self, path: str) -> None:
        """
        Delete a single file at *path* on the vehicle via MAVFTP.

        Wraps the private ``_delete`` helper with a public interface.
        """
        self._log.info(f"Deleting file: {path}")
        if not self.proxy.master or not self.proxy.connected:
            raise RuntimeError("MAVLink connection not available for delete operation")
        self._delete(path)

    # 5) delete_all_logs ----------------------------------------------------- #
    def delete_all_logs(self, base: str = "fs/microsd/log") -> Dict[str, Any]:
        """
        Recursively walk *base* (same logic as ``_walk_ulogs``) and delete
        every ``.ulg`` file found.

        Does NOT require log entries from the vehicle – purely filesystem-based.

        .. note::
           The caller (``MavLinkFTPProxy.delete_all_logs``) is responsible for
           stopping PX4 logging **before** invoking this method.  Without that,
           PX4 returns ``FileProtected`` (FTP code 9) for every file because
           the logger holds open file handles.

        Returns a summary dict with ``deleted``, ``failed``, and ``total``
        counts plus a list of per-file results.
        """
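        # Illustrative shape of the returned summary (values hypothetical,
        # mirroring the dicts built in the loop below):
        #
        #     {"total": 3, "deleted": 2, "failed": 1,
        #      "files": [{"path": "fs/microsd/log/2024-01-01/10_00_00.ulg",
        #                 "size_bytes": 1048576, "status": "deleted"}, ...]}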
""" self._log.info(f"Deleting all ULog files under {base}") if not self.proxy.master or not self.proxy.connected: raise RuntimeError("MAVLink connection not available for delete operation") # Reset FTP state before starting the delete loop to ensure a clean session with self.proxy._mav_lock: with self.proxy._download_lock: self._reset_ftp_state() results: List[Dict[str, Any]] = [] deleted = 0 failed = 0 for remote_path, size in self._walk_ulogs(base): try: self._delete(remote_path) results.append({"path": remote_path, "size_bytes": size, "status": "deleted"}) deleted += 1 time.sleep(0.1) # brief pause between deletes except FTPDeleteError as e: self._log.error(f"Failed to delete {remote_path}: {e}") results.append({ "path": remote_path, "size_bytes": size, "status": "failed", "error": str(e), "ftp_code": e.ftp_code, }) failed += 1 except Exception as e: self._log.error(f"Failed to delete {remote_path}: {e}") results.append({"path": remote_path, "size_bytes": size, "status": "failed", "error": str(e)}) failed += 1 summary = { "total": deleted + failed, "deleted": deleted, "failed": failed, "files": results, } self._log.info(f"Delete all logs complete: {deleted} deleted, {failed} failed out of {deleted + failed} total") return summary # 6) ls a directory ------------------------------------------------------ # def list_directory(self, base: str = "fs/microsd") -> List[Dict[str, Any]]: """List the contents of a directory on the vehicle's filesystem.""" try: self._log.info(f"Listing directory: {base}") # Check if connection is still valid before attempting operation if not self.proxy.master or not self.proxy.connected: self._log.warning(f"Connection lost, skipping ls for {base}") return [] return self._ls(base) except Exception as e: self._log.error(f"Error listing directory {base}: {e}") return [] # ---------- internal helpers ------------------------------------------- # def _reset_ftp_state(self): """Reset all FTP state to handle canceled transfers properly.""" self._log.warning("Resetting FTP state") try: # First try to terminate the current session self.ftp._MAVFTP__terminate_session() self.ftp.process_ftp_reply("TerminateSession") except Exception as e: self._log.warning(f"Error terminating session: {e}") try: # Then reset all sessions for good measure op = mavftp.OP_ResetSessions self.ftp._MAVFTP__send(FTP_OP(self.ftp.seq, self.ftp.session, op, 0, 0, 0, 0, None)) self.ftp.process_ftp_reply("ResetSessions") except Exception as e: self._log.warning(f"Error resetting sessions: {e}") # Reset internal dictionaries that could cause issues self.ftp.active_read_sessions = {} # These are the problematic data structures that cause the KeyError if hasattr(self.ftp, 'read_gap_times'): self.ftp.read_gap_times = {} if hasattr(self.ftp, 'read_gaps'): # This should be a list, not a dict self.ftp.read_gaps = [] # Reset session counter and tracking if hasattr(self.ftp, 'next_read_session'): self.ftp.next_read_session = 1 if hasattr(self.ftp, 'session'): self.ftp.session = 0 if hasattr(self.ftp, 'seq'): self.ftp.seq = 0 # Reset other stateful variables if hasattr(self.ftp, 'read_total'): self.ftp.read_total = 0 if hasattr(self.ftp, 'read_offset'): self.ftp.read_offset = 0 if hasattr(self.ftp, 'remote_file_size'): self.ftp.remote_file_size = 0 if hasattr(self.ftp, 'burst_state'): self.ftp.burst_state = 0 def _walk_ulogs(self, base="fs/microsd/log") -> Generator[Tuple[str, int], None, None]: dates = self._ls(base) for date, _, is_dir in dates: if not is_dir: continue try: entries = 
self._ls(f"{base}/{date}", retries=1) except Exception as exc: self._log.warning( f"Skipping {base}/{date}: listing failed ({exc})" ) continue for name, size, is_dir in entries: if not is_dir and name.endswith(".ulg"): yield f"{base}/{date}/{name}", size # plain MAVFTP ls def _ls(self, path: str, retries=5, delay=2.0): for n in range(1, retries + 1): try: # Check if connection and master are valid before attempting operation if not self.master or not self.proxy.connected: self._log.warning(f"Connection not available, skipping ls for {path} (attempt {n}/{retries})") if n >= retries: return [] # Return empty list if all retries exhausted time.sleep(delay) continue # Additional check: verify the file descriptor is still valid try: # Test if the socket is still open by checking its fileno if hasattr(self.master, 'port') and hasattr(self.master.port, 'fileno'): fd = self.master.port.fileno() if fd < 0: raise OSError("Invalid file descriptor") except (OSError, AttributeError): self._log.warning(f"File descriptor invalid, marking connection as lost (attempt {n}/{retries})") with self.proxy._mav_lock: with self.proxy._download_lock: self._reset_ftp_state() if n >= retries: return [] time.sleep(delay) continue with self.proxy._mav_lock: with self.proxy._download_lock: # Double-check connection inside the lock if not self.master or not self.proxy.connected: self._log.warning(f"Connection lost during lock acquisition for {path}") if n >= retries: return [] continue ack = self.ftp.cmd_list([path]) if ack.return_code == 0: return list(set((e.name, e.size_b, e.is_dir) for e in self.ftp.list_result)) else: # FTP command failed - check if it's a retryable error if ack.return_code == 1: # Error code 1 typically means "file/directory not found" or "permission denied" # This is not a connection issue, so don't retry self._log.warning(f"FTP ls failed: path '{path}' not found or not accessible (return code {ack.return_code})") return [] # Return empty list instead of raising error else: # Other error codes might be retryable self._log.warning(f"FTP ls command failed with return code {ack.return_code} (attempt {n}/{retries})") if n >= retries: raise RuntimeError(f"ls('{path}') failed after {retries} attempts: FTP return code {ack.return_code}") except (OSError, socket.error) as e: # Handle connection errors gracefully if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]: self._log.warning(f"Connection lost during ls operation") if n >= retries: raise RuntimeError(f"ls('{path}') failed after {retries} attempts due to connection loss") else: self._log.error(f"Unexpected socket error during ls: {e}") raise except Exception as e: self._log.error(f"Error during ls operation (attempt {n}/{retries}): {e}") if n >= retries: raise RuntimeError(f"ls('{path}') failed after {retries} attempts: {e}") # If we reach here, the operation failed but we can retry if n < retries: self._log.info(f"Retrying ls operation for {path} in {delay}s (attempt {n+1}/{retries})") time.sleep(delay) raise RuntimeError(f"ls('{path}') failed {retries}×") def _list_fail_logs(self, base: str = "fs/microsd") -> List[ULogInfo]: """ List all fail_*.log files under the given *base* directory. 
    def _list_fail_logs(self, base: str = "fs/microsd") -> List[ULogInfo]:
        """
        List all ``fail_*.log`` files sitting directly under *base*,
        without walking subdirectories.
        """
        try:
            entries = self._ls(base)
            fail_logs = [
                ULogInfo(index=i, remote_path=f"{base}/{name}", size_bytes=size, utc=0)
                for i, (name, size, is_dir) in enumerate(entries)
                if not is_dir and name.startswith("fail_") and name.endswith(".log")
            ]
            return fail_logs
        except RuntimeError as e:
            self._log.error(f"Failed to list fail logs in {base}: {e}")
            return []

    def _delete(self, path: str, retries=2, delay=2.0):
        """
        Delete a file or directory at *path* using MAVFTP.

        Retries on failure up to *retries* times with *delay* seconds
        between attempts.
        """
        # Translate FTP error codes to human-readable names (hoisted out of
        # the retry loop so the table is built only once).
        _FTP_ERR_NAMES = {
            1: "Fail",
            2: "FailErrno",
            3: "InvalidDataSize",
            4: "InvalidSession",
            5: "NoSessionsAvailable",
            6: "EndOfFile",
            7: "UnknownCommand",
            8: "FileExists",
            9: "FileProtected",
            10: "FileNotFound",
            73: "RemoteReplyTimeout",
        }
        for n in range(1, retries + 1):
            try:
                # Check if connection and master are valid before attempting the operation.
                if not self.master or not self.proxy.connected:
                    self._log.warning(f"Connection not available, skipping delete for {path} (attempt {n}/{retries})")
                    if n >= retries:
                        return   # give up after all retries
                    time.sleep(delay)
                    continue

                # Additional check: verify the file descriptor is still valid.
                try:
                    if hasattr(self.master, 'port') and hasattr(self.master.port, 'fileno'):
                        fd = self.master.port.fileno()
                        if fd < 0:
                            raise OSError("Invalid file descriptor")
                except (OSError, AttributeError):
                    self._log.warning(f"File descriptor invalid for delete, marking connection as lost (attempt {n}/{retries})")
                    with self.proxy._mav_lock:
                        with self.proxy._download_lock:
                            self._reset_ftp_state()
                    if n >= retries:
                        return
                    time.sleep(delay)
                    continue

                with self.proxy._mav_lock:
                    with self.proxy._download_lock:
                        # Double-check the connection inside the lock.
                        if not self.master or not self.proxy.connected:
                            self._log.warning(f"Connection lost during lock acquisition for delete {path}")
                            if n >= retries:
                                return
                            continue
                        ack = self.ftp.cmd_rm([path])
                        if ack.return_code == 0:
                            self._log.info(f"Successfully deleted {path}")
                            return
                        err_name = _FTP_ERR_NAMES.get(ack.return_code, f"Unknown({ack.return_code})")
                        detail = (
                            f"FTP delete failed: {err_name} (code {ack.return_code}) "
                            f"(attempt {n}/{retries})"
                        )
                        if ack.return_code == 9:   # FileProtected
                            detail += (
                                ": PX4 logger likely has the file locked. "
                                "Ensure 'logger stop' is sent before deleting."
                            )
                        self._log.warning(detail)
                        if n >= retries:
                            raise FTPDeleteError(
                                path=path,
                                ftp_code=ack.return_code,
                                message=(
                                    f"delete('{path}') failed after {retries} attempts: "
                                    f"{err_name} (FTP code {ack.return_code})"
                                ),
                            )
            except (OSError, socket.error) as e:
                # Handle connection errors gracefully.
                if e.errno in (errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE):
                    self._log.warning(f"Connection lost during delete operation (attempt {n}/{retries}): {e}")
                    if n >= retries:
                        raise RuntimeError(f"delete('{path}') failed after {retries} attempts due to connection loss")
                else:
                    self._log.error(f"Unexpected socket error during delete: {e}")
                    raise
            except FTPDeleteError:
                raise   # propagate structured FTP errors as-is
            except Exception as e:
                self._log.error(f"Error during delete operation (attempt {n}/{retries}): {e}")
                if n >= retries:
                    raise RuntimeError(f"delete('{path}') failed after {retries} attempts: {e}")
            # If we reach here, the operation failed but we can retry.
            if n < retries:
                self._log.info(f"Retrying delete operation for {path} in {delay}s (attempt {n+1}/{retries})")
                time.sleep(delay)
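    # Usage sketch (hypothetical wiring; in practice the proxy constructs this
    # parser itself and schedules the blocking calls on its executor). The
    # remote and local paths are illustrative only:
    #
    #     parser = _BlockingParser(logging.getLogger("app"), master, proxy)
    #     cancel = threading.Event()
    #     loop = asyncio.get_running_loop()
    #     fut = loop.run_in_executor(
    #         None, parser.download_ulog,
    #         "fs/microsd/log/2024-01-01/10_00_00.ulg",   # remote_path
    #         Path("/tmp/flight.ulg"),                    # local_path
    #         None,                                       # on_progress
    #         cancel,                                     # cancel_event
    #     )
    #     cancel.set()   # next progress tick raises DownloadCancelledException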