"""
petalappmanager.proxies.external
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Thread-based proxies for long-running I/O back-ends (MAVLink, ROS 1, …).
Key changes vs. the first draft:
--------------------------------
* All per-key buffers are now :class:`collections.deque` with ``maxlen``.
New data silently overwrites the oldest entry → bounded memory.
* Public API (``send``, ``register_handler``) is unchanged for petals.
* Docstrings preserved / expanded for clarity.
"""
from __future__ import annotations
import threading
import time
import socket
import errno
import struct
from abc import abstractmethod
from collections import defaultdict, deque
from typing import (
Any,
Callable,
Deque,
Dict,
List,
Mapping,
Tuple,
Generator,
Awaitable,
Optional,
Union
)
import contextlib
import logging
from pathlib import Path
import asyncio
import shutil
from pydantic import BaseModel, Field
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from .base import BaseProxy
from .. import Config
from ..models.mavlink import (
RebootStatusCode,
RebootAutopilotResponse,
FormatStorageResponse,
FormatStorageStatusCode
)
from pymavlink import mavutil, mavftp
from pymavlink.mavftp_op import FTP_OP
from pymavlink.dialects.v20 import all as mavlink_dialect
import os
# import rospy # ← uncomment in ROS-enabled environments
import dotenv
_PARAM_TYPE_NAME_TO_ID = {
"UINT8": mavutil.mavlink.MAV_PARAM_TYPE_UINT8,
"INT8": mavutil.mavlink.MAV_PARAM_TYPE_INT8,
"UINT16": mavutil.mavlink.MAV_PARAM_TYPE_UINT16,
"INT16": mavutil.mavlink.MAV_PARAM_TYPE_INT16,
"UINT32": mavutil.mavlink.MAV_PARAM_TYPE_UINT32,
"INT32": mavutil.mavlink.MAV_PARAM_TYPE_INT32,
"UINT64": mavutil.mavlink.MAV_PARAM_TYPE_UINT64,
"INT64": mavutil.mavlink.MAV_PARAM_TYPE_INT64,
"REAL32": mavutil.mavlink.MAV_PARAM_TYPE_REAL32,
"REAL64": mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
}
_INT_TYPES = {
mavutil.mavlink.MAV_PARAM_TYPE_UINT8,
mavutil.mavlink.MAV_PARAM_TYPE_INT8,
mavutil.mavlink.MAV_PARAM_TYPE_UINT16,
mavutil.mavlink.MAV_PARAM_TYPE_INT16,
mavutil.mavlink.MAV_PARAM_TYPE_UINT32,
mavutil.mavlink.MAV_PARAM_TYPE_INT32,
}
_FLOAT_TYPES = {
mavutil.mavlink.MAV_PARAM_TYPE_REAL32,
mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
}
ParamSpec = Union[
Any, # value only
Tuple[Any, Union[str, int]], # (value, "UINT16") or (value, MAV_PARAM_TYPE_INT32)
Dict[str, Any], # {"value": ..., "type": "INT16"}
]
def _parse_param_type(ptype: Optional[Any]) -> Optional[int]:
"""
Accept:
- None
- int (already a MAV_PARAM_TYPE_* value)
- strings like "UINT16", "int32", "MAV_PARAM_TYPE_INT32"
"""
if ptype is None:
return None
if isinstance(ptype, int):
return ptype
if isinstance(ptype, str):
s = ptype.strip().upper()
s = s.replace("MAV_PARAM_TYPE_", "")
if s in _PARAM_TYPE_NAME_TO_ID:
return _PARAM_TYPE_NAME_TO_ID[s]
raise ValueError(f"Unsupported param type specifier: {ptype!r}")
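# A quick illustration of the accepted specifiers (all normalise to the same
# MAV_PARAM_TYPE_* constant; values below are for demonstration only):
#
#     _parse_param_type("UINT16")                       # MAV_PARAM_TYPE_UINT16
#     _parse_param_type("mav_param_type_int32")         # MAV_PARAM_TYPE_INT32
#     _parse_param_type(mavutil.mavlink.MAV_PARAM_TYPE_INT8)  # passed through
#     _parse_param_type(None)                           # None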
def _check_int_range(v: int, bits: int, signed: bool) -> None:
if signed:
lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
else:
lo, hi = 0, (1 << bits) - 1
if not (lo <= v <= hi):
raise ValueError(f"Value {v} out of range for {'INT' if signed else 'UINT'}{bits} [{lo}, {hi}]")
def _u32_to_f32_bits(u32: int) -> float:
return struct.unpack("<f", struct.pack("<I", u32 & 0xFFFFFFFF))[0]
def _f32_bits_to_u32(f: float) -> int:
return struct.unpack("<I", struct.pack("<f", float(f)))[0]
def _sign_extend(value: int, bits: int) -> int:
sign_bit = 1 << (bits - 1)
mask = (1 << bits) - 1
value &= mask
return (value ^ sign_bit) - sign_bit
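# The three helpers above implement the classic PARAM_* convention of carrying
# integer values as raw float32 bit patterns. A round-trip sketch for an INT16
# parameter (value chosen for illustration; its bit pattern is a subnormal
# float, so the float32 round trip is exact):
#
#     wire = _u32_to_f32_bits(0x0000FFF6)   # value -10 encoded as INT16 bits
#     u32 = _f32_bits_to_u32(wire)          # 0x0000FFF6 again
#     _sign_extend(u32 & 0xFFFF, 16)        # -10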
def setup_file_only_logger(name: str, log_file: str, level: str = "INFO") -> logging.Logger:
"""Setup a logger that only writes to files, not console."""
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, level.upper()))
# Clear any existing handlers to avoid console output
logger.handlers.clear()
# Create file handler
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(getattr(logging, level.upper()))
# Create formatter
formatter = logging.Formatter(
'%(asctime)s — %(name)s — %(levelname)s — %(message)s'
)
file_handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(file_handler)
# Prevent propagation to root logger (which might log to console)
logger.propagate = False
return logger
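# Hypothetical usage (path and logger name are illustrative only):
#
#     log = setup_file_only_logger("MyPetal", "/tmp/my-petal.log", level="DEBUG")
#     log.debug("written to the file, never echoed to the console")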
# --------------------------------------------------------------------------- #
# Public dataclasses returned to petals / REST #
# --------------------------------------------------------------------------- #
class ULogInfo(BaseModel):
"""Metadata for a ULog that resides on the PX4 SD-card."""
index : int # 0-based index in the list
remote_path: str
size_bytes : int
utc : int # epoch seconds
# Progress callback signature used by download_ulog
ProgressCB = Callable[[float], Awaitable[None]] # 0.0 - 1.0
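# A minimal ProgressCB sketch a petal might pass to the download helper
# (illustrative only; it just logs the completed fraction as a percentage):
#
#     async def on_progress(fraction: float) -> None:
#         logging.getLogger("petal").info("download %.0f%%", fraction * 100)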
class DownloadCancelledException(Exception):
"""Raised when a download is cancelled by the user."""
pass
class FTPDeleteError(Exception):
"""Raised when a MAVFTP delete operation fails with a known FTP error code."""
def __init__(self, path: str, ftp_code: int, message: str):
self.path = path
self.ftp_code = ftp_code
super().__init__(message)
# ──────────────────────────────────────────────────────────────────────────────
class ExternalProxy(BaseProxy):
"""
Base class for I/O drivers that must *poll* or *listen* continuously.
A dedicated thread calls :py:meth:`_io_read_once` / :py:meth:`_io_write_once`
in a tight loop while the FastAPI event-loop thread stays unblocked.
* **Send buffers** - ``self._send[key]`` (deque, newest → right side)
Outbound messages are enqueued here via :py:meth:`send`.
The I/O thread drains these buffers by calling
:py:meth:`_io_write_once` with all pending messages.
* **Handlers** - ``self._handlers[key]``
Callbacks registered via :py:meth:`register_handler` are stored here.
When new messages arrive via :py:meth:`_io_read_once`, they are
processed directly in the I/O recv thread, which schedules
async handlers on the event loop.
"""
# ──────────────────────────────────────────────────────── public helpers ──
def __init__(self, maxlen: int, sleep_time_ms: float = 1.0) -> None:
"""
Parameters
----------
        maxlen :
            Maximum number of messages kept *per key* in the send buffers.
            ``None`` means *unbounded* (not recommended); note that a value
            of ``0`` makes each deque keep *nothing*, not everything.
        sleep_time_ms :
            Idle sleep between I/O polls, in milliseconds. Negative values
            are clamped to ``0``.
"""
self._maxlen = maxlen
if sleep_time_ms < 0:
sleep_time_ms = 0
self._sleep_time_ms = sleep_time_ms
self._sleep_time_reader_ms = sleep_time_ms
self._send: Dict[str, Deque[Any]] = {}
        self._handlers: Dict[str, List[Callable[[Any], Awaitable[None]]]] = (
            defaultdict(list)
        )
        self._handler_configs: Dict[str, Dict[Callable[[Any], Awaitable[None]], Dict[str, Any]]] = (
            defaultdict(dict)
        )
        # handler_key -> (repr of last message, last seen timestamp)
        self._last_message_times: Dict[str, Tuple[str, float]] = {}
# Thread management (I/O threads only)
self._send_running = threading.Event()
self._recv_running = threading.Event()
self._io_thread_send: threading.Thread | None = None
self._io_thread_recv: threading.Thread | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        # Executor used to offload handlers registered with cpu_heavy=True
        self._exe = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix=f"{self.__class__.__name__}-CPU"
        )
        self._log = logging.getLogger(self.__class__.__name__)
def register_handler(self,
key: str,
fn: Callable[[Any], Awaitable[None]],
duplicate_filter_interval: Optional[float] = None,
queue_length: Optional[int] = None,
cpu_heavy: bool = False,
) -> None:
"""
        Attach an async callback *fn* so it fires for **every** inbound message received under *key*.
The callback is scheduled on the main event loop for thread-safe execution.
Parameters
----------
key : str
The key to register the handler for.
fn : Callable[[Any], Awaitable[None]]
The async handler function to call for each message.
duplicate_filter_interval : Optional[float]
If specified, duplicate messages received within this interval (in seconds)
will be filtered out and the handler will not be called. None disables filtering.
        queue_length : Optional[int]
            Accepted for backward compatibility. Inbound messages are now
            dispatched directly to handlers rather than buffered per key,
            so this value is currently unused.
        cpu_heavy : bool
            If True, the proxy wraps the entire handler invocation in
            ``loop.run_in_executor`` on its internal thread pool, so a
            CPU-bound handler cannot starve the event loop. The handler
            must still be defined with ``async def``.
Raises
------
TypeError
If *fn* is not an async function (coroutine).
"""
if not asyncio.iscoroutinefunction(fn):
raise TypeError(
f"Handler must be an async function (coroutine). "
f"Got {type(fn).__name__} instead. "
f"Define your handler with 'async def' instead of 'def'."
)
self._handlers[key].append(fn)
self._handler_configs[key][fn] = {
'duplicate_filter_interval': duplicate_filter_interval,
'cpu_heavy': cpu_heavy,
}
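    # Hypothetical usage — register an async HEARTBEAT handler with duplicate
    # filtering (``proxy`` and the key are illustrative):
    #
    #     async def on_heartbeat(msg) -> None:
    #         print("heartbeat from system", msg.get_srcSystem())
    #
    #     proxy.register_handler("HEARTBEAT", on_heartbeat,
    #                            duplicate_filter_interval=1.0)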
def unregister_handler(self, key: str, fn: Callable[[Any], Awaitable[None]]) -> None:
"""
Remove the callback *fn* from the broadcast list attached to *key*.
If *fn* was not registered, the call is silently ignored.
When the last callback for *key* is removed, the key itself is pruned
to keep the dict size small.
"""
callbacks = self._handlers.get(key)
if not callbacks:
return # nothing registered under that key
try:
callbacks.remove(fn)
except ValueError:
self._log.warning(
"Tried to unregister handler %s for key '%s' but it was not found.",
fn, key
)
return # fn was not in the list; ignore
# Clean up handler config
if key in self._handler_configs and fn in self._handler_configs[key]:
del self._handler_configs[key][fn]
if not callbacks:
# last handler -> prune everything for that key
del self._handlers[key]
if key in self._handler_configs:
del self._handler_configs[key]
def send(self, key: str, msg: Any, burst_count: Optional[int] = None,
burst_interval: Optional[float] = None) -> None:
"""
Enqueue *msg* for transmission. The newest message is kept if the
buffer is already full.
Parameters
----------
key : str
The key to send the message on.
msg : Any
The message to send.
burst_count : Optional[int]
If specified, send the message this many times in a burst.
burst_interval : Optional[float]
If burst_count is specified, wait this many seconds between each message.
If None, all messages are sent immediately.
"""
if burst_count is None or burst_count <= 1:
# Single message send
self._send.setdefault(key, deque(maxlen=self._maxlen)).append(msg)
else:
# Burst send
if burst_interval is None or burst_interval <= 0:
# Send all messages immediately
send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
for _ in range(burst_count):
send_queue.append(msg)
else:
# Schedule burst with intervals using a background task
if self._loop is not None:
try:
# Check if we're in the same thread as the event loop
current_loop = None
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
current_loop = None
if current_loop is self._loop:
# We're in the event loop thread, create task directly
task = asyncio.create_task(
self._send_burst(key, msg, burst_count, burst_interval)
)
# Store the task reference to prevent garbage collection
if not hasattr(self, '_burst_tasks'):
self._burst_tasks = set()
self._burst_tasks.add(task)
task.add_done_callback(self._burst_tasks.discard)
else:
# We're in a different thread, schedule on proxy's loop
def schedule_burst():
try:
task = asyncio.create_task(
self._send_burst(key, msg, burst_count, burst_interval)
)
if not hasattr(self, '_burst_tasks'):
self._burst_tasks = set()
self._burst_tasks.add(task)
task.add_done_callback(self._burst_tasks.discard)
except Exception as e:
self._log.error(f"Failed to schedule burst task: {e}")
self._loop.call_soon_threadsafe(schedule_burst)
except Exception as e:
# If task creation fails, fall back to immediate send
self._log.warning(f"Failed to create burst task: {e}, sending immediately")
send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
for _ in range(burst_count):
send_queue.append(msg)
else:
# If no loop is available, fall back to immediate send
self._log.warning("No event loop available for burst with interval, sending immediately")
send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
for _ in range(burst_count):
send_queue.append(msg)
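    # Hypothetical usage — a single enqueue vs. a timed burst (``msg`` is any
    # object the concrete proxy knows how to transmit):
    #
    #     proxy.send("mav", msg)                                     # once
    #     proxy.send("mav", msg, burst_count=3, burst_interval=0.1)  # 3x, 100 ms apart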
async def _send_burst(self, key: str, msg: Any, count: int, interval: float) -> None:
"""Send a burst of messages with specified interval."""
send_queue = self._send.setdefault(key, deque(maxlen=self._maxlen))
# Send messages with proper intervals
for i in range(count):
send_queue.append(msg)
self._log.debug(f"Burst message {i+1}/{count} queued for key '{key}'")
if i < count - 1: # Don't sleep after the last message
await asyncio.sleep(interval)
# ───────────────────────────────────────────── FastAPI life-cycle hooks ──
async def start(self) -> None:
"""Create the I/O threads and begin polling/writing."""
self._loop = asyncio.get_running_loop()
self._burst_tasks = set() # Initialize burst tasks tracking
self._send_running.set()
self._recv_running.set()
# Start I/O threads for send/recv
self._io_thread_recv = threading.Thread(target=self._recv_body, daemon=True, name=f"{self.__class__.__name__}-Recv")
self._io_thread_send = threading.Thread(target=self._send_body, daemon=True, name=f"{self.__class__.__name__}-Send")
self._io_thread_recv.start()
self._io_thread_send.start()
async def stop(self) -> None:
"""Ask the I/O threads to exit and join them (best-effort, 5 s timeout)."""
self._send_running.clear()
self._recv_running.clear()
# Cancel any pending burst tasks
if hasattr(self, '_burst_tasks'):
for task in self._burst_tasks.copy():
if not task.done():
task.cancel()
# Wait for tasks to complete cancellation
if self._burst_tasks:
await asyncio.gather(*self._burst_tasks, return_exceptions=True)
self._burst_tasks.clear()
# Stop I/O send thread
if self._io_thread_send and self._io_thread_send.is_alive():
self._io_thread_send.join(timeout=5)
self._io_thread_send = None
# Stop I/O recv thread
if self._io_thread_recv and self._io_thread_recv.is_alive():
self._io_thread_recv.join(timeout=5)
self._io_thread_recv = None
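    # Sketch of wiring these hooks into a FastAPI lifespan context (assumes a
    # ``proxy`` instance constructed elsewhere; names are illustrative):
    #
    #     @contextlib.asynccontextmanager
    #     async def lifespan(app):
    #         await proxy.start()
    #         try:
    #             yield
    #         finally:
    #             await proxy.stop()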
# ─────────────────────────────────────────────── subclass responsibilities ─
@abstractmethod
    def _io_read_once(self, timeout: float = 0.0) -> List[Tuple[str, Any]]:
"""
Retrieve **zero or more** `(key, message)` tuples from the device /
middleware *without blocking*.
Returning an empty list is perfectly fine.
"""
@abstractmethod
def _io_write_once(self, batches: Mapping[str, List[Any]]) -> None:
"""
Push pending outbound messages to the device / middleware.
``batches`` maps *key* → list of messages drained from ``_send[key]``.
"""
# ─────────────────────────────────────────── internal worker main-loop ──
def _send_body(self) -> None:
"""I/O thread body - drains send queues."""
while self._send_running.is_set():
pending: Dict[str, List[Any]] = defaultdict(list)
for key, dq in list(self._send.items()):
while dq:
pending[key].append(dq.popleft())
if pending:
self._io_write_once(pending)
else:
# Sleep briefly if there's nothing to send to avoid busy-waiting
time.sleep(self._sleep_time_ms / 1000.0)
def _recv_body(self) -> None:
"""I/O thread body - polls recv and processes messages with handlers directly."""
while self._recv_running.is_set():
for key, msg in self._io_read_once(timeout=self._sleep_time_reader_ms/1000.0):
# Process directly in recv thread (handlers are async, dispatched to event loop)
self._process_message_with_handlers(key, msg)
# ─────────────────────────────────────────── message processing ──
def _invoke_callback_safely(
self, callback: Callable, key: str, msg: Any, *, cpu_heavy: bool = False,
):
"""Safely invoke an async callback by scheduling it on the main event loop.
All callbacks are required to be async functions.
When *cpu_heavy* is ``True`` the coroutine is wrapped so that its body
executes inside ``loop.run_in_executor`` (thread pool), keeping the
event loop free for other work.
"""
try:
if self._loop and not self._loop.is_closed():
                if cpu_heavy:
                    async def _offloaded():
                        # Run the handler to completion inside the thread pool;
                        # asyncio.run() gives the worker thread its own event loop.
                        await self._loop.run_in_executor(
                            self._exe, lambda: asyncio.run(callback(msg))
                        )
                    asyncio.run_coroutine_threadsafe(_offloaded(), self._loop)
else:
asyncio.run_coroutine_threadsafe(callback(msg), self._loop)
else:
self._log.warning(
"[ExternalProxy] Cannot invoke async callback for key '%s': "
"event loop not available",
key,
)
except Exception as exc:
self._log.error(
"[ExternalProxy] Error invoking callback for key '%s': %s",
key, exc,
)
def _process_message_with_handlers(self, key: str, msg: Any):
"""
Process a single message by invoking all registered handlers for the key.
This runs in the I/O recv thread and handles duplicate filtering.
Handlers are async and scheduled on the event loop.
"""
current_time = time.time()
for cb in self._handlers.get(key, []):
try:
# Check if duplicate filtering is enabled for this handler
handler_config = self._handler_configs.get(key, {}).get(cb, {})
filter_interval = handler_config.get('duplicate_filter_interval')
should_call_handler = True
if filter_interval is not None:
# Convert message to string for comparison
msg_str = str(msg)
handler_key = f"{key}_{id(cb)}"
# Check if we've seen this exact message recently for this handler
if handler_key in self._last_message_times:
last_msg_str, last_time = self._last_message_times[handler_key]
if (msg_str == last_msg_str and
current_time - last_time < filter_interval):
should_call_handler = False
self._log.debug(
"[ExternalProxy] Filtered duplicate message for handler %s on key '%s'",
cb, key
)
# Update last message time for this handler
if should_call_handler:
self._last_message_times[handler_key] = (msg_str, current_time)
if should_call_handler:
is_cpu_heavy = handler_config.get('cpu_heavy', False)
self._invoke_callback_safely(cb, key, msg, cpu_heavy=is_cpu_heavy)
self._log.debug(
"[ExternalProxy] handler %s called for key '%s': %s",
cb, key, msg
)
except Exception as exc: # never kill the loop
self._log.error(
"[ExternalProxy] handler %s raised: %s",
cb, exc
)
# ──────────────────────────────────────────────────────────────────────────────
class MavLinkExternalProxy(ExternalProxy):
"""
Threaded MAVLink driver using `pymavlink`.
Buffers used
------------
* ``send["mav"]`` - outbound :class:`MAVLink_message`
* ``recv["mav"]`` - any inbound message
* ``recv[str(msg.get_msgId())]`` - by numeric ID
* ``recv[msg.get_type()]`` - by string type
"""
def __init__(
self,
endpoint: str,
baud: int,
source_system_id: int,
source_component_id: int,
maxlen: int,
mavlink_worker_sleep_ms: float = 1.0,
mavlink_heartbeat_send_frequency: float = 5.0,
root_sd_path: str = 'fs/microsd/log',
):
super().__init__(maxlen=maxlen, sleep_time_ms=mavlink_worker_sleep_ms)
self.endpoint = endpoint
self.baud = baud
self.source_system_id = source_system_id
self.source_component_id = source_component_id
self.mavlink_heartbeat_send_frequency = mavlink_heartbeat_send_frequency
self.root_sd_path = root_sd_path
self.master: mavutil.mavfile | None = None
# Set up file-only logging
log_dir = Config.PETAL_LOG_DIR
log_path = Path(log_dir, "app-mavlinkexternalproxy.log")
self._log_msgs = setup_file_only_logger(
name="MavLinkExternalProxyMsgs",
log_file=log_path,
level="INFO"
)
self._log = logging.getLogger("MavLinkExternalProxy")
self._loop: asyncio.AbstractEventLoop | None = None
self._exe = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MavLinkExternalProxy")
self.connected = False
self._last_heartbeat_time = time.time()
self.leaf_fc_connected = False
self._last_leaf_fc_heartbeat_time = time.time()
self._connection_check_interval = 5.0 # Check connection every 5 seconds
        self._heartbeat_timeout = 10.0  # Consider disconnected if no heartbeat for 10s
        self._leaf_fc_heartbeat_timeout = 5.0  # Consider Leaf FC disconnected if no heartbeat for 5s
self._reconnect_interval = 2.0 # Wait 2s between reconnection attempts
self._heartbeat_task = None
self._connection_monitor_task = None
self._reconnect_pending = False
self._mav_lock = threading.Lock()
self._download_lock = threading.Lock() # Prevent concurrent downloads
# Rate limiting for logging
self._last_log_time = {}
self._log_interval = {
'HEARTBEAT': 10.0, # Log heartbeats every 10 seconds max
'MISSION_CURRENT': 5.0, # Log mission current every 5 seconds max
'ATTITUDE': 30.0, # Log attitude every 30 seconds max
'POSITION': 30.0, # Log position every 30 seconds max
'DEFAULT': 2.0 # Default interval for other messages
}
# Messages to suppress completely (only show at DEBUG level)
self._suppress_messages = {
'SERVO_OUTPUT_RAW',
'ACTUATOR_MOTORS',
'ATTITUDE_QUATERNION',
'LOCAL_POSITION_NED',
'GLOBAL_POSITION_INT'
}
def _norm_name(self, x):
"""Normalize parameter name by removing null padding."""
try:
return x.decode("ascii").rstrip("\x00")
except AttributeError:
return str(x).rstrip("\x00")
def _decode_param_value(self, msg):
"""
Decode a PARAM_VALUE message.
If type==INT32, reinterpret the float bits as int32.
If type==UINT32, reinterpret the float bits as uint32.
Parameters
----------
msg : MAVLink_param_value_message
The PARAM_VALUE message to decode.
Returns
-------
Tuple[str, Any]
The parameter name and decoded value.
"""
name = self._norm_name(msg.param_id)
if msg.param_type in _INT_TYPES:
raw_u32 = _f32_bits_to_u32(msg.param_value)
if msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
val = raw_u32 & 0xFF
elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
val = _sign_extend(raw_u32 & 0xFF, 8)
elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
val = raw_u32 & 0xFFFF
elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16:
val = _sign_extend(raw_u32 & 0xFFFF, 16)
elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT32:
val = raw_u32
elif msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT32:
val = _sign_extend(raw_u32, 32)
else:
# Shouldn't happen due to _INT_TYPES, but keep safe
val = raw_u32
return name, val
# REAL32 (what PX4 mostly uses)
if msg.param_type == mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
return name, float(msg.param_value)
# Classic PARAM_* can't reliably carry REAL64/INT64/UINT64; if they appear, just pass through float field
return name, float(msg.param_value)
def _encode_param_value(self, value: Any, param_type: int) -> float:
"""
Encode a parameter value for transmission.
For INT32 types, encode the int32 bits as float32 for wire transmission.
For UINT32 types, encode the uint32 bits as float32 for wire transmission.
Returns the float value to put in param_value field.
Parameters
----------
value : Any
The parameter value to encode.
param_type : int
The MAV_PARAM_TYPE_* type of the parameter.
Returns
-------
float
The encoded float value for transmission.
"""
if param_type in _INT_TYPES:
v = int(value)
if param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
_check_int_range(v, 8, signed=False)
u32 = v
elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
_check_int_range(v, 8, signed=True)
u32 = v & 0xFF
elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
_check_int_range(v, 16, signed=False)
u32 = v
elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16:
_check_int_range(v, 16, signed=True)
u32 = v & 0xFFFF
elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT32:
_check_int_range(v, 32, signed=False)
u32 = v
elif param_type == mavutil.mavlink.MAV_PARAM_TYPE_INT32:
_check_int_range(v, 32, signed=True)
u32 = v & 0xFFFFFFFF
else:
u32 = v & 0xFFFFFFFF
return _u32_to_f32_bits(u32)
if param_type == mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
return float(value)
# Classic PARAM_SET can't carry 64-bit values; refuse explicitly
if param_type in (
mavutil.mavlink.MAV_PARAM_TYPE_UINT64,
mavutil.mavlink.MAV_PARAM_TYPE_INT64,
mavutil.mavlink.MAV_PARAM_TYPE_REAL64,
):
raise ValueError("64-bit PARAM_SET not supported; use PARAM_EXT_* for 64-bit/REAL64 parameters")
# Fallback
return float(value)
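    # Round-trip sketch — an INT16 value survives the float32 wire encoding
    # (the two helpers above are each other's inverse for classic PARAM_*):
    #
    #     wire = self._encode_param_value(-10, mavutil.mavlink.MAV_PARAM_TYPE_INT16)
    #     # a PARAM_VALUE carrying ``wire`` with type INT16 then decodes back
    #     # to ("<name>", -10) via self._decode_param_value(msg)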
def _should_log_message(self, msg_type: str) -> bool:
"""Determine if a message should be logged based on rate limiting"""
        current_time = time.time()
# Suppress high-frequency messages completely at INFO level
if msg_type in self._suppress_messages:
return False
# Get the appropriate interval for this message type
interval = self._log_interval.get(msg_type, self._log_interval['DEFAULT'])
# Check if enough time has passed since last log
last_log = self._last_log_time.get(msg_type, 0.0)
if current_time - last_log >= interval:
self._last_log_time[msg_type] = current_time
return True
return False
@property
def target_system(self) -> int:
"""Return the target system ID of the MAVLink connection."""
if self.master:
return self.master.target_system
return 0
@property
def target_component(self) -> int:
"""Return the target component ID of the MAVLink connection."""
if self.master:
return self.master.target_component
return 0
# ------------------------ life-cycle --------------------- #
async def start(self):
"""Open the MAVLink connection then launch the worker thread."""
self._loop = asyncio.get_running_loop()
# Schedule initial connection attempt in background (non-blocking)
# This allows the server to start immediately without waiting for MAVLink
asyncio.create_task(self._initial_connection_attempt())
# Start the worker thread first
await super().start()
# Start connection monitoring and heartbeat tasks
self._connection_monitor_task = asyncio.create_task(self._monitor_connection())
# send heartbeat at configured frequency
if self.mavlink_heartbeat_send_frequency is not None:
try:
frequency = float(self.mavlink_heartbeat_send_frequency)
if frequency <= 0:
raise ValueError("Heartbeat frequency must be positive")
except ValueError as exc:
self._log.error(f"Invalid self.mavlink_heartbeat_send_frequency: {exc}")
frequency = 5.0
self._heartbeat_task = asyncio.create_task(self._send_heartbeat_periodically(frequency=frequency))
async def _initial_connection_attempt(self):
"""Attempt initial MAVLink connection in the background."""
try:
self._log.info("Attempting initial MAVLink connection to %s", self.endpoint)
await self._establish_connection()
if self.connected:
self._log.info("Initial MAVLink connection successful")
else:
self._log.info("Initial MAVLink connection failed - will retry in background")
except Exception as e:
self._log.warning(f"Initial MAVLink connection attempt failed: {e} - will retry in background")
async def _establish_connection(self):
"""Establish MAVLink connection and wait for heartbeat."""
try:
if self.master:
# Check if any FTP operations are in progress before closing connection
if self._download_lock.locked() or self._mav_lock.locked():
self._log.warning("Cannot establish new connection - FTP operation in progress")
return
try:
self.master.close()
                except Exception:
                    pass  # Ignore errors when closing the old connection
# Run the blocking connection establishment in a separate thread
self.master = await self._loop.run_in_executor(
self._exe,
self._create_mavlink_connection
)
# Try to get a heartbeat with timeout - also run in executor
try:
heartbeat_received = await self._loop.run_in_executor(
self._exe,
self._wait_for_heartbeat
)
if heartbeat_received:
self.connected = True
self._last_heartbeat_time = time.time()
self._log.info("MAVLink connection established - Heartbeat from sys %s, comp %s",
self.master.target_system, self.master.target_component)
# Register heartbeat handler to track connection health
if self._on_heartbeat_received not in self._handlers.get(str(mavlink_dialect.MAVLINK_MSG_ID_HEARTBEAT), []):
self.register_handler(str(mavlink_dialect.MAVLINK_MSG_ID_HEARTBEAT), self._on_heartbeat_received)
if self._on_leaf_fc_heartbeat_received not in self._handlers.get(str(mavlink_dialect.MAVLINK_MSG_ID_LEAF_HEARTBEAT), []):
self.register_handler(str(mavlink_dialect.MAVLINK_MSG_ID_LEAF_HEARTBEAT), self._on_leaf_fc_heartbeat_received)
else:
self.connected = False
self._log.warning("No heartbeat received from MAVLink endpoint %s", self.endpoint)
except (OSError, socket.error) as e:
self.connected = False
self._log.warning(f"Socket error during heartbeat wait: {e}")
except Exception as e:
self.connected = False
self._log.warning(f"Error waiting for heartbeat: {e}")
except Exception as e:
self.connected = False
self._log.error(f"Error establishing MAVLink connection: {str(e)}")
if self.master:
try:
self.master.close()
                except Exception:
                    pass
self.master = None
def _create_mavlink_connection(self):
"""Create MAVLink connection in a separate thread."""
return mavutil.mavlink_connection(
self.endpoint,
baud=self.baud,
dialect="all",
source_system=self.source_system_id,
source_component=self.source_component_id
)
def _wait_for_heartbeat(self):
"""Wait for heartbeat in a separate thread."""
if self.master:
return self.master.wait_heartbeat(timeout=5)
return False
async def _on_heartbeat_received(self, msg):
"""Handler for incoming heartbeat messages to track connection health."""
self._last_heartbeat_time = time.time()
if not self.connected:
self.connected = True
self._log.info("MAVLink connection re-established")
async def _on_leaf_fc_heartbeat_received(self, msg):
"""Handler for incoming heartbeat messages to track connection health."""
self._last_leaf_fc_heartbeat_time = time.time()
if not self.leaf_fc_connected:
self.leaf_fc_connected = True
self._log.info("Leaf FC connection re-established")
async def _monitor_connection(self):
"""Monitor connection health and trigger reconnection if needed."""
while self._recv_running.is_set():
try:
# Skip monitoring if _mav_lock is held (FTP operation in progress)
if self._download_lock.locked():
self._last_leaf_fc_heartbeat_time += self._connection_check_interval
self._last_heartbeat_time += self._connection_check_interval
self._log.debug("Skipping connection monitoring - FTP operation in progress")
await asyncio.sleep(self._connection_check_interval)
continue
current_time = time.time()
# Check if we haven't received a Leaf FC heartbeat recently
if abs(current_time - self._last_leaf_fc_heartbeat_time) > self._leaf_fc_heartbeat_timeout:
if self.leaf_fc_connected:
self._log.warning("No Leaf FC heartbeat received for %.1fs - Leaf FC connection lost",
current_time - self._last_leaf_fc_heartbeat_time)
self.leaf_fc_connected = False
else:
self._log.warning("No Leaf FC heartbeat received for %.1fs - still disconnected",
current_time - self._last_leaf_fc_heartbeat_time)
# Check if we haven't received a heartbeat recently
if abs(current_time - self._last_heartbeat_time) > self._heartbeat_timeout:
if self.connected:
self._log.warning("No heartbeat received for %.1fs - connection lost",
current_time - self._last_heartbeat_time)
self.connected = False
else:
self._log.warning("No heartbeat received for %.1fs - still disconnected",
current_time - self._last_heartbeat_time)
# Attempt reconnection if not connected - BUT only if no FTP operations are in progress
if not self.connected and self._recv_running.is_set():
# Double-check that no FTP operations are running before attempting reconnection
if self._download_lock.locked() or self._mav_lock.locked():
self._log.debug("Delaying reconnection - FTP operation in progress")
await asyncio.sleep(self._connection_check_interval)
continue
self._log.info("Attempting to reconnect to MAVLink...")
await self._establish_connection()
if not self.connected:
await asyncio.sleep(self._reconnect_interval)
# Check connection health periodically
await asyncio.sleep(self._connection_check_interval)
except asyncio.CancelledError:
break
except Exception as e:
self._log.error(f"Error in connection monitor: {str(e)}")
await asyncio.sleep(self._reconnect_interval)
def _schedule_reconnect(self) -> None:
"""Called from the FTP thread when it detects a dead FD."""
if not self._recv_running.is_set():
return
# avoid stampeding: only schedule once
if getattr(self, "_reconnect_pending", False):
return
self._reconnect_pending = True
async def _task():
try:
# Wait for any ongoing FTP operations to complete before reconnecting
while self._download_lock.locked() or self._mav_lock.locked():
self._log.debug("Waiting for FTP operations to complete before reconnecting...")
await asyncio.sleep(0.5)
await self._establish_connection()
except Exception:
# force a fresh BlockingParser next time only on failure
self._parser = None
raise
finally:
self._reconnect_pending = False
asyncio.run_coroutine_threadsafe(_task(), self._loop)
async def _send_heartbeat_periodically(self, frequency: float = 5.0):
"""Periodically send a MAVLink heartbeat message."""
interval = 1.0 / frequency
while self._send_running.is_set():
try:
if self.connected and self.master:
await self.send_heartbeat()
else:
self._log_msgs.debug("Skipping heartbeat send - not connected")
except Exception as exc:
self._log_msgs.error(f"Failed to send heartbeat: {exc}")
# Don't mark as disconnected just for heartbeat send failure
await asyncio.sleep(interval)
async def send_heartbeat(self):
"""Send a MAVLink heartbeat message."""
if not self.master:
raise RuntimeError("MAVLink master not initialized")
if not self.connected:
raise RuntimeError("MAVLink not connected")
msg = self.master.mav.heartbeat_encode(
mavutil.mavlink.MAV_TYPE_GCS, # GCS type
mavutil.mavlink.MAV_AUTOPILOT_INVALID, # Autopilot type
0, # Base mode
0, # Custom mode
mavutil.mavlink.MAV_STATE_ACTIVE # System state
)
self.send("mav", msg)
self._log_msgs.debug("Sent MAVLink heartbeat")
async def stop(self):
"""Stop the worker and close the link."""
# Cancel monitoring tasks
if self._connection_monitor_task:
self._connection_monitor_task.cancel()
try:
await self._connection_monitor_task
except asyncio.CancelledError:
pass
if self._heartbeat_task:
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
# Stop the worker thread
await super().stop()
# Close MAVLink connection
if self.master:
self.master.close()
self.master = None
self.connected = False
# ------------------- I/O primitives --------------------- #
def _io_read_once(self, timeout: float = 0.0) -> List[Tuple[str, Any]]:
if not self.master or not self.connected:
return []
out: List[Tuple[str, Any]] = []
try:
while True:
with self._mav_lock:
msg = self.master.recv_match(blocking=True, timeout=timeout)
if msg is None:
break
msg_type = msg.get_type()
msg_id = msg.get_msgId()
out.append(("mav", msg))
out.append((str(msg_id), msg))
out.append((msg_type, msg))
except (OSError, socket.error) as e:
# Handle connection errors gracefully
if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED]:
self._log_msgs.debug(f"MAVLink connection lost during read: {e}")
# Don't mark as disconnected here, let the heartbeat monitor handle it
else:
self._log_msgs.error(f"Unexpected error reading MAVLink messages: {e}")
time.sleep(timeout)
except Exception as e:
self._log_msgs.error(f"Error reading MAVLink messages: {e}")
# Don't mark as disconnected here, let the heartbeat monitor handle it
time.sleep(timeout)
return out
def _io_write_once(self, batches):
"""Send queued MAVLink messages."""
if not self.master or not self.connected:
return
for key, msgs in batches.items():
for msg in msgs:
try:
msg_type = msg.get_type() if hasattr(msg, 'get_type') else 'UNKNOWN'
msg_id = msg.get_msgId() if hasattr(msg, 'get_msgId') else 'N/A'
# Only log if rate limiting allows
if self._should_log_message(msg_type):
self._log_msgs.info(f"📤 MAVLink TX: {msg_type} (ID: {msg_id}) - {msg}")
with self._mav_lock:
self.master.mav.send(msg)
except (OSError, socket.error) as e:
if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]:
self._log_msgs.debug(f"MAVLink connection lost during write: {e}")
# Don't mark as disconnected here, let the heartbeat monitor handle it
break # Stop trying to send more messages
else:
self._log_msgs.error(f"Unexpected error sending MAVLink message {key}: {e}")
except Exception as exc:
self._log_msgs.error(
"Failed to send MAVLink message %s: %s",
key, exc
)
# Don't mark as disconnected here, let the heartbeat monitor handle it
# ------------------- helpers exposed to petals --------- #
def build_req_msg_long(self, message_id: int) -> mavutil.mavlink.MAVLink_command_long_message:
"""
Build a MAVLink command to request a specific message type.
Parameters
----------
message_id : int
The numeric ID of the MAVLink message to request.
Returns
-------
mavutil.mavlink.MAVLink_command_long_message
The MAVLink command message to request the specified message.
Raises
------
RuntimeError
If MAVLink connection is not established.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
cmd = self.master.mav.command_long_encode(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
0, # confirmation
float(message_id), # param1: Message ID to be streamed
0,
0,
0,
0,
0,
0
)
return cmd
def build_req_msg_log_request(self, message_id: int) -> mavutil.mavlink.MAVLink_log_request_list_message:
"""
Build a MAVLink command to request a specific log message.
Parameters
----------
message_id : int
The numeric ID of the log message to request.
Returns
-------
mavutil.mavlink.MAVLink_log_request_list_message
The MAVLink command message to request the specified log.
Raises
------
RuntimeError
If MAVLink connection is not established.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
cmd = self.master.mav.log_request_list_encode(
self.master.target_system,
self.master.target_component,
0, # start id
0xFFFF # end id
)
return cmd
def build_req_msg_log_data(
self,
log_id: int,
ofs: int = 0,
count: int = 0xFFFFFFFF,
) -> mavutil.mavlink.MAVLink_log_request_data_message:
"""
Build LOG_REQUEST_DATA for a given log.
Parameters
----------
log_id : int
The log id from LOG_ENTRY.id
ofs : int
Offset into the log (usually 0 for first request)
count : int
Number of bytes requested. For PX4/ArduPilot it's common
to use 0xFFFFFFFF to say "send the whole log".
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
return self.master.mav.log_request_data_encode(
self.master.target_system,
self.master.target_component,
log_id,
ofs,
count,
)
def build_param_request_read(self, name: str, index: int = -1):
"""
Build MAVLink PARAM_REQUEST_READ for a named or indexed parameter.
If index == -1, the 'name' is used; otherwise PX4 will ignore name.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
# pymavlink will pad/trim to 16 chars; PX4 expects ASCII
return self.master.mav.param_request_read_encode(
self.master.target_system,
self.master.target_component,
name.encode("ascii"),
index
)
def build_param_request_list(self):
"""Build MAVLink PARAM_REQUEST_LIST to fetch the full table."""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
return self.master.mav.param_request_list_encode(
self.master.target_system,
self.master.target_component
)
def build_param_set(self, name: str, value: Any, param_type: int):
"""
Build MAVLink PARAM_SET for setting a parameter.
Handles INT32 encoding where int32 values are encoded as float32 bits for wire transmission.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
# Use the encoding method for proper INT32 handling
encoded_value = self._encode_param_value(value, param_type)
return self.master.mav.param_set_encode(
self.master.target_system,
self.master.target_component,
name.encode("ascii"),
encoded_value, # properly encoded value
param_type # mavutil.mavlink.MAV_PARAM_TYPE_*
)
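    # Hypothetical usage — build and queue a PARAM_SET (parameter name and
    # value are illustrative):
    #
    #     msg = proxy.build_param_set("MPC_XY_VEL_MAX", 8.0,
    #                                 mavutil.mavlink.MAV_PARAM_TYPE_REAL32)
    #     proxy.send("mav", msg)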
def build_reboot_command(
self,
reboot_autopilot: bool = True,
reboot_onboard_computer: bool = False,
) -> mavutil.mavlink.MAVLink_command_long_message:
"""
Build a MAVLink command to reboot the autopilot and/or onboard computer.
Parameters
----------
reboot_autopilot : bool
If True, reboot the autopilot (PX4/ArduPilot). Default is True.
reboot_onboard_computer : bool
If True, reboot the onboard computer. Default is False.
Returns
-------
mavutil.mavlink.MAVLink_command_long_message
The MAVLink COMMAND_LONG message for reboot.
Raises
------
RuntimeError
If MAVLink connection is not established.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
# param1: 1=reboot autopilot, 0=do nothing
# param2: 1=reboot onboard computer, 0=do nothing
param1 = 1.0 if reboot_autopilot else 0.0
param2 = 1.0 if reboot_onboard_computer else 0.0
return self.master.mav.command_long_encode(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN,
0, # confirmation
param1, # param1: reboot autopilot
param2, # param2: reboot onboard computer
0, 0, 0, 0, 0 # param3..param7 unused
)
def build_motor_value_command(
self,
motor_idx: int,
motor_value: float,
timeout: float
) -> mavutil.mavlink.MAVLink_command_long_message:
"""Build MAV_CMD_ACTUATOR_TEST command for a motor."""
# param1 = throttle value (0-1 or NaN)
# param2 = timeout in seconds
        # param5 = motor mapping (110x, where x is the motor index)
return self.master.mav.command_long_encode(
1, # TODO: investigate best practice
1, # TODO: investigate best practice
mavutil.mavlink.MAV_CMD_ACTUATOR_TEST,
0, # confirmation
motor_value, # param1: Motor value (0-1 or NaN)
timeout, # param2: Timeout in seconds
0, # Reserved
0, # Reserved
float(1100 + motor_idx), # param5: Motor mapping (110x)
0, # Reserved
0 # Reserved
)
def build_request_message_command(self) -> mavutil.mavlink.MAVLink_command_long_message:
"""
        Build a MAVLink command that requests a single SYSTEM_TIME message
        using MAV_CMD_REQUEST_MESSAGE (common.xml).
        Returns
        -------
        mavutil.mavlink.MAVLink_command_long_message
            The MAVLink COMMAND_LONG message requesting SYSTEM_TIME.
Raises
------
RuntimeError
If MAVLink connection is not established.
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
return self.master.mav.command_long_encode(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE,
0, # confirmation
float(mavutil.mavlink.MAVLINK_MSG_ID_SYSTEM_TIME), # param1: requested message id
0, 0, 0, 0, 0, 0 # param2..param7 unused
)
def build_shell_serial_control_msgs(
self,
text: str,
device: int = 10, # PX4 mavlink_shell.py uses devnum=10
respond: bool = True,
exclusive: bool = True,
) -> list[mavutil.mavlink.MAVLink_serial_control_message]:
"""
Build SERIAL_CONTROL messages that write `text` to the PX4 MAVLink shell.
Splits into <=70 byte chunks (MAVLink SERIAL_CONTROL data field).
Parameters
----------
text : str
The text to send to the MAVLink shell.
device : int
The device number to use (default 10 for PX4 mavlink_shell).
respond : bool
If True, set the RESPOND flag (default True).
exclusive : bool
If True, set the EXCLUSIVE flag (default True).
"""
if not self.master or not self.connected:
raise RuntimeError("MAVLink connection not established")
flags = 0
if respond:
flags |= mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND
if exclusive:
flags |= mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE
b = text.encode("utf-8")
msgs = []
# MAVLink shell implementation chunks at 70 bytes
for i in range(0, len(b), 70):
chunk = b[i:i+70]
data = list(chunk) + [0] * (70 - len(chunk))
msgs.append(
self.master.mav.serial_control_encode(
device, # device
flags, # flags
0, # timeout (PX4 ignores it)
0, # baudrate (0 = no change)
len(chunk), # count
data, # data[70]
)
)
return msgs
async def send_shell_command(
self,
command: str,
settle_time: float = 0.5,
timeout: float = 3.0,
) -> str:
"""
Send a shell command to PX4 via MAVLink SERIAL_CONTROL and wait
for PX4 to acknowledge it.
Uses :py:meth:`send_and_wait` so the command is transmitted through
the normal I/O queue **and** we block until PX4 replies with at
least one ``SERIAL_CONTROL`` packet (flag ``REPLY``), confirming
the shell received the command. A *settle_time* pause is added
afterwards so PX4 has time to act on it (e.g. release file
handles).
Parameters
----------
command : str
The shell command to send (e.g. ``"logger stop\\n"``).
settle_time : float
Extra time (seconds) to wait **after** PX4 acknowledges the
command, giving it time to act (e.g. close file handles).
timeout : float
Maximum time (seconds) to wait for a reply before raising
``TimeoutError``.
Returns
-------
str
The text content of the first SERIAL_CONTROL reply from PX4,
or an empty string if no reply was received.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
msgs = self.build_shell_serial_control_msgs(command)
if not msgs:
return ""
# We only need to send_and_wait on the *last* chunk; PX4 will
# reply once it has the full command (terminated by \n).
# Send all chunks except the last one directly.
for msg in msgs[:-1]:
self.send("mav", msg)
serial_control_id = str(mavutil.mavlink.MAVLINK_MSG_ID_SERIAL_CONTROL)
reply_received = False
reply_chunks: list[str] = []
def _collector(pkt) -> bool:
"""Return True once we get any SERIAL_CONTROL reply from PX4."""
nonlocal reply_received
flags = getattr(pkt, "flags", 0)
if flags & mavutil.mavlink.SERIAL_CONTROL_FLAG_REPLY:
# Capture reply text for diagnostics
count = getattr(pkt, "count", 0)
data = getattr(pkt, "data", [])
if count > 0 and data:
reply_chunks.append(
bytes(data[:count]).decode("utf-8", errors="replace")
)
reply_received = True
return True
return False
try:
await self.send_and_wait(
match_key=serial_control_id,
request_msg=msgs[-1],
collector=_collector,
timeout=timeout,
)
reply_text = "".join(reply_chunks).strip()
self._log.info(
f"Shell command {command.strip()!r} acknowledged by PX4"
+ (f" — reply: {reply_text!r}" if reply_text else "")
)
except TimeoutError:
reply_text = ""
self._log.warning(
f"No SERIAL_CONTROL reply for shell command "
f"{command.strip()!r} within {timeout}s — "
f"proceeding anyway (command may still have been received)"
)
# Give PX4 time to act on the command (e.g. release file handles)
if settle_time > 0:
await asyncio.sleep(settle_time)
return reply_text
async def stop_px4_logging(self, settle_time: float = 2.0, timeout: float = 5.0) -> None:
"""
Stop PX4's SD-card logger so that log files can be deleted.
Sends ``logger stop\\n`` via the MAVLink shell and waits for PX4
to acknowledge. The logger module releases its file handles,
making files deletable via MAVFTP.
"""
self._log.info("Stopping PX4 logger to allow file deletion…")
await self.send_shell_command("logger stop\n", settle_time=settle_time, timeout=timeout)
self._log.info("PX4 logger stopped.")
async def start_px4_logging(self, settle_time: float = 1.0, timeout: float = 5.0) -> None:
"""
Restart PX4's SD-card logger after file operations.
Sends ``logger start\\n`` via the MAVLink shell.
"""
self._log.info("Restarting PX4 logger…")
await self.send_shell_command("logger start\n", settle_time=settle_time, timeout=timeout)
self._log.info("PX4 logger restarted.")
async def delete_file_via_shell(
self,
remote_path: str,
timeout: float = 5.0,
command: str = "rm",
) -> bool:
"""
Delete a file (or empty directory) on PX4 using a NuttX shell command.
This is a fallback for when MAVFTP ``cmd_rm`` fails with
``FileProtected`` (code 9). The shell ``rm`` bypasses the
MAVFTP server's protection checks and deletes the file
directly via NuttX ``unlink()``.
Parameters
----------
remote_path : str
Full path on the vehicle, e.g.
``"fs/microsd/log/2025-09-01/07_50_39.ulg"``.
The path must start without a leading ``/``.
timeout : float
Maximum seconds to wait for PX4 shell acknowledgment.
command : str
NuttX shell command to use. ``"rm"`` for files,
``"rmdir"`` for empty directories.
Returns
-------
bool
``True`` if the command completed (no error detected in
the reply text), ``False`` otherwise.
"""
# NuttX rm/rmdir expects an absolute path starting with /
abs_path = remote_path if remote_path.startswith("/") else f"/{remote_path}"
self._log.info(f"Shell {command} fallback: {command} {abs_path}")
try:
reply = await self.send_shell_command(
f"{command} {abs_path}\n", settle_time=0.3, timeout=timeout
)
# NuttX rm prints nothing on success; on error it prints
# something like "rm: /path: No such file or directory"
lower = reply.lower()
if reply and (
"no such" in lower
or "error" in lower
or "failed" in lower
or "invalid" in lower
or "not empty" in lower
):
self._log.warning(
f"Shell {command} for {remote_path} returned error: {reply!r}"
)
return False
self._log.info(f"Shell {command} completed for {remote_path}")
return True
except Exception as e:
self._log.error(f"Shell {command} failed for {remote_path}: {e}")
return False
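    # Typical delete flow (sketch): stop the logger so file handles are
    # released, attempt the shell fallback, then restart logging.
    #
    #     await proxy.stop_px4_logging()
    #     ok = await proxy.delete_file_via_shell("fs/microsd/log/2025-09-01/07_50_39.ulg")
    #     await proxy.start_px4_logging()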
async def reboot_autopilot(
self,
reboot_onboard_computer: bool = False,
timeout: float = 3.0,
) -> RebootAutopilotResponse:
"""
Send a reboot command to the autopilot (PX4/ArduPilot).
This sends MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and waits for a
COMMAND_ACK response.
Parameters
----------
reboot_onboard_computer : bool
If True, also reboot the onboard computer. Default is False.
timeout : float
Maximum time to wait for acknowledgment. Default is 3.0 seconds.
Returns
-------
RebootAutopilotResponse
Structured response indicating success/failure and reason.
Raises
------
RuntimeError
If MAVLink connection is not established.
TimeoutError
If no acknowledgment is received within the timeout.
Notes
-----
After sending this command, the connection to the autopilot will be lost
as it reboots. The proxy will attempt to reconnect automatically.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
cmd = self.build_reboot_command(
reboot_autopilot=True,
reboot_onboard_computer=reboot_onboard_computer,
)
result = {"ack_received": False, "result": None}
def _collector(pkt) -> bool:
if pkt.get_type() != "COMMAND_ACK":
return False
if pkt.command == mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN:
result["ack_received"] = True
result["result"] = pkt.result
return True
return False
COMMAND_ACK_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK)
# Map ACK result codes -> status codes (for failures)
_ACK_TO_STATUS = {
mavutil.mavlink.MAV_RESULT_DENIED: RebootStatusCode.FAIL_ACK_DENIED,
mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: RebootStatusCode.FAIL_ACK_TEMPORARILY_REJECTED,
mavutil.mavlink.MAV_RESULT_UNSUPPORTED: RebootStatusCode.FAIL_ACK_UNSUPPORTED,
mavutil.mavlink.MAV_RESULT_FAILED: RebootStatusCode.FAIL_ACK_FAILED,
mavutil.mavlink.MAV_RESULT_IN_PROGRESS: RebootStatusCode.FAIL_ACK_IN_PROGRESS,
getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): RebootStatusCode.FAIL_ACK_CANCELLED,
}
_ACK_NAME = {
mavutil.mavlink.MAV_RESULT_ACCEPTED: "ACCEPTED",
mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: "TEMPORARILY_REJECTED",
mavutil.mavlink.MAV_RESULT_DENIED: "DENIED",
mavutil.mavlink.MAV_RESULT_UNSUPPORTED: "UNSUPPORTED",
mavutil.mavlink.MAV_RESULT_FAILED: "FAILED",
mavutil.mavlink.MAV_RESULT_IN_PROGRESS: "IN_PROGRESS",
getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): "CANCELLED",
}
try:
await self.send_and_wait(
match_key=COMMAND_ACK_ID,
request_msg=cmd,
collector=_collector,
timeout=timeout,
)
except TimeoutError:
# No ACK: verify reboot using heartbeat timestamps (populated by your heartbeat handler)
self._log.warning(
"Reboot command sent but no ACK received; verifying via heartbeat drop/return..."
)
last_hb = getattr(self, "_last_heartbeat_time", None)
if last_hb is None:
self._log.warning(
"No heartbeat timestamp available (_last_heartbeat_time is None); reboot not confirmed."
)
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_NO_HEARTBEAT_TRACKING,
reason="No ACK received and heartbeat tracking is unavailable (_last_heartbeat_time is None).",
ack_result=None,
)
async def _wait_for_heartbeat_drop(
wait_window_s: float = 2.0,
gap_s: float = 1.0,
poll_s: float = 0.05,
) -> bool:
deadline = time.time() + wait_window_s
while time.time() < deadline:
last = getattr(self, "_last_heartbeat_time", None)
if last is not None and (time.time() - last) >= gap_s:
return True
await asyncio.sleep(poll_s)
return False
async def _wait_for_heartbeat_return(
since_ts: float,
wait_window_s: float = 30.0,
poll_s: float = 0.05,
) -> bool:
deadline = time.time() + wait_window_s
while time.time() < deadline:
last = getattr(self, "_last_heartbeat_time", None)
if last is not None and last > since_ts:
return True
await asyncio.sleep(poll_s)
return False
dropped = await _wait_for_heartbeat_drop()
if not dropped:
self._log.warning("No ACK and no heartbeat drop observed; reboot not confirmed.")
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_NO_HB_DROP,
reason="No ACK received and heartbeat did not drop within the expected window.",
ack_result=None,
)
drop_mark = getattr(self, "_last_heartbeat_time", last_hb) or last_hb
returned = await _wait_for_heartbeat_return(since_ts=drop_mark)
if not returned:
self._log.warning("Heartbeat dropped but did not return within the reboot window; reboot not confirmed.")
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_HB_NO_RETURN,
reason="Heartbeat drop observed but heartbeat did not return within the reboot window.",
ack_result=None,
)
self._log.info("Reboot confirmed via heartbeat drop + return.")
return RebootAutopilotResponse(
success=True,
status_code=RebootStatusCode.OK_REBOOT_CONFIRMED_NO_ACK,
reason="No ACK received, but reboot confirmed via heartbeat drop + return.",
ack_result=None,
)
# ACK path
if result["ack_received"]:
ack_val = result["result"]
ack_name = _ACK_NAME.get(ack_val, f"UNKNOWN({ack_val})")
if ack_val == mavutil.mavlink.MAV_RESULT_ACCEPTED:
self._log.info("Reboot command accepted by autopilot")
return RebootAutopilotResponse(
success=True,
status_code=RebootStatusCode.OK_ACK_ACCEPTED,
reason="Autopilot acknowledged the reboot command (ACCEPTED).",
ack_result=ack_val,
)
status = _ACK_TO_STATUS.get(ack_val, RebootStatusCode.FAIL_ACK_UNKNOWN)
self._log.warning(f"Reboot command rejected with result: {ack_val} ({ack_name})")
return RebootAutopilotResponse(
success=False,
status_code=status,
reason=f"Autopilot rejected the reboot command: {ack_name}.",
ack_result=ack_val,
)
# Rare: send_and_wait returned without TimeoutError but collector never matched
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_NO_ACK_MATCH,
reason="send_and_wait returned but no matching COMMAND_ACK for reboot command was observed.",
ack_result=None,
)
async def reboot_autopilot_confirmed(
self,
reboot_onboard_computer: bool = False,
ack_timeout: float = 3.0,
hb_drop_window_s: float = 5.0,
hb_drop_gap_s: float = 1.5,
hb_return_window_s: float = 30.0,
hb_poll_interval_s: float = 0.05,
sitl_mode: bool = False,
) -> RebootAutopilotResponse:
"""
Send a reboot command and confirm success via heartbeat drop + return.
Unlike :meth:`reboot_autopilot`, heartbeat confirmation is the
**primary** verification method, not a fallback. The flow is:
1. Send MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and wait for COMMAND_ACK.
- If the ACK is a rejection, return immediately with failure.
- If the ACK is accepted *or* times out, proceed to step 2.
2. Wait for heartbeat to **drop** (gap ≥ ``hb_drop_gap_s`` within
``hb_drop_window_s``).
3. Wait for heartbeat to **return** (new heartbeat after the drop
within ``hb_return_window_s``).
4. Return success only when heartbeat returns, confirming the
autopilot has rebooted and is alive again.
Parameters
----------
reboot_onboard_computer : bool
If True, also reboot the onboard computer.
ack_timeout : float
Maximum time (s) to wait for COMMAND_ACK.
hb_drop_window_s : float
Time window (s) to detect heartbeat drop after command.
hb_drop_gap_s : float
Minimum gap (s) without a heartbeat to consider it dropped.
hb_return_window_s : float
Time window (s) to wait for heartbeat to return after drop.
hb_poll_interval_s : float
Polling interval (s) for heartbeat checks.
sitl_mode : bool
If True, treat a DENIED ACK as acceptable instead of failing
immediately. This allows SITL testing where the user manually
kills and restarts the ``px4_sitl`` process to trigger the
heartbeat drop + return cycle.
Returns
-------
RebootAutopilotResponse
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
# ── Step 1: Send reboot command and attempt to get ACK ────────────
cmd = self.build_reboot_command(
reboot_autopilot=True,
reboot_onboard_computer=reboot_onboard_computer,
)
result = {"ack_received": False, "result": None}
def _collector(pkt) -> bool:
if pkt.get_type() != "COMMAND_ACK":
return False
if pkt.command == mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN:
result["ack_received"] = True
result["result"] = pkt.result
return True
return False
COMMAND_ACK_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK)
_ACK_TO_STATUS = {
mavutil.mavlink.MAV_RESULT_DENIED: RebootStatusCode.FAIL_ACK_DENIED,
mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: RebootStatusCode.FAIL_ACK_TEMPORARILY_REJECTED,
mavutil.mavlink.MAV_RESULT_UNSUPPORTED: RebootStatusCode.FAIL_ACK_UNSUPPORTED,
mavutil.mavlink.MAV_RESULT_FAILED: RebootStatusCode.FAIL_ACK_FAILED,
mavutil.mavlink.MAV_RESULT_IN_PROGRESS: RebootStatusCode.FAIL_ACK_IN_PROGRESS,
getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): RebootStatusCode.FAIL_ACK_CANCELLED,
}
_ACK_NAME = {
mavutil.mavlink.MAV_RESULT_ACCEPTED: "ACCEPTED",
mavutil.mavlink.MAV_RESULT_TEMPORARILY_REJECTED: "TEMPORARILY_REJECTED",
mavutil.mavlink.MAV_RESULT_DENIED: "DENIED",
mavutil.mavlink.MAV_RESULT_UNSUPPORTED: "UNSUPPORTED",
mavutil.mavlink.MAV_RESULT_FAILED: "FAILED",
mavutil.mavlink.MAV_RESULT_IN_PROGRESS: "IN_PROGRESS",
getattr(mavutil.mavlink, "MAV_RESULT_CANCELLED", -999): "CANCELLED",
}
ack_received = False
ack_val = None
try:
await self.send_and_wait(
match_key=COMMAND_ACK_ID,
request_msg=cmd,
collector=_collector,
timeout=ack_timeout,
)
except TimeoutError:
self._log.warning(
"Reboot command sent but no ACK received; proceeding to heartbeat confirmation..."
)
if result["ack_received"]:
ack_received = True
ack_val = result["result"]
ack_name = _ACK_NAME.get(ack_val, f"UNKNOWN({ack_val})")
# If ACK is a rejection, fail immediately — no point waiting for heartbeat
# Exception: in SITL mode, DENIED is expected (simulator cannot reboot)
# so we skip the failure and proceed to heartbeat confirmation,
# allowing the user to manually kill/restart px4_sitl.
if ack_val != mavutil.mavlink.MAV_RESULT_ACCEPTED:
if sitl_mode and ack_val == mavutil.mavlink.MAV_RESULT_DENIED:
self._log.warning(
f"SITL_MODE: Reboot ACK was DENIED (expected for SITL). "
f"Proceeding to heartbeat confirmation — manually kill and "
f"restart px4_sitl within {hb_drop_window_s}s to trigger "
f"heartbeat drop..."
)
else:
status = _ACK_TO_STATUS.get(ack_val, RebootStatusCode.FAIL_ACK_UNKNOWN)
self._log.warning(f"Reboot command rejected with result: {ack_val} ({ack_name})")
return RebootAutopilotResponse(
success=False,
status_code=status,
reason=f"Autopilot rejected the reboot command: {ack_name}.",
ack_result=ack_val,
)
else:
self._log.info("Reboot command accepted by autopilot, proceeding to heartbeat confirmation...")
# ── Step 2: Heartbeat drop confirmation (primary check) ───────────
last_hb = getattr(self, "_last_heartbeat_time", None)
if last_hb is None:
self._log.warning("No heartbeat timestamp available; cannot confirm reboot via heartbeat.")
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_NO_HEARTBEAT_TRACKING,
reason="Heartbeat tracking is unavailable (_last_heartbeat_time is None). Cannot confirm reboot.",
ack_result=ack_val,
)
async def _wait_for_heartbeat_drop() -> bool:
deadline = time.time() + hb_drop_window_s
while time.time() < deadline:
last = getattr(self, "_last_heartbeat_time", None)
if last is not None and (time.time() - last) >= hb_drop_gap_s:
return True
await asyncio.sleep(hb_poll_interval_s)
return False
dropped = await _wait_for_heartbeat_drop()
if not dropped:
self._log.warning("Heartbeat did not drop within the expected window; reboot not confirmed.")
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_NO_HB_DROP,
reason=(
f"Heartbeat did not drop within {hb_drop_window_s}s window "
f"(gap threshold: {hb_drop_gap_s}s). Reboot not confirmed."
),
ack_result=ack_val,
)
drop_mark = getattr(self, "_last_heartbeat_time", last_hb) or last_hb
self._log.info("Heartbeat drop detected, waiting for heartbeat to return...")
# ── Step 3: Heartbeat return confirmation ─────────────────────────
async def _wait_for_heartbeat_return(since_ts: float) -> bool:
deadline = time.time() + hb_return_window_s
while time.time() < deadline:
last = getattr(self, "_last_heartbeat_time", None)
if last is not None and last > since_ts:
return True
await asyncio.sleep(hb_poll_interval_s)
return False
returned = await _wait_for_heartbeat_return(since_ts=drop_mark)
if not returned:
self._log.warning("Heartbeat dropped but did not return; reboot not confirmed.")
return RebootAutopilotResponse(
success=False,
status_code=RebootStatusCode.FAIL_REBOOT_NOT_CONFIRMED_HB_NO_RETURN,
reason=(
f"Heartbeat drop observed but heartbeat did not return within "
f"{hb_return_window_s}s. Autopilot may still be rebooting."
),
ack_result=ack_val,
)
# ── Step 4: Confirmed ─────────────────────────────────────────────
if ack_received:
self._log.info("Reboot confirmed: ACK accepted + heartbeat drop + return.")
return RebootAutopilotResponse(
success=True,
status_code=RebootStatusCode.OK_ACK_ACCEPTED_HB_CONFIRMED,
reason="Reboot confirmed: COMMAND_ACK accepted and heartbeat drop + return observed.",
ack_result=ack_val,
)
else:
self._log.info("Reboot confirmed via heartbeat drop + return (no ACK received).")
return RebootAutopilotResponse(
success=True,
status_code=RebootStatusCode.OK_HB_CONFIRMED_NO_ACK,
reason="Reboot confirmed via heartbeat drop + return (no COMMAND_ACK received).",
ack_result=None,
)
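# Usage sketch (hypothetical; assumes `proxy` is a connected
# MavLinkExternalProxy driven from an asyncio event loop):
#
#     resp = await proxy.reboot_autopilot_confirmed(sitl_mode=True)
#     if resp.success:
#         print(f"Reboot confirmed: {resp.status_code}")
#     else:
#         print(f"Reboot failed: {resp.reason}")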
async def get_param(self, name: str, timeout: float = 3.0) -> Dict[str, Any]:
"""
Request a single PARAM_VALUE for `name` and return a dict:
{"name": str, "value": Union[int,float], "raw": float, "type": int, "count": int, "index": int}
Raises TimeoutError if no reply within timeout.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
req = self.build_param_request_read(name, index=-1)
result = {"got": False, "data": None}
def _collector(pkt) -> bool:
# Ensure we only process PARAM_VALUE
if pkt.get_type() != "PARAM_VALUE":
return False
# Use the decoding method for proper INT32 handling
pkt_name, decoded_value = self._decode_param_value(pkt)
if pkt_name != name:
return False
result["got"] = True
result["data"] = {
"name": pkt_name,
"value": decoded_value,
"raw": float(pkt.param_value),
"type": pkt.param_type,
"count": pkt.param_count,
"index": pkt.param_index,
}
return True
# The proxy dispatches by both the numeric msg-ID string and the type name; matching on the type name keeps this readable.
await self.send_and_wait(
match_key="PARAM_VALUE",
request_msg=req,
collector=_collector,
timeout=timeout,
)
if not result["got"]:
raise TimeoutError(f"No PARAM_VALUE received for {name}")
return result["data"]
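# Usage sketch (hypothetical caller code; MPC_XY_VEL_MAX is just an
# example PX4 parameter name):
#
#     info = await proxy.get_param("MPC_XY_VEL_MAX")
#     print(info["value"], info["type"])  # decoded value + MAV_PARAM_TYPE id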
async def get_all_params(self, timeout: float = 10.0) -> Dict[str, Dict[str, Any]]:
"""
Request entire parameter list and return:
{ "<NAME>": {"value": int|float, "raw": float, "type": int, "index": int, "count": int}, ... }
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
req = self.build_param_request_list()
params = {}
seen = set()
expected_total = {"val": None}
def _collector(pkt) -> bool:
if pkt.get_type() != "PARAM_VALUE":
return False
if expected_total["val"] is None:
expected_total["val"] = pkt.param_count
# Use the decoding method for proper INT32 handling
name, decoded_value = self._decode_param_value(pkt)
if (name, pkt.param_index) in seen:
# duplicate frame—ignore; can happen with lossy links
pass
else:
seen.add((name, pkt.param_index))
params[name] = {
"value": decoded_value,
"raw": float(pkt.param_value),
"type": pkt.param_type,
"index": pkt.param_index,
"count": pkt.param_count,
}
# Stop when we've collected all expected params
return (expected_total["val"] is not None) and (len(params) >= expected_total["val"])
await self.send_and_wait(
match_key="PARAM_VALUE",
request_msg=req,
collector=_collector,
timeout=timeout,
)
return params
async def set_param(self, name: str, value: Any, ptype: Optional[int] = None, timeout: float = 3.0) -> Dict[str, Any]:
"""
Set a parameter and confirm by reading back. `value` can be int or float.
Returns the confirmed PARAM_VALUE dict (same shape as get_param()).
Uses proper INT32 encoding where int32 values are encoded as float32 bits for wire transmission.
>>> ["MAV_PARAM_TYPE"] = {
>>> [1] = "MAV_PARAM_TYPE_UINT8",
>>> [2] = "MAV_PARAM_TYPE_INT8",
>>> [3] = "MAV_PARAM_TYPE_UINT16",
>>> [4] = "MAV_PARAM_TYPE_INT16",
>>> [5] = "MAV_PARAM_TYPE_UINT32",
>>> [6] = "MAV_PARAM_TYPE_INT32",
>>> [7] = "MAV_PARAM_TYPE_UINT64",
>>> [8] = "MAV_PARAM_TYPE_INT64",
>>> [9] = "MAV_PARAM_TYPE_REAL32",
>>> [10] = "MAV_PARAM_TYPE_REAL64",
>>> }
"""
# Pick a MAV_PARAM_TYPE based on Python type (simple heuristic)
if ptype is None:
if isinstance(value, int):
ptype = mavutil.mavlink.MAV_PARAM_TYPE_INT32
elif isinstance(value, float):
ptype = mavutil.mavlink.MAV_PARAM_TYPE_REAL32
else:
self._log.warning(f"Unsupported parameter type for {name}: {type(value)}")
raise ValueError(f"Unsupported parameter type for {name}: {type(value)}")
# Build the PARAM_SET message with proper encoding
req = self.build_param_set(name, value, ptype)
# Send the parameter set command
self.send("mav", req)
# Wait for confirmation by reading back the parameter
try:
return await self.get_param(name, timeout=timeout)
except TimeoutError:
# Retry the read once: the PARAM_VALUE echo can be lost on a lossy
# link even when the set itself succeeded.
return await self.get_param(name, timeout=timeout)
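# Usage sketch (hypothetical): set an integer parameter with an explicit
# MAV_PARAM_TYPE instead of relying on the int -> INT32 heuristic:
#
#     confirmed = await proxy.set_param(
#         "COM_FLTMODE1", 2, ptype=mavutil.mavlink.MAV_PARAM_TYPE_INT32
#     )
#     assert confirmed["value"] == 2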
async def send_and_wait(
self,
*,
match_key: str,
request_msg: mavutil.mavlink.MAVLink_message,
collector: Callable[[mavutil.mavlink.MAVLink_message], bool],
timeout: float = 3.0,
queue_length: Optional[int] = None,
cancel_event: Optional[threading.Event] = None,
) -> None:
"""
Transmit *request_msg*, register a handler on *match_key* and keep feeding
incoming packets to *collector* until it returns **True** or *timeout* expires.
Parameters
----------
match_key :
The key used when the proxy dispatches inbound messages
(numeric ID as string, e.g. `"147"`).
request_msg :
Encoded MAVLink message to send - COMMAND_LONG, LOG_REQUEST_LIST, ...
collector :
Callback that receives each matching packet. Must return **True**
once the desired condition is satisfied; returning **False** keeps
waiting.
timeout :
Maximum seconds to block.
queue_length :
Optional maximum queue length for the handler. If the queue
exceeds this length, older packets will be dropped. If None,
the default queue length is used.
cancel_event :
Optional threading.Event that can be set to cancel the wait.
Raises
------
RuntimeError
If MAVLink connection is not established.
TimeoutError
If no matching response is received within the timeout.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
loop = asyncio.get_running_loop()
done = asyncio.Event()
async def _handler(pkt):
try:
if collector(pkt):
loop.call_soon_threadsafe(done.set)
except Exception as exc:
self._log.error(f"[collector] raised: {exc}")
self.register_handler(
key=match_key,
fn=_handler,
queue_length=queue_length
)
# send the request message after registering the handler
self.send("mav", request_msg)
# create an asyncio task to monitor cancel_event if provided
if cancel_event is not None:
def _cancel_checker():
while not done.is_set():
if cancel_event.is_set():
loop.call_soon_threadsafe(done.set)
break
time.sleep(0.1)
cancel_checker = threading.Thread(target=_cancel_checker, daemon=True, name="_mavlink_send_and_wait_cancel_checker")
cancel_checker.start()
try:
await asyncio.wait_for(done.wait(), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"No reply/condition for {match_key} in {timeout}s")
finally:
done.set()  # stop the cancel-checker thread even on timeout/cancellation
if cancel_event is not None:
cancel_checker.join(timeout=0.1)
self.unregister_handler(match_key, _handler)
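# Collector-protocol sketch (hypothetical): wait for the COMMAND_ACK of a
# previously encoded COMMAND_LONG `cmd`. The collector returns True exactly
# once, when the matching packet arrives; returning False keeps waiting.
#
#     ack = {}
#     def _collect_ack(pkt) -> bool:
#         if pkt.get_type() != "COMMAND_ACK":
#             return False
#         ack["result"] = pkt.result
#         return True
#
#     await proxy.send_and_wait(
#         match_key=str(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK),
#         request_msg=cmd,
#         collector=_collect_ack,
#         timeout=3.0,
#     )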
async def get_log_entries(
self,
*,
msg_id: str,
request_msg: mavutil.mavlink.MAVLink_message,
timeout: float = 8.0,
stale_timeout: float = 2.0,
) -> Dict[int, Dict[str, int]]:
"""
Send LOG_REQUEST_LIST and gather all LOG_ENTRY packets.
Handles two edge cases that PX4 firmware exhibits:
1. **No logs on the vehicle** — PX4 responds with a single
``LOG_ENTRY`` where ``num_logs == 0``. The MAVLink spec
requires this sentinel packet; we detect it and return an
empty dict immediately instead of waiting for the timeout.
2. **Corrupted / missing log entries** — PX4 may advertise
``num_logs = N`` but only deliver fewer than *N*
``LOG_ENTRY`` packets (corrupted slots are silently
skipped). A staleness timer (``stale_timeout``) detects
when no new entry has arrived for a while and returns
whatever was collected so far.
Parameters
----------
msg_id : str
The MAVLink message ID string for LOG_ENTRY (e.g. ``"118"``).
request_msg : mavutil.mavlink.MAVLink_message
The encoded LOG_REQUEST_LIST message to send.
timeout : float
Hard upper-bound on how long to wait (seconds).
stale_timeout : float
If no new LOG_ENTRY arrives within this many seconds
*after* the first one, collection stops early and whatever
entries were gathered are returned. Only applies when
``num_logs > 0`` and fewer than ``num_logs`` entries have
been received. Default 2.0 s.
Returns
-------
Dict[int, Dict[str, int]]
``{log_id: {"size": int, "utc": int}, …}``
"""
entries: Dict[int, Dict[str, int]] = {}
expected_total: Dict[str, Optional[int]] = {"val": None}
last_entry_time: Dict[str, float] = {"t": time.time()}
def _collector(pkt) -> bool:
if expected_total["val"] is None:
expected_total["val"] = pkt.num_logs
# ── No logs on the vehicle ────────────────────────────────
# PX4 sends LOG_ENTRY(id=0, num_logs=0) as a sentinel.
if expected_total["val"] == 0:
self._log.info("Vehicle reports num_logs=0 — no logs on SD card.")
return True # signal done immediately
entries[pkt.id] = {"size": pkt.size, "utc": pkt.time_utc}
last_entry_time["t"] = time.time()
return len(entries) >= expected_total["val"]
# ── Staleness monitor ─────────────────────────────────────────
# If PX4 stops sending LOG_ENTRY packets before we reach the
# advertised num_logs (e.g. corrupted logs), the collector will
# never return True and we'd wait the full timeout. This task
# detects the stall and signals completion early via cancel_event.
cancel_evt = threading.Event()
async def _stale_watchdog():
while not cancel_evt.is_set():
await asyncio.sleep(0.25)
if expected_total["val"] is not None and expected_total["val"] > 0:
elapsed = time.time() - last_entry_time["t"]
if elapsed >= stale_timeout and len(entries) > 0:
got = len(entries)
want = expected_total["val"]
self._log.warning(
f"LOG_ENTRY stream stalled — received {got}/{want} entries "
f"(no new entry for {elapsed:.1f}s). "
f"Returning partial results; {want - got} entries may be corrupted/missing."
)
cancel_evt.set() # unblocks send_and_wait
watchdog_task = asyncio.create_task(_stale_watchdog())
try:
await self.send_and_wait(
match_key=msg_id,
request_msg=request_msg,
collector=_collector,
timeout=timeout,
cancel_event=cancel_evt,
)
except TimeoutError:
# If we collected *some* entries before the hard timeout, return
# them instead of propagating the error.
if entries:
got = len(entries)
want = expected_total.get("val")
self._log.warning(
f"Hard timeout ({timeout}s) reached while collecting LOG_ENTRY packets. "
f"Returning {got}/{want} entries collected so far."
)
else:
raise # genuinely got nothing — let caller handle it
finally:
cancel_evt.set()
watchdog_task.cancel()
try:
await watchdog_task
except asyncio.CancelledError:
pass
return entries
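# Usage sketch (hypothetical; build_req_msg_log_request is the proxy helper
# also used by MavLinkFTPProxy.list_ulogs below):
#
#     msg_id = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_ENTRY)
#     req = proxy.build_req_msg_log_request(message_id=msg_id)
#     entries = await proxy.get_log_entries(msg_id=msg_id, request_msg=req)
#     for log_id, meta in entries.items():
#         print(log_id, meta["size"], meta["utc"])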
async def _request_chunk(
self,
log_id: int,
ofs: int,
chunk_size: int,
max_retries: int = 3,
timeout: float = 5.0,
) -> mavutil.mavlink.MAVLink_log_data_message:
LOG_DATA_ID = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_DATA)
# Holder for this specific chunk
holder_lock = threading.Lock()
holder: dict[str, mavutil.mavlink.MAVLink_log_data_message] = {}
def _collector(pkt) -> bool:
# Only accept LOG_DATA for the correct log id
if getattr(pkt, "id", None) != log_id:
return False
# Enforce the expected offset to avoid eating someone else's chunk
if int(pkt.ofs) != ofs:
return False
with holder_lock:
holder["pkt"] = pkt
return True
# Build LOG_REQUEST_DATA for *this* chunk
req = self.master.mav.log_request_data_encode(
self.master.target_system,
self.master.target_component,
log_id,
ofs,
chunk_size,
)
# Try a few times per chunk
attempt = 0
while True:
try:
await self.send_and_wait(
match_key=LOG_DATA_ID,
request_msg=req,
collector=_collector,
timeout=timeout,
)
break # got the chunk
except TimeoutError:
attempt += 1
if attempt >= max_retries:
raise TimeoutError(
f"Timeout while waiting for LOG_DATA "
f"log_id={log_id} ofs={ofs} after {max_retries} attempts"
)
self._log.warning(
f"Timeout for LOG_DATA (log_id={log_id}, ofs={ofs}), "
f"retry {attempt}/{max_retries}"
)
# re-send the same request and loop again
return holder["pkt"]
def _request_log_sync(
self,
*,
log_id: int,
completed_event: threading.Event,
timeout: float = 60.0,
size_bytes: Optional[int] = None,
cancel_event: threading.Event | None = None,
callback: Optional[Callable[[int], None]] = None,
end_of_buffer_timeout: float = 3.0,
) -> bytes:
"""
Synchronous helper: request LOG_DATA for a given log_id and return raw bytes.
Assembles data by offset so out-of-order / duplicate packets are tolerated.
"""
self._log.info(f"Attempting to download Log ID: {log_id}")
# 1) Send LOG_REQUEST_DATA (from offset 0, full log)
self.master.mav.log_request_data_send(
self.master.target_system,
self.master.target_component,
log_id,
0, # offset
0xFFFFFFFF # count: all data from offset
)
self._log.info(f"Requested data for Log ID {log_id}, waiting for data...")
# Chunks keyed by offset
received_chunks: dict[int, bytes] = {}
max_end = 0 # highest offset+length we've seen
total_unique_bytes = 0 # bytes from non-duplicate chunks
completed_event.clear()
start_time = time.time()
last_data_time = start_time
with self._mav_lock:
while True:
now = time.time()
# Global timeout
if timeout is not None and (now - start_time) > timeout:
self._log.error("Timeout while downloading log data (global timeout).")
raise TimeoutError("Log data download timed out.")
# End-of-buffer timeout (only after we've actually received something)
if (
end_of_buffer_timeout is not None
and total_unique_bytes > 0
and (now - last_data_time) > end_of_buffer_timeout
):
self._log.info(
f"No new data for {end_of_buffer_timeout:.1f}s, "
"assuming end of log."
)
break
# Cancellation support
if cancel_event is not None and cancel_event.is_set():
self._log.warning("Log download cancelled.")
break
# Use a small timeout so the loop can check the conditions above
msg = self.master.recv_match(
type="LOG_DATA",
blocking=True,
timeout=1.0,
)
if msg is None:
# No message in this 1-second interval; loop again
continue
if msg.id != log_id:
# Not our log; ignore
continue
ofs = int(msg.ofs)
count = int(msg.count)
data_bytes = bytes(msg.data[:count])
end = ofs + count
# Duplicate chunk (same starting offset): ignore
if ofs in received_chunks:
self._log.debug(
f"Duplicate LOG_DATA chunk at ofs={ofs}, count={count}, ignoring."
)
# We *could* update last_data_time here since the link is clearly still
# active, but only genuinely new data should reset the end-of-buffer timer.
continue
# New chunk
received_chunks[ofs] = data_bytes
total_unique_bytes += len(data_bytes)
last_data_time = time.time()
if end > max_end:
max_end = end
# Progress callback (sync)
if callback is not None:
try:
callback(total_unique_bytes)
except Exception as e:
self._log.warning(f"Callback raised exception: {e!r}")
# If we know the size, we can stop once we have at least that range
if size_bytes is not None and max_end >= size_bytes:
self._log.info(
f"Reached expected size ({size_bytes} bytes) for log {log_id}."
)
break
# --- Assemble the final buffer ---
if size_bytes is not None:
total_size = size_bytes
else:
total_size = max_end # best guess from largest end offset
if total_size <= 0:
self._log.warning(
f"No LOG_DATA received for log {log_id}, returning empty bytes."
)
completed_event.set()
return b""
result = bytearray(total_size)
sorted_ofs = sorted(received_chunks.keys())
current = 0
for ofs in sorted_ofs:
chunk = received_chunks[ofs]
end = ofs + len(chunk)
# Detect gaps between chunks
if ofs > current:
self._log.warning(
f"Gap detected in log {log_id}: {current} -> {ofs} "
"(missing bytes in this range)."
)
# Skip chunks fully beyond the expected size
if ofs >= total_size:
self._log.warning(
f"Chunk at ofs={ofs} (len={len(chunk)}) is beyond total_size={total_size}, skipping."
)
continue
# Trim chunk if it extends past total_size
if end > total_size:
chunk = chunk[: total_size - ofs]
end = total_size
result[ofs:end] = chunk
current = max(current, end)
self._log.info(
f"Total unique bytes received for log {log_id}: {total_unique_bytes}, "
f"assembled length={len(result)}"
)
completed_event.set()
return bytes(result)
async def download_log(
self,
*,
log_id: int,
completed_event: threading.Event,
timeout: float = 60.0,
buffer: Optional[bytearray] = None,
callback: Optional[Callable[[int], Awaitable[None]]] = None,
end_of_buffer_timeout: float = 10.0,
size_bytes: Optional[int] = None
) -> bytes:
"""
Download one log file via LOG_REQUEST_DATA / LOG_DATA.
Parameters
----------
log_id :
The LOG_ENTRY.id of the log to download.
completed_event :
threading.Event that will be set when download completes.
timeout :
Total time allowed for the whole transfer.
buffer :
Optional bytearray to use as the download buffer.
callback :
Optional async function called as callback(received_bytes)
after each LOG_DATA packet is processed.
end_of_buffer_timeout :
Timeout in seconds to wait for new data before aborting.
size_bytes :
Optional total size of the log in bytes, if known.
Returns
-------
bytes
Raw log bytes (ULog/Dataflash).
Raises
------
RuntimeError
If MAVLink connection is not established.
TimeoutError
If no complete log is received within the timeout.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
cancel_event = threading.Event()
# The blocking downloader runs in a worker thread, so wrap the async
# callback in a sync shim that schedules it back onto the event loop.
loop = asyncio.get_running_loop()
def sync_callback(received_bytes: int):
if callback is not None:
# Schedule the async callback safely from another thread
loop.call_soon_threadsafe(
asyncio.create_task,
callback(received_bytes),
)
sync_cb = sync_callback if callback is not None else None
try:
with self._download_lock:
log_bytes: bytes = await asyncio.to_thread(
self._request_log_sync,
log_id=log_id,
completed_event=completed_event,
timeout=timeout,
size_bytes=size_bytes,
cancel_event=cancel_event,
callback=sync_cb,
end_of_buffer_timeout=end_of_buffer_timeout,
)
except TimeoutError as exc:
self._log.error(f"Failed to download log {log_id}: {exc}")
raise
except Exception as exc:
self._log.error(f"Error downloading log {log_id}: {exc}")
raise
# _request_log_sync already assembled the chunks by offset (warning about
# any gaps it found), so the result can be copied straight into the
# caller-supplied buffer. Chunk-level gap re-requests are handled by
# download_log_buffered() via _request_chunk().
if buffer is None:
buffer = bytearray(log_bytes)
else:
buffer[:] = log_bytes
# Final progress callback (in bytes)
if callback is not None:
try:
await callback(len(buffer))
except Exception as exc:
self._log.warning(f"download_log callback raised: {exc}")
completed_event.set()
return bytes(buffer)
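# Usage sketch (hypothetical; `entries` as returned by get_log_entries):
#
#     done = threading.Event()
#     data = await proxy.download_log(
#         log_id=3, completed_event=done, size_bytes=entries[3]["size"]
#     )
#     Path("log_3.ulg").write_bytes(data)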
async def download_log_buffered(
self,
*,
log_id: int,
completed_event: threading.Event,
size_bytes: Optional[int] = None,
timeout: float = 3.0,
chunk_size: int = 90,
max_retries: int = 3,
cancel_event: threading.Event | None = None,
buffer: Optional[bytearray] = None,
callback: Optional[Callable[[int], Awaitable[None]]] = None
) -> bytes:
"""
Download one log file via repeated LOG_REQUEST_DATA / LOG_DATA exchanges.
Strategy:
* For ofs = 0, chunk_size, 2*chunk_size, ...
* send LOG_REQUEST_DATA(log_id, ofs, chunk_size)
* wait for LOG_DATA(log_id, ofs, ...)
* append data[:count]
* stop when:
* count == 0 (end-of-log by spec), or
* ofs + count >= size_bytes (if known)
Parameters
----------
log_id :
LOG_ENTRY.id of the log to download.
completed_event :
threading.Event set when download is fully complete.
size_bytes :
Optional total size of the log from LOG_ENTRY.size; if given, used as
termination condition and sanity cap.
timeout :
Timeout per chunk request (seconds).
chunk_size :
Requested count per chunk. For MAVLink v1, LOG_DATA carries 90 bytes.
max_retries :
Number of retries per chunk on timeout.
cancel_event :
Optional threading.Event to cancel the download.
buffer :
Optional bytearray; if None, a new one is created.
callback :
Optional sync or async callback called as callback(total_bytes_received)
after each successful chunk.
Returns
-------
bytes
Raw log data.
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
if buffer is None:
buffer = bytearray()
ofs = 0
while True:
if cancel_event and cancel_event.is_set():
# Let caller decide what to do; could also raise here
self._log.info(f"Download of log {log_id} cancelled at ofs={ofs}")
break
# If we know the size and we've already covered it, stop
if size_bytes is not None and ofs >= size_bytes:
break
try:
pkt = await self._request_chunk(
log_id=log_id,
ofs=ofs,
chunk_size=chunk_size,
max_retries=max_retries,
timeout=timeout
)
except TimeoutError as exc:
self._log.error(f"Failed to download log {log_id} at ofs={ofs}: {exc}")
raise
count = int(pkt.count)
if count < 0:
count = 0
# Termination condition from spec: count == 0 => end-of-log
if count == 0:
completed_event.set()
break
# Append this chunk into the buffer
needed_len = ofs + count
if len(buffer) < needed_len:
buffer.extend(b"\x00" * (needed_len - len(buffer)))
buffer[ofs:ofs + count] = bytes(pkt.data[:count])
# Progress callback (in bytes); accepts sync or async callables
if callback is not None:
try:
ret = callback(len(buffer))
if asyncio.iscoroutine(ret):
await ret
except Exception as exc:
self._log.warning(f"download_log_buffered callback raised: {exc}")
ofs += count
# If we know the total size and we've just covered it, stop
if size_bytes is not None and ofs >= size_bytes:
completed_event.set()
break
# If size_bytes is unknown and we received less than chunk_size,
# it's probably the last chunk, so we can stop.
if size_bytes is None and count < chunk_size:
completed_event.set()
break
return bytes(buffer)
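# Usage sketch (hypothetical). With the default chunk_size of 90 bytes a
# 1 MiB log takes ceil(1_048_576 / 90) = 11_651 LOG_REQUEST_DATA round
# trips, so `timeout * max_retries` per chunk bounds the worst case:
#
#     done = threading.Event()
#     data = await proxy.download_log_buffered(
#         log_id=3, completed_event=done,
#         size_bytes=entries[3]["size"], chunk_size=90,
#     )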
async def set_params_bulk_lossy(
self,
params_to_set: Dict[str, ParamSpec],
*,
timeout_total: float = 8.0,
max_retries: int = 3,
max_in_flight: int = 8,
resend_interval: float = 0.8,
inter_send_delay: float = 0.01,
verify_ack_value: bool = True,
) -> Dict[str, Dict[str, Any]]:
"""
Lossy-link bulk PARAM_SET:
- User can provide optional type per param as "UINT8", "INT16", "REAL32", etc.
- If type omitted -> auto: int -> INT32, float -> REAL32
- Windowed sends + periodic resend + retry cap
- Confirms via echoed PARAM_VALUE
Returns: confirmed {name: meta_dict}
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
loop = asyncio.get_running_loop()
# Normalize input into: desired[name] = (value, ptype_int_or_None)
desired: Dict[str, Tuple[Any, Optional[int]]] = {}
for k, spec in params_to_set.items():
name = self._norm_name(k.encode("ascii"))
if isinstance(spec, tuple) and len(spec) == 2:
value, ptype = spec
desired[name] = (value, _parse_param_type(ptype))
elif isinstance(spec, dict):
if "value" not in spec:
raise ValueError(f"Param '{k}' dict spec must include 'value'")
value = spec["value"]
ptype = spec.get("type", None)
desired[name] = (value, _parse_param_type(ptype))
else:
desired[name] = (spec, None)
def infer_type(value: Any) -> int:
if isinstance(value, int):
return mavutil.mavlink.MAV_PARAM_TYPE_INT32
if isinstance(value, float):
return mavutil.mavlink.MAV_PARAM_TYPE_REAL32
raise ValueError(f"Unsupported value type: {type(value)} (expected int or float)")
def values_match(want: Any, got: Any) -> bool:
if isinstance(want, int):
try:
return int(got) == int(want)
except Exception:
return False
try:
return abs(float(got) - float(want)) <= 1e-4
except Exception:
return False
pending = set(desired.keys())
attempts: Dict[str, int] = {n: 0 for n in pending}
last_sent: Dict[str, float] = {n: 0.0 for n in pending}
confirmed: Dict[str, Dict[str, Any]] = {}
queue = list(desired.keys())
in_flight = 0
done_evt = asyncio.Event()
async def _send_set(name: str) -> None:
nonlocal in_flight
value, ptype_opt = desired[name]
ptype = ptype_opt if ptype_opt is not None else infer_type(value)
msg = self.build_param_set(name, value, ptype)
self.send("mav", msg)
attempts[name] += 1
last_sent[name] = time.monotonic()
in_flight += 1
if inter_send_delay > 0:
await asyncio.sleep(inter_send_delay)
async def _fill_window_from_queue() -> None:
nonlocal in_flight
while in_flight < max_in_flight and queue:
n = queue.pop(0)
if n in pending and attempts[n] < max_retries:
await _send_set(n)
async def _resender_loop() -> None:
nonlocal in_flight
while not done_evt.is_set():
now = time.monotonic()
# initial sends
await _fill_window_from_queue()
# resend timed-out pendings
if in_flight < max_in_flight:
for n in list(pending):
if attempts[n] >= max_retries:
continue
if (now - last_sent[n]) >= resend_interval and in_flight < max_in_flight:
await _send_set(n)
await asyncio.sleep(0.05)
async def _handler(pkt):
try:
if pkt.get_type() != "PARAM_VALUE":
return
pname, decoded_value = self._decode_param_value(pkt)
if pname not in pending:
return
def _apply():
nonlocal in_flight
if pname not in pending:
return
want_value, _want_ptype = desired[pname]
if verify_ack_value and not values_match(want_value, decoded_value):
# not confirmed; free one slot and let retry logic resend
in_flight = max(0, in_flight - 1)
return
confirmed[pname] = {
"name": pname,
"value": decoded_value,
"raw": float(pkt.param_value),
"type": pkt.param_type,
"count": pkt.param_count,
"index": pkt.param_index,
}
pending.remove(pname)
in_flight = max(0, in_flight - 1)
if not pending:
done_evt.set()
loop.call_soon_threadsafe(_apply)
except Exception as exc:
self._log.error(f"[bulk_set handler] raised: {exc}")
self.register_handler(key="PARAM_VALUE", fn=_handler)
try:
# Start sending + resending
await _fill_window_from_queue()
resender_task = asyncio.create_task(_resender_loop())
try:
await asyncio.wait_for(done_evt.wait(), timeout=timeout_total)
except asyncio.TimeoutError:
pass
finally:
resender_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await resender_task
return confirmed
finally:
self.unregister_handler("PARAM_VALUE", _handler)
async def get_params_bulk_lossy(
self,
names: List[str],
*,
timeout_total: float = 6.0,
max_retries: int = 3,
max_in_flight: int = 10,
resend_interval: float = 0.7,
inter_send_delay: float = 0.01,
) -> Dict[str, Dict[str, Any]]:
"""
Lossy-link bulk GET using PARAM_REQUEST_READ by name.
Strategy:
- Register ONE PARAM_VALUE handler.
- Send read requests in a window (max_in_flight).
- Periodically resend still-pending names (resend_interval) up to max_retries.
- Stop when all received or timeout_total.
Returns:
{ name: {"name","value","raw","type","count","index"}, ... }
(Partial results if timeout_total hits.)
"""
if not self.connected:
raise RuntimeError("MAVLink connection not established")
loop = asyncio.get_running_loop()
# Normalize requested names (same normalization used by _decode_param_value)
names_list = [self._norm_name(n.encode("ascii")) for n in names]
pending = set(names_list)
attempts: Dict[str, int] = {n: 0 for n in names_list}
last_sent: Dict[str, float] = {n: 0.0 for n in names_list}
results: Dict[str, Dict[str, Any]] = {}
done_evt = asyncio.Event()
queue = list(names_list)
in_flight = 0
async def _send_read(name: str) -> None:
nonlocal in_flight
msg = self.build_param_request_read(name, index=-1)
self.send("mav", msg)
attempts[name] += 1
last_sent[name] = time.monotonic()
in_flight += 1
if inter_send_delay > 0:
await asyncio.sleep(inter_send_delay)
async def _fill_window_from_queue() -> None:
nonlocal in_flight
while in_flight < max_in_flight and queue:
n = queue.pop(0)
if n in pending and attempts[n] < max_retries:
await _send_read(n)
async def _resender_loop() -> None:
nonlocal in_flight
while not done_evt.is_set():
now = time.monotonic()
# initial sends
await _fill_window_from_queue()
# resend timed-out pending items
if in_flight < max_in_flight:
for n in list(pending):
if attempts[n] >= max_retries:
continue
if (now - last_sent[n]) >= resend_interval and in_flight < max_in_flight:
await _send_read(n)
await asyncio.sleep(0.05)
async def _handler(pkt):
try:
if pkt.get_type() != "PARAM_VALUE":
return
pname, decoded_value = self._decode_param_value(pkt)
if pname not in pending:
return
def _apply():
nonlocal in_flight
if pname not in pending:
return
results[pname] = {
"name": pname,
"value": decoded_value,
"raw": float(pkt.param_value),
"type": pkt.param_type,
"count": pkt.param_count,
"index": pkt.param_index,
}
pending.remove(pname)
# Free one in-flight slot (treat this response as completing one request)
in_flight = max(0, in_flight - 1)
if not pending:
done_evt.set()
loop.call_soon_threadsafe(_apply)
except Exception as exc:
self._log.error(f"[bulk_get handler] raised: {exc}")
self.register_handler(key="PARAM_VALUE", fn=_handler)
try:
# Start sending + resending
await _fill_window_from_queue()
resender_task = asyncio.create_task(_resender_loop())
try:
await asyncio.wait_for(done_evt.wait(), timeout=timeout_total)
except asyncio.TimeoutError:
# return partial results
pass
finally:
resender_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await resender_task
return results
finally:
self.unregister_handler("PARAM_VALUE", _handler)
class MavLinkFTPProxy(BaseProxy):
"""
Threaded MAVLink FTP driver using `pymavlink`.
"""
def __init__(
self,
mavlink_proxy: MavLinkExternalProxy,
):
self._log = logging.getLogger("MavLinkFTPProxy")
self._loop: asyncio.AbstractEventLoop | None = None
self._exe = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MavLinkFTPProxyWorker")
self.mavlink_proxy: MavLinkExternalProxy = mavlink_proxy
# ------------------------ life-cycle --------------------- #
async def start(self):
"""Open the MAVLink connection then launch the worker thread."""
self._loop = asyncio.get_running_loop()
# Start the worker thread first
await super().start()
# # Initialize parser if connection was successful
# if self.mavlink_proxy.master:
# await self._init_parser()
async def stop(self):
"""Stop the worker and close the link."""
await asyncio.sleep(0.1)  # Ensure any pending writes are flushed
await super().stop()  # stop the worker thread (mirrors start())
# ------------------- I/O primitives --------------------- #
async def _init_parser(self):
"""Initialize the blocking parser."""
def create_parser():
return _BlockingParser(
self._log,
self.mavlink_proxy.master,
self.mavlink_proxy,
0
)
self._parser = await self._loop.run_in_executor(
self._exe,
create_parser
)
# ------------------- deletion helper with fallback ------------- #
async def _delete_file_with_fallback(self, path: str) -> bool:
"""Delete a file on the vehicle, trying shell rm first then MAVFTP.
On real NuttX hardware the shell ``rm`` command works and bypasses
PX4's MAVFTP ``FileProtected`` restriction. In SITL builds the
NuttX shell is not available (``Invalid command: rm``), so we
fall back to MAVFTP ``cmd_rm`` which works there.
The caller is responsible for stopping the PX4 logger before
invoking this method.
Returns ``True`` on success, ``False`` on failure.
"""
# 1) Try the NuttX shell rm (works on real hardware)
ok = await self.mavlink_proxy.delete_file_via_shell(path)
if ok:
return True
# 2) Fall back to MAVFTP cmd_rm (works on SITL)
self._log.info(f"Shell rm failed for {path}, falling back to MAVFTP delete")
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
def _ftp_delete():
try:
self._parser._delete(path)
return True
except Exception as exc:
self._log.warning(f"MAVFTP delete also failed for {path}: {exc}")
return False
return await self._loop.run_in_executor(self._exe, _ftp_delete)
# ------------------- exposing blocking parser methods --------- #
async def list_ulogs(self, base: Optional[str] = None, connection_timeout: float = 3.0) -> List[ULogInfo]:
"""Return metadata for every .ulg file on the vehicle."""
# Check connection and attempt to establish if needed
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("FTP connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0) # brief wait before re-checking
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink FTP connection")
raise RuntimeError("MAVLink FTP connection could not be established")
if base is None:
base = self.mavlink_proxy.root_sd_path
# Initialize parser if not already done (e.g., after reconnection)
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
# Try to get log entries from the vehicle, but handle timeout gracefully
entries = {}
try:
msg_id = str(mavutil.mavlink.MAVLINK_MSG_ID_LOG_ENTRY)
msg = self.mavlink_proxy.build_req_msg_log_request(message_id=msg_id)
entries = await self.mavlink_proxy.get_log_entries(
msg_id=msg_id,
request_msg=msg,
timeout=5.0
)
except (TimeoutError, RuntimeError) as e:
self._log.warning(f"Failed to get log entries from vehicle: {e}")
self._log.info("Attempting to list files directly via FTP without log entries...")
entries = {}
# Attempt to list files via FTP
try:
raw = await self._loop.run_in_executor(self._exe, self._parser.list_ulogs, entries, base)
return [ULogInfo(**item) for item in raw]
except Exception as e:
self._log.warning(f"Failed to list files via FTP: {e}")
return []
async def download_ulog(
self,
remote_path: str,
local_path: Path,
completed_event: threading.Event,
on_progress: ProgressCB | None = None,
cancel_event: threading.Event | None = None,
connection_timeout: float = 3.0,
n_attempts: int = 3,
) -> Path:
"""
Fetch *remote_path* from the vehicle into *local_path*.
Returns the Path actually written on success or None if cancelled.
"""
# Check connection and attempt to establish if needed
last_exception = None
for attempt in range(n_attempts):
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("FTP connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0) # brief wait before re-checking
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink FTP connection")
raise RuntimeError("MAVLink FTP connection could not be established")
# Initialize parser if not already done (e.g., after reconnection)
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
try:
result = await self._loop.run_in_executor(
self._exe,
self._parser.download_ulog,
remote_path,
local_path,
on_progress,
cancel_event
)
if result:
completed_event.set()
return local_path
else:
raise RuntimeError("Download was cancelled or failed without exception")
except Exception as e:
self._log.error(f"Failed to download ulog via FTP on attempt {attempt + 1}/{n_attempts}: {e}")
last_exception = e
if last_exception is not None:
self._log.error(f"All {n_attempts} attempts to download ulog failed.")
raise last_exception
async def delete_file(self, remote_path: str, connection_timeout: float = 3.0, stop_logger: bool = True) -> None:
"""
Delete a single file at *remote_path* on the vehicle.
Uses the NuttX shell ``rm`` command via MAVLink SERIAL_CONTROL,
which bypasses PX4's MAVFTP ``FileProtected`` restriction.
Parameters
----------
remote_path : str
The path on the vehicle to delete.
connection_timeout : float
Maximum time to wait for MAVLink connection.
stop_logger : bool
If True, stop PX4 logging before deletion and restart it
afterwards. Pass ``False`` when the caller has already
stopped the logger (e.g. bulk-delete loop).
"""
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("MAVLink connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0)
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink connection")
raise RuntimeError("MAVLink connection could not be established")
if stop_logger:
await self.mavlink_proxy.stop_px4_logging()
try:
ok = await self._delete_file_with_fallback(remote_path)
if not ok:
raise RuntimeError(f"Delete failed for {remote_path} (shell rm and MAVFTP both failed)")
finally:
if stop_logger:
await self.mavlink_proxy.start_px4_logging()
async def delete_all_logs(
self,
base: Optional[str] = None,
connection_timeout: float = 3.0,
progress_callback: "Callable[[int, int], Any] | None" = None,
) -> Dict[str, Any]:
"""
Recursively delete every ``.ulg`` file under *base* on the vehicle.
Uses MAVFTP only for *listing* directories, then deletes each file
via the NuttX shell ``rm`` command (MAVLink SERIAL_CONTROL). This
bypasses PX4's MAVFTP ``FileProtected`` (code 9) restriction which
blocks ``cmd_rm`` on all log files.
Stops the PX4 logger before deletion and restarts it afterwards.
Args:
base: Root directory to scan for .ulg files.
connection_timeout: Seconds to wait for a MAVLink connection.
progress_callback: Optional ``callback(current_index, total)``
invoked after each file deletion attempt. May be a
regular function or an ``async`` coroutine.
Returns a summary dict with deleted/failed counts.
"""
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("MAVLink connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0)
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink connection")
raise RuntimeError("MAVLink connection could not be established")
if base is None:
base = self.mavlink_proxy.root_sd_path
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
# ── Discover .ulg files AND date directories in one pass ──
def _scan(base_dir):
"""Return (file_list, date_dir_list)."""
files = []
dirs = []
try:
base_entries = self._parser._ls(base_dir, retries=2)
except Exception as exc:
# Empty directory → MAVFTP returns code 73 (timeout).
# Treat as "nothing to delete".
self._log.info(
f"Base directory listing failed ({exc}) — "
f"assuming empty, nothing to delete"
)
return files, dirs
for date, _, is_dir in base_entries:
if not is_dir:
continue
sub = f"{base_dir}/{date}"
dirs.append(sub)
try:
entries = self._parser._ls(sub, retries=1)
except Exception as exc:
self._log.warning(f"Skipping {sub}: {exc}")
continue
for name, size, child_is_dir in entries:
if not child_is_dir and name.endswith(".ulg"):
files.append((f"{sub}/{name}", size))
return files, dirs
ulog_files, date_dirs = await self._loop.run_in_executor(
self._exe, lambda: _scan(base)
)
self._log.info(
f"Found {len(ulog_files)} ULog file(s) under {base} — "
f"deleting via shell rm"
)
# Stop PX4 logger before deletion to release file handles
await self.mavlink_proxy.stop_px4_logging()
try:
results: List[Dict[str, Any]] = []
deleted = 0
failed = 0
total = len(ulog_files)
for idx, (remote_path, size) in enumerate(ulog_files, 1):
ok = await self._delete_file_with_fallback(remote_path)
if ok:
results.append({
"path": remote_path,
"size_bytes": size,
"status": "deleted",
})
deleted += 1
else:
results.append({
"path": remote_path,
"size_bytes": size,
"status": "failed",
"error": "Delete failed (shell rm and MAVFTP)",
})
failed += 1
# Report per-file progress
if progress_callback is not None:
try:
ret = progress_callback(idx, total)
if asyncio.iscoroutine(ret):
await ret
except Exception:
pass # never let a callback error abort deletion
# ── Clean up ALL discovered date directories ─────────
# Remove every date dir we found during listing, not
# just those that had deleted files. This ensures dirs
# left empty by a previous run are also removed.
for d in sorted(date_dirs):
try:
ok = await self.mavlink_proxy.delete_file_via_shell(
d, command="rmdir"
)
if ok:
self._log.info(f"Removed empty directory {d}")
else:
self._log.debug(f"rmdir {d} skipped (may not be empty)")
except Exception as exc:
self._log.debug(f"rmdir {d} failed: {exc}")
summary = {
"total": deleted + failed,
"deleted": deleted,
"failed": failed,
"files": results,
}
self._log.info(
f"Delete all logs complete: {deleted} deleted, "
f"{failed} failed out of {deleted + failed} total"
)
return summary
finally:
# Always restart the logger, even if deletion failed
await self.mavlink_proxy.start_px4_logging()
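# Usage sketch (hypothetical; `ftp` is a started MavLinkFTPProxy). The
# progress callback may be a plain function or a coroutine function:
#
#     async def on_progress(i: int, total: int) -> None:
#         print(f"deleted {i}/{total}")
#
#     summary = await ftp.delete_all_logs(progress_callback=on_progress)
#     print(summary["deleted"], "deleted,", summary["failed"], "failed")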
async def detect_error_logs(self, remote_path: str, connection_timeout: float = 3.0) -> Dict[str, Any]:
"""
Detect ``fail_*.log`` error log files under *remote_path* on the vehicle.
Lists files via MAVFTP without deleting anything.
Returns a summary dict ``{total, files}`` where *files* is a list of
``{path, size_bytes}`` dicts.
"""
# Check connection and attempt to establish if needed
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("FTP connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0)
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink FTP connection")
raise RuntimeError("MAVLink FTP connection could not be established")
# Initialize parser if not already done
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
# List fail_*.log files via MAVFTP
def _list_fail_logs():
try:
entries = self._parser._ls(remote_path, retries=2)
return [
(f"{remote_path}/{name}", size)
for name, size, is_dir in entries
if not is_dir and name.startswith("fail_") and name.endswith(".log")
]
except Exception as exc:
self._log.warning(f"Failed to list error logs in {remote_path}: {exc}")
return []
fail_logs = await self._loop.run_in_executor(self._exe, _list_fail_logs)
files_info = [
{"path": path, "size_bytes": size}
for path, size in fail_logs
]
self._log.info(
f"Detected {len(files_info)} fail_*.log file(s) under {remote_path}"
)
return {"total": len(files_info), "files": files_info}
async def clear_error_logs(self, remote_path: str, connection_timeout: float = 3.0) -> Dict[str, Any]:
"""
Clear ``fail_*.log`` error logs under *remote_path* from the vehicle.
Uses MAVFTP only for *listing*, then deletes each file via the
NuttX shell ``rm`` command (same approach as ``delete_all_logs``).
Returns a summary dict ``{total, deleted, failed}``.
"""
# Check connection and attempt to establish if needed
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("FTP connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0)
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink FTP connection")
raise RuntimeError("MAVLink FTP connection could not be established")
# Initialize parser if not already done
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
# List fail_*.log files via MAVFTP
def _list_fail_logs():
try:
entries = self._parser._ls(remote_path, retries=2)
return [
(f"{remote_path}/{name}", size)
for name, size, is_dir in entries
if not is_dir and name.startswith("fail_") and name.endswith(".log")
]
except Exception as exc:
self._log.warning(f"Failed to list error logs in {remote_path}: {exc}")
return []
fail_logs = await self._loop.run_in_executor(self._exe, _list_fail_logs)
if not fail_logs:
self._log.info(f"No fail_*.log files found under {remote_path}")
return {"total": 0, "deleted": 0, "failed": 0}
self._log.info(f"Found {len(fail_logs)} fail_*.log file(s) under {remote_path}")
deleted = 0
failed = 0
# Stop logger, delete via shell rm (with MAVFTP fallback), restart logger
await self.mavlink_proxy.stop_px4_logging()
try:
for path, _size in fail_logs:
try:
ok = await self._delete_file_with_fallback(path)
if ok:
self._log.info(f"Deleted error log {path}")
deleted += 1
else:
self._log.warning(f"Delete failed for {path} (shell rm and MAVFTP)")
failed += 1
except Exception as exc:
self._log.warning(f"Error deleting {path}: {exc}")
failed += 1
finally:
await self.mavlink_proxy.start_px4_logging()
self._log.info(
f"Clear error logs complete: {deleted} deleted, "
f"{failed} failed out of {deleted + failed} total"
)
return {"total": deleted + failed, "deleted": deleted, "failed": failed}
async def list_directory(self, base: Optional[str] = None, connection_timeout: float = 3.0) -> List[str]:
"""
List all files and directories under *base* on the vehicle.
"""
# Check connection and attempt to establish if needed
if not self.mavlink_proxy.master or not self.mavlink_proxy.connected:
self._log.warning("FTP connection not established, attempting to connect...")
t_start = time.time()
while True:
await asyncio.sleep(1.0) # brief wait before re-checking
if self.mavlink_proxy.master and self.mavlink_proxy.connected:
break
if time.time() - t_start > connection_timeout:
self._log.error("Timeout waiting for MAVLink FTP connection")
raise RuntimeError("MAVLink FTP connection could not be established")
if base is None:
base = self.mavlink_proxy.root_sd_path
# Initialize parser if not already done (e.g., after reconnection)
if not hasattr(self, '_parser') or self._parser is None:
await self._init_parser()
try:
listing = await self._loop.run_in_executor(self._exe, self._parser.list_directory, base)
return listing
except Exception as e:
self._log.warning(f"Failed to list directory via FTP: {e}")
return []
# --------------------------------------------------------------------------- #
# helper functions #
# --------------------------------------------------------------------------- #
def _match_ls_to_entries(
ls_list: List[Tuple[str, int]],
entry_dict: Dict[int, Dict[str, int]],
threshold_size: int = 4096,
) -> Dict[int, Tuple[str, int, int]]:
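"""
Best-effort pairing of MAVFTP ``ls`` results with LOG_ENTRY metadata.
Files and entries are matched greedily when their sizes differ by at most
*threshold_size* bytes; raises ``ValueError`` when the counts differ so
callers can fall back to a plain listing (see _BlockingParser.list_ulogs).
"""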
files = sorted(ls_list, key=lambda x: x[1], reverse=True)
entries = sorted(entry_dict.items())
if len(files) != len(entries):
raise ValueError("ls and entry counts differ; can't match safely")
mapping = {}
for log_id, info in entries:
for i, (name, sz) in enumerate(files):
if abs(sz - info['size']) <= threshold_size:
files.pop(i)
mapping[log_id] = (name, sz, info['utc'])
break
return mapping
class _BlockingParser:
"""
Thin wrapper around pymavlink / MAVFTP - runs in a dedicated thread.
All methods are synchronous and blocking; the proxy wraps them in
run_in_executor so the event-loop stays responsive.
"""
# ---------- life-cycle -------------------------------------------------- #
def __init__(
self,
logger: logging.Logger,
master: mavutil.mavserial,
mavlink_proxy: MavLinkExternalProxy,
debug: int = 0
):
self._log = logger.getChild("BlockingParser")
self.master = master
self.proxy = mavlink_proxy
self.root_sd_path = self.proxy.root_sd_path
# try three times to init MAVFTP
try:
for _ in range(3):
try:
if self.master is None or not self.proxy.connected:
raise RuntimeError("MAVLink master not initialized; cannot create MAVFTP session")
with self.proxy._mav_lock:
self.ftp = mavftp.MAVFTP(
self.master, self.master.target_system, self.master.target_component
)
break
except Exception as e:
self._log.warning(f"MAVFTP init attempt failed: {e}")
time.sleep(1)
else:
raise RuntimeError("MAVFTP init failed after 3 attempts")
self._log.info("MAVFTP initialized successfully")
self.ftp.ftp_settings.debug = debug
self.ftp.ftp_settings.retry_time = 0.2 # 200 ms instead of 1 s
self.ftp.ftp_settings.burst_read_size = 239
self.ftp.burst_size = 239
except Exception as e:
self._log.error(f"Failed to initialize MAVFTP: {e}")
raise
@property
def system_id(self): # convenience for log message in proxy.start()
return self.master.target_system
def close(self):
self.master.close()
# ---------- public helpers (blocking) ----------------------------------- #
# 1) list_ulogs ---------------------------------------------------------- #
def list_ulogs(self, entries: Dict[int, Dict[str, int]], base: str) -> List[Dict[str, Any]]:
"""
Enumerate *.ulg under the SD-card and return a list of dicts
that can be fed directly into ULogInfo(**dict).
"""
ulog_files = list(self._walk_ulogs(base))
if not ulog_files:
return []
# If we have log entries from the vehicle, try to match them with files
if entries:
try:
mapping = _match_ls_to_entries(ulog_files, entries)
# sort the mapping by utc descending
mapping = sorted(
mapping.values(),
key=lambda x: x[2], # sort by utc (index 2)
reverse=True
)
result = []
for i, (name, size, utc) in enumerate(mapping):
result.append(
dict(index=i, remote_path=name, size_bytes=size, utc=utc)
)
return result
except ValueError as e:
self._log.warning(f"Failed to match files with log entries: {e}")
# Fall through to basic file listing
# If no entries or matching failed, return basic file info without UTC timestamps
self._log.info("Returning basic file listing without log entry metadata")
result = []
for i, (name, size) in enumerate(ulog_files):
result.append(
dict(index=i, remote_path=name, size_bytes=size, utc=0) # UTC=0 when unknown
)
return result
# 2) download_ulog ------------------------------------------------------- #
def download_ulog(
self,
remote_path: str,
local_path: Path,
on_progress: ProgressCB | None = None,
cancel_event: threading.Event | None = None,
):
"""Blocking download with retry + tmp-file recovery with cancellation support."""
# ------------------------------------------------------------------ #
def _progress_cb(frac: float | None):
if frac is None or on_progress is None:
return
# Check for cancellation
if cancel_event and cancel_event.is_set():
# Use our custom exception to signal cancellation
raise DownloadCancelledException("Download cancelled by user")
asyncio.run_coroutine_threadsafe(
on_progress(frac),
loop=self.proxy._loop
)
# ------------------------------------------------------------------ #
try:
self._log.info("Downloading %s → %s", remote_path, local_path)
local_path.parent.mkdir(parents=True, exist_ok=True)
with self.proxy._mav_lock:
with self.proxy._download_lock:
ret = self.ftp.cmd_get(
[remote_path, str(local_path.absolute())],
progress_callback=lambda x: _progress_cb(x)
)
if ret.return_code != 0:
self._log.error(f"OpenFileRO failed: download failed with code {ret.return_code}")
self._reset_ftp_state()
return None
# Check for cancellation before processing reply
if cancel_event and cancel_event.is_set():
self._reset_ftp_state()
if local_path.exists():
local_path.unlink()
return None
# Process the reply with a try-except to handle potential issues
try:
result = self.ftp.process_ftp_reply(ret.operation_name, timeout=0)
if result.return_code != 0:
self._log.error(f"OpenFileRO Download failed with code {result.return_code}")
self._reset_ftp_state()
if local_path.exists():
local_path.unlink()
return None
except DownloadCancelledException:
# Handle cancellation gracefully
self._log.info("Download cancelled by user")
self._reset_ftp_state()
if local_path.exists():
local_path.unlink()
return None
except (OSError, socket.error) as e:
self._log.error(f"FTP error during download: {str(e)}")
return None
except Exception as e:
self._log.error(f"Error processing FTP reply: {str(e)}")
self._reset_ftp_state()
return None
if not local_path.exists():
# handle temp-file move failure
tmp = Path(self.ftp.temp_filename)
if tmp.exists():
shutil.move(tmp, local_path)
self._log.warning("Temp file recovered to %s", local_path)
self._reset_ftp_state() # for next download
if not local_path.exists():
self._log.error("Failed to recover temp file to %s", local_path)
return None
self._log.info("Saved %s (%.1f KiB)",
local_path.name, local_path.stat().st_size / 1024)
return str(local_path)
except DownloadCancelledException:
# Handle cancellation gracefully at the outer level too
self._log.info("Download cancelled by user")
with self.proxy._mav_lock:
self._reset_ftp_state()
if local_path.exists():
local_path.unlink()
raise
except Exception as e:
# Connection errors such as "Bad file descriptor" (OSError /
# socket.error), RuntimeError, and anything else unexpected all get
# the same treatment: log, reset FTP state, drop the partial file,
# and re-raise the original exception.
self._log.error(f"Download error: {e}")
with self.proxy._mav_lock:
with self.proxy._download_lock:
self._reset_ftp_state()
if local_path.exists():
local_path.unlink()
raise
# 3) clear error logs ---------------------------------------------------- #
def clear_error_logs(self, base: str = "fs/microsd") -> None:
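"""Delete every ``fail_*.log`` file found directly under *base* (non-recursive)."""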
fail_logs = self._list_fail_logs(base)
for log in fail_logs:
try:
self._log.info(f"Deleting error log {log.remote_path}")
# Check if connection is still valid before attempting operation
if not self.proxy.master or not self.proxy.connected:
self._log.warning(f"Connection lost, skipping delete for {log.remote_path}")
return
self._delete(log.remote_path)
time.sleep(0.1) # Give some time for the delete operation to complete
except (OSError, socket.error) as e:
# Handle connection errors gracefully
if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]:
self._log.warning(f"Connection lost during delete operation: {e}")
else:
self._log.error(f"Unexpected error deleting log {log.remote_path}: {e}")
except Exception as e:
self._log.error(f"Error deleting log {log.remote_path}: {e}")
self._log.info("Cleared all error logs")
# 4) delete_file --------------------------------------------------------- #
def delete_file(self, path: str) -> None:
"""
Delete a single file at *path* on the vehicle via MAVFTP.
Wraps the private ``_delete`` helper with a public interface.
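Raises ``RuntimeError`` if the MAVLink connection is unavailable;
:class:`FTPDeleteError` from the underlying ``_delete`` propagates.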
"""
self._log.info(f"Deleting file: {path}")
if not self.proxy.master or not self.proxy.connected:
raise RuntimeError("MAVLink connection not available for delete operation")
self._delete(path)
# 5) delete_all_logs ----------------------------------------------------- #
def delete_all_logs(self, base: str = "fs/microsd/log") -> Dict[str, Any]:
"""
Recursively walk *base* (same logic as ``_walk_ulogs``) and delete
every ``.ulg`` file found. Does NOT require log entries from the
vehicle – purely filesystem-based.
.. note::
The caller (``MavLinkFTPProxy.delete_all_logs``) is responsible
for stopping PX4 logging **before** invoking this method.
Without that, PX4 returns ``FileProtected`` (FTP code 9) for
every file because the logger holds open file handles.
Returns a summary dict with ``deleted``, ``failed``, and ``total``
counts plus a list of per-file results.
"""
self._log.info(f"Deleting all ULog files under {base}")
if not self.proxy.master or not self.proxy.connected:
raise RuntimeError("MAVLink connection not available for delete operation")
# Reset FTP state before starting the delete loop to ensure a clean session
with self.proxy._mav_lock:
with self.proxy._download_lock:
self._reset_ftp_state()
results: List[Dict[str, Any]] = []
deleted = 0
failed = 0
for remote_path, size in self._walk_ulogs(base):
try:
self._delete(remote_path)
results.append({"path": remote_path, "size_bytes": size, "status": "deleted"})
deleted += 1
time.sleep(0.1) # brief pause between deletes
except FTPDeleteError as e:
self._log.error(f"Failed to delete {remote_path}: {e}")
results.append({
"path": remote_path,
"size_bytes": size,
"status": "failed",
"error": str(e),
"ftp_code": e.ftp_code,
})
failed += 1
except Exception as e:
self._log.error(f"Failed to delete {remote_path}: {e}")
results.append({"path": remote_path, "size_bytes": size, "status": "failed", "error": str(e)})
failed += 1
summary = {
"total": deleted + failed,
"deleted": deleted,
"failed": failed,
"files": results,
}
self._log.info(f"Delete all logs complete: {deleted} deleted, {failed} failed out of {deleted + failed} total")
return summary
# 6) ls a directory ------------------------------------------------------ #
def list_directory(self, base: str = "fs/microsd") -> List[Tuple[str, int, bool]]:
"""List a directory on the vehicle as ``(name, size_bytes, is_dir)`` tuples."""
try:
self._log.info(f"Listing directory: {base}")
# Check if connection is still valid before attempting operation
if not self.proxy.master or not self.proxy.connected:
self._log.warning(f"Connection lost, skipping ls for {base}")
return []
return self._ls(base)
except Exception as e:
self._log.error(f"Error listing directory {base}: {e}")
return []
# ---------- internal helpers ------------------------------------------- #
def _reset_ftp_state(self):
"""Reset all FTP state to handle canceled transfers properly."""
self._log.warning("Resetting FTP state")
try:
# First try to terminate the current session
self.ftp._MAVFTP__terminate_session()
self.ftp.process_ftp_reply("TerminateSession")
except Exception as e:
self._log.warning(f"Error terminating session: {e}")
try:
# Then reset all sessions for good measure
op = mavftp.OP_ResetSessions
self.ftp._MAVFTP__send(FTP_OP(self.ftp.seq, self.ftp.session, op, 0, 0, 0, 0, None))
self.ftp.process_ftp_reply("ResetSessions")
except Exception as e:
self._log.warning(f"Error resetting sessions: {e}")
# Reset internal dictionaries that could cause issues
self.ftp.active_read_sessions = {}
# These are the problematic data structures that cause the KeyError
if hasattr(self.ftp, 'read_gap_times'):
self.ftp.read_gap_times = {}
if hasattr(self.ftp, 'read_gaps'):
# This should be a list, not a dict
self.ftp.read_gaps = []
# Reset session counter and tracking
if hasattr(self.ftp, 'next_read_session'):
self.ftp.next_read_session = 1
if hasattr(self.ftp, 'session'):
self.ftp.session = 0
if hasattr(self.ftp, 'seq'):
self.ftp.seq = 0
# Reset other stateful variables
if hasattr(self.ftp, 'read_total'):
self.ftp.read_total = 0
if hasattr(self.ftp, 'read_offset'):
self.ftp.read_offset = 0
if hasattr(self.ftp, 'remote_file_size'):
self.ftp.remote_file_size = 0
if hasattr(self.ftp, 'burst_state'):
self.ftp.burst_state = 0
def _walk_ulogs(self, base="fs/microsd/log") -> Generator[Tuple[str, int], None, None]:
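"""
Yield ``(remote_path, size_bytes)`` for every ``.ulg`` file found one
level below *base*, matching PX4's date-directory log layout
(e.g. ``fs/microsd/log/<YYYY-MM-DD>/<HH_MM_SS>.ulg``).
"""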
dates = self._ls(base)
for date, _, is_dir in dates:
if not is_dir:
continue
try:
entries = self._ls(f"{base}/{date}", retries=1)
except Exception as exc:
self._log.warning(
f"Skipping {base}/{date}: listing failed ({exc})"
)
continue
for name, size, is_dir in entries:
if not is_dir and name.endswith(".ulg"):
yield f"{base}/{date}/{name}", size
# plain MAVFTP ls
def _ls(self, path: str, retries=5, delay=2.0):
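"""
List *path* via MAVFTP, retrying up to *retries* times with *delay*
seconds between attempts. Returns a de-duplicated list of
``(name, size_bytes, is_dir)`` tuples; returns ``[]`` when the path is
missing/inaccessible or the connection stays down, and raises
``RuntimeError`` if other errors persist through all retries.
"""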
for n in range(1, retries + 1):
try:
# Check if connection and master are valid before attempting operation
if not self.master or not self.proxy.connected:
self._log.warning(f"Connection not available, skipping ls for {path} (attempt {n}/{retries})")
if n >= retries:
return [] # Return empty list if all retries exhausted
time.sleep(delay)
continue
# Additional check: verify the file descriptor is still valid
try:
# Test if the socket is still open by checking its fileno
if hasattr(self.master, 'port') and hasattr(self.master.port, 'fileno'):
fd = self.master.port.fileno()
if fd < 0:
raise OSError("Invalid file descriptor")
except (OSError, AttributeError):
self._log.warning(f"File descriptor invalid, marking connection as lost (attempt {n}/{retries})")
with self.proxy._mav_lock:
with self.proxy._download_lock:
self._reset_ftp_state()
if n >= retries:
return []
time.sleep(delay)
continue
with self.proxy._mav_lock:
with self.proxy._download_lock:
# Double-check connection inside the lock
if not self.master or not self.proxy.connected:
self._log.warning(f"Connection lost during lock acquisition for {path}")
if n >= retries:
return []
continue
ack = self.ftp.cmd_list([path])
if ack.return_code == 0:
return list(set((e.name, e.size_b, e.is_dir) for e in self.ftp.list_result)) # set() drops duplicated entries
else:
# FTP command failed - check if it's a retryable error
if ack.return_code == 1:
# Error code 1 typically means "file/directory not found" or "permission denied"
# This is not a connection issue, so don't retry
self._log.warning(f"FTP ls failed: path '{path}' not found or not accessible (return code {ack.return_code})")
return [] # Return empty list instead of raising error
else:
# Other error codes might be retryable
self._log.warning(f"FTP ls command failed with return code {ack.return_code} (attempt {n}/{retries})")
if n >= retries:
raise RuntimeError(f"ls('{path}') failed after {retries} attempts: FTP return code {ack.return_code}")
except (OSError, socket.error) as e:
# Handle connection errors gracefully
if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]:
self._log.warning(f"Connection lost during ls operation")
if n >= retries:
raise RuntimeError(f"ls('{path}') failed after {retries} attempts due to connection loss")
else:
self._log.error(f"Unexpected socket error during ls: {e}")
raise
except Exception as e:
self._log.error(f"Error during ls operation (attempt {n}/{retries}): {e}")
if n >= retries:
raise RuntimeError(f"ls('{path}') failed after {retries} attempts: {e}")
# If we reach here, the operation failed but we can retry
if n < retries:
self._log.info(f"Retrying ls operation for {path} in {delay}s (attempt {n+1}/{retries})")
time.sleep(delay)
raise RuntimeError(f"ls('{path}') failed {retries}×")
def _list_fail_logs(self, base: str = "fs/microsd") -> List[ULogInfo]:
"""
List all fail_*.log files under the given *base* directory. without walking
"""
try:
entries = self._ls(base)
fail_logs = [
ULogInfo(index=i, remote_path=f"{base}/{name}", size_bytes=size, utc=0)
for i, (name, size, is_dir) in enumerate(entries)
if not is_dir and name.startswith("fail_") and name.endswith(".log")
]
return fail_logs
except RuntimeError as e:
self._log.error(f"Failed to list fail logs in {base}: {e}")
return []
def _delete(self, path: str, retries=2, delay=2.0):
"""
Delete a file or directory at *path* using MAVFTP.
Retries on failure up to *retries* times with *delay* seconds between attempts.
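Raises :class:`FTPDeleteError` carrying the vehicle's FTP return code
when the delete is rejected, and ``RuntimeError`` when connection
errors persist through all retries.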
"""
for n in range(1, retries + 1):
try:
# Check if connection and master are valid before attempting operation
if not self.master or not self.proxy.connected:
self._log.warning(f"Connection not available, skipping delete for {path} (attempt {n}/{retries})")
if n >= retries:
return # Give up after all retries
time.sleep(delay)
continue
# Additional check: verify the file descriptor is still valid
try:
if hasattr(self.master, 'port') and hasattr(self.master.port, 'fileno'):
fd = self.master.port.fileno()
if fd < 0:
raise OSError("Invalid file descriptor")
except (OSError, AttributeError):
self._log.warning(f"File descriptor invalid for delete, marking connection as lost (attempt {n}/{retries})")
with self.proxy._mav_lock:
with self.proxy._download_lock:
self._reset_ftp_state()
if n >= retries:
return
time.sleep(delay)
continue
with self.proxy._mav_lock:
with self.proxy._download_lock:
# Double-check connection inside the lock
if not self.master or not self.proxy.connected:
self._log.warning(f"Connection lost during lock acquisition for delete {path}")
if n >= retries:
return
continue
ack = self.ftp.cmd_rm([path])
if ack.return_code == 0:
self._log.info(f"Successfully deleted {path}")
return
else:
# Translate FTP error codes to human-readable messages
_FTP_ERR_NAMES = {
1: "Fail", 2: "FailErrno", 3: "InvalidDataSize",
4: "InvalidSession", 5: "NoSessionsAvailable",
6: "EndOfFile", 7: "UnknownCommand",
8: "FileExists", 9: "FileProtected",
10: "FileNotFound", 73: "RemoteReplyTimeout",
}
err_name = _FTP_ERR_NAMES.get(ack.return_code, f"Unknown({ack.return_code})")
detail = (
f"FTP delete failed: {err_name} (code {ack.return_code}) "
f"(attempt {n}/{retries})"
)
if ack.return_code == 9: # FileProtected
detail += (
" — PX4 logger likely has the file locked. "
"Ensure 'logger stop' is sent before deleting."
)
self._log.warning(detail)
if n >= retries:
raise FTPDeleteError(
path=path,
ftp_code=ack.return_code,
message=(
f"delete('{path}') failed after {retries} attempts: "
f"{err_name} (FTP code {ack.return_code})"
),
)
except (OSError, socket.error) as e:
# Handle connection errors gracefully
if e.errno in [errno.EBADF, errno.ECONNRESET, errno.ECONNREFUSED, errno.EPIPE]:
self._log.warning(f"Connection lost during delete operation (attempt {n}/{retries}): {e}")
if n >= retries:
raise RuntimeError(f"delete('{path}') failed after {retries} attempts due to connection loss")
else:
self._log.error(f"Unexpected socket error during delete: {e}")
raise
except FTPDeleteError:
raise # propagate structured FTP errors as-is
except Exception as e:
self._log.error(f"Error during delete operation (attempt {n}/{retries}): {e}")
if n >= retries:
raise RuntimeError(f"delete('{path}') failed after {retries} attempts: {e}")
# If we reach here, the operation failed but we can retry
if n < retries:
self._log.info(f"Retrying delete operation for {path} in {delay}s (attempt {n+1}/{retries})")
time.sleep(delay)