Proxies

Base Proxy

class petal_app_manager.proxies.base.BaseProxy[source]

Bases: ABC

Abstract base for every proxy.

Subclasses must implement start() and stop().

The optional is_ready / wait_until_ready() pair lets petals wait until a proxy has finished its startup handshake (e.g. MQTT connected, MAVLink heartbeat received) before calling its public API.

property is_ready: bool

Return True once the proxy is fully usable.

The default implementation returns True as soon as start() has been called. Subclasses may override this to gate on additional conditions (e.g. connection established, heartbeat received).

abstract async start()[source]
abstract async stop()[source]
async wait_until_ready(timeout: float = 30.0) bool[source]

Block (async) until is_ready or timeout seconds elapse.

Returns True if the proxy became ready, False on timeout.

Redis Proxy

RedisProxy

Simple Redis proxy for basic pub/sub messaging and key-value operations. Supports Unix socket connections.

class petal_app_manager.proxies.redis.RedisProxy(host: str = 'localhost', port: int = 6379, db: int = 0, password: str | None = None, debug: bool = False, unix_socket_path: str | None = None)[source]

Bases: BaseProxy

Simple Redis proxy for pub/sub messaging and key-value operations. Supports Unix socket connections.

async delete(key: str) int[source]

Delete a key from Redis.

async exists(key: str) bool[source]

Check if a key exists in Redis.

async get(key: str) str | None[source]

Get a value from Redis.

async list_online_applications() List[str][source]

List online applications by checking Redis keys for app registrations.

async publish(channel: str, message: str) int[source]

Publish a message to a channel (async, non-blocking).

register_pattern_channel_callback(channel: str, callback: Callable[[str, str], Awaitable[None]], cpu_heavy: bool = False)[source]

Register a callback for a specific channel for pattern subscriptions.

The callback must be an async function (coroutine). Sync callbacks are not supported and will raise TypeError.

Parameters:
  • channel – The channel to register the callback for

  • callback – Async callback function that receives (channel, data)

  • cpu_heavy – If True, the callback’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.

async scan_keys(pattern: str, count: int = 100) List[str][source]

Scan Redis keys matching a pattern.

Parameters:
  • pattern – Key pattern to match (e.g., job:*)

  • count – Number of keys to return per scan iteration

Returns:

List of matching keys

async set(key: str, value: str, ex: int | None = None) bool[source]

Set a value in Redis.

async start()[source]

Initialize the connection to Redis.

async stop()[source]

Close the Redis connection and clean up resources.

subscribe(channel: str, callback: Callable[[str, str], Awaitable[None]], cpu_heavy: bool = False)[source]

Subscribe to a channel with an async callback function.

Parameters:
  • channel – The Redis channel to subscribe to

  • callback – Async callback function that receives (channel, data)

  • cpu_heavy – If True, the callback’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.

Raises:

TypeError – If callback is not an async function

unregister_pattern_channel_callback(channel: str)[source]

Unregister a callback for a specific channel for pattern subscriptions.

unsubscribe(channel: str)[source]

Unsubscribe from a channel.

External MAVLink Proxy

petalappmanager.proxies.external

Thread-based proxies for long-running I/O back-ends (MAVLink, ROS 1, …).

Key changes vs. the first draft:

  • All per-key buffers are now collections.deque with maxlen. New data silently overwrites the oldest entry → bounded memory.

  • Public API (send, register_handler) is unchanged for petals.

  • Docstrings preserved / expanded for clarity.

exception petal_app_manager.proxies.external.DownloadCancelledException[source]

Bases: Exception

Raised when a download is cancelled by the user.

class petal_app_manager.proxies.external.ExternalProxy(maxlen: int, sleep_time_ms: float = 1.0)[source]

Bases: BaseProxy

Base class for I/O drivers that must poll or listen continuously.

A dedicated thread calls _io_read_once() / _io_write_once() in a tight loop while the FastAPI event-loop thread stays unblocked.

  • Send buffers - self._send[key] (deque, newest → right side) Outbound messages are enqueued here via send(). The I/O thread drains these buffers by calling _io_write_once() with all pending messages.

  • Handlers - self._handlers[key] Callbacks registered via register_handler() are stored here. When new messages arrive via _io_read_once(), they are processed directly in the I/O recv thread, which schedules async handlers on the event loop.

register_handler(key: str, fn: Callable[[Any], Awaitable[None]], duplicate_filter_interval: float | None = None, queue_length: int | None = None, cpu_heavy: bool = False) None[source]

Attach an async callback fn so it fires for every message appended to _recv[key].

The callback is scheduled on the main event loop for thread-safe execution.

Parameters:
  • key (str) – The key to register the handler for.

  • fn (Callable[[Any], Awaitable[None]]) – The async handler function to call for each message.

  • duplicate_filter_interval (Optional[float]) – If specified, duplicate messages received within this interval (in seconds) will be filtered out and the handler will not be called. None disables filtering.

  • queue_length (Optional[int]) – If specified, sets the maximum length of the message buffer for key. New messages overwrite the oldest when the buffer is full. If None, the default maxlen from proxy initialization is used.

  • cpu_heavy (bool) – If True, the handler’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy, preventing event-loop starvation. The handler must still be async def, but it may call await loop.run_in_executor(...) internally—or simply set this flag and the proxy will wrap the entire handler invocation in run_in_executor automatically.

Raises:

TypeError – If fn is not an async function (coroutine).

send(key: str, msg: Any, burst_count: int | None = None, burst_interval: float | None = None) None[source]

Enqueue msg for transmission. The newest message is kept if the buffer is already full.

Parameters:
  • key (str) – The key to send the message on.

  • msg (Any) – The message to send.

  • burst_count (Optional[int]) – If specified, send the message this many times in a burst.

  • burst_interval (Optional[float]) – If burst_count is specified, wait this many seconds between each message. If None, all messages are sent immediately.

async start() None[source]

Create the I/O threads and begin polling/writing.

async stop() None[source]

Ask the I/O threads to exit and join them (best-effort, 5 s timeout).

unregister_handler(key: str, fn: Callable[[Any], Awaitable[None]]) None[source]

Remove the callback fn from the broadcast list attached to key.

If fn was not registered, the call is silently ignored. When the last callback for key is removed, the key itself is pruned to keep the dict size small.

exception petal_app_manager.proxies.external.FTPDeleteError(path: str, ftp_code: int, message: str)[source]

Bases: Exception

Raised when a MAVFTP delete operation fails with a known FTP error code.

class petal_app_manager.proxies.external.MavLinkExternalProxy(endpoint: str, baud: int, source_system_id: int, source_component_id: int, maxlen: int, mavlink_worker_sleep_ms: float = 1.0, mavlink_heartbeat_send_frequency: float = 5.0, root_sd_path: str = 'fs/microsd/log')[source]

Bases: ExternalProxy

Threaded MAVLink driver using pymavlink.

Buffers used

  • send["mav"] - outbound MAVLink_message

  • recv["mav"] - any inbound message

  • recv[str(msg.get_msgId())] - by numeric ID

  • recv[msg.get_type()] - by string type

build_format_storage_command(storage_id: int = 0) MAVLink_command_long_message[source]

Build a MAVLink command to format the SD card (MAV_CMD_PREFLIGHT_STORAGE).

Parameters:

storage_id (int) – Storage device ID (0 = primary SD card). Default is 0.

Returns:

The MAVLink COMMAND_LONG message for format storage.

Return type:

mavutil.mavlink.MAVLink_command_long_message

Raises:

RuntimeError – If MAVLink connection is not established.

build_motor_value_command(motor_idx: int, motor_value: float, timeout: float) MAVLink_command_long_message[source]

Build MAV_CMD_ACTUATOR_TEST command for a motor.

build_param_request_list()[source]

Build MAVLink PARAM_REQUEST_LIST to fetch the full table.

build_param_request_read(name: str, index: int = -1)[source]

Build MAVLink PARAM_REQUEST_READ for a named or indexed parameter. If index == -1, the ‘name’ is used; otherwise PX4 will ignore name.

build_param_set(name: str, value: Any, param_type: int)[source]

Build MAVLink PARAM_SET for setting a parameter. Handles INT32 encoding where int32 values are encoded as float32 bits for wire transmission.

build_reboot_command(reboot_autopilot: bool = True, reboot_onboard_computer: bool = False) MAVLink_command_long_message[source]

Build a MAVLink command to reboot the autopilot and/or onboard computer.

Parameters:
  • reboot_autopilot (bool) – If True, reboot the autopilot (PX4/ArduPilot). Default is True.

  • reboot_onboard_computer (bool) – If True, reboot the onboard computer. Default is False.

Returns:

The MAVLink COMMAND_LONG message for reboot.

Return type:

mavutil.mavlink.MAVLink_command_long_message

Raises:

RuntimeError – If MAVLink connection is not established.

build_req_msg_log_data(log_id: int, ofs: int = 0, count: int = 4294967295) MAVLink_log_request_data_message[source]

Build LOG_REQUEST_DATA for a given log.

Parameters:
  • log_id (int) – The log id from LOG_ENTRY.id

  • ofs (int) – Offset into the log (usually 0 for first request)

  • count (int) – Number of bytes requested. For PX4/ArduPilot it’s common to use 0xFFFFFFFF to say “send the whole log”.

build_req_msg_log_request(message_id: int) MAVLink_log_request_list_message[source]

Build a MAVLink command to request a specific log message.

Parameters:

message_id (int) – The numeric ID of the log message to request.

Returns:

The MAVLink command message to request the specified log.

Return type:

mavutil.mavlink.MAVLink_log_request_list_message

Raises:

RuntimeError – If MAVLink connection is not established.

build_req_msg_long(message_id: int) MAVLink_command_long_message[source]

Build a MAVLink command to request a specific message type.

Parameters:

message_id (int) – The numeric ID of the MAVLink message to request.

Returns:

The MAVLink command message to request the specified message.

Return type:

mavutil.mavlink.MAVLink_command_long_message

Raises:

RuntimeError – If MAVLink connection is not established.

build_request_message_command() MAVLink_command_long_message[source]

Build a MAVLink command to request a specific message once using MAV_CMD_REQUEST_MESSAGE (common.xml).

Returns:

The MAVLink COMMAND_LONG message requesting the given message.

Return type:

mavutil.mavlink.MAVLink_command_long_message

Raises:

RuntimeError – If MAVLink connection is not established.

build_shell_serial_control_msgs(text: str, device: int = 10, respond: bool = True, exclusive: bool = True) list[MAVLink_serial_control_message][source]

Build SERIAL_CONTROL messages that write text to the PX4 MAVLink shell. Splits into <=70 byte chunks (MAVLink SERIAL_CONTROL data field).

Parameters:
  • text (str) – The text to send to the MAVLink shell.

  • device (int) – The device number to use (default 10 for PX4 mavlink_shell).

  • respond (bool) – If True, set the RESPOND flag (default True).

  • exclusive (bool) – If True, set the EXCLUSIVE flag (default True).

async delete_file_via_shell(remote_path: str, timeout: float = 5.0, command: str = 'rm') bool[source]

Delete a file (or empty directory) on PX4 using a NuttX shell command.

This is a fallback for when MAVFTP cmd_rm fails with FileProtected (code 9). The shell rm bypasses the MAVFTP server’s protection checks and deletes the file directly via NuttX unlink().

Parameters:
  • remote_path (str) – Full path on the vehicle, e.g. "fs/microsd/log/2025-09-01/07_50_39.ulg". The path must start without a leading /.

  • timeout (float) – Maximum seconds to wait for PX4 shell acknowledgment.

  • command (str) – NuttX shell command to use. "rm" for files, "rmdir" for empty directories.

Returns:

True if the command completed (no error detected in the reply text), False otherwise.

Return type:

bool

async download_log(*, log_id: int, completed_event: Event, timeout: float = 60.0, buffer: bytearray | None = None, callback: Callable[[int], Awaitable[None]] | None = None, end_of_buffer_timeout: float = 10.0, size_bytes: int | None = None) bytes[source]

Download one log file via LOG_REQUEST_DATA / LOG_DATA.

Parameters:
  • log_id – The LOG_ENTRY.id of the log to download.

  • completed_event – threading.Event that will be set when download completes.

  • timeout – Total time allowed for the whole transfer.

  • buffer – Optional bytearray to use as the download buffer.

  • callback – Optional async function called as callback(received_bytes) after each LOG_DATA packet is processed.

  • end_of_buffer_timeout – Timeout in seconds to wait for new data before aborting.

  • size_bytes – Optional total size of the log in bytes., if known.

Returns:

Raw log bytes (ULog/Dataflash).

Return type:

bytes

Raises:
  • RuntimeError – If MAVLink connection is not established.

  • TimeoutError – If no complete log is received within the timeout.

async download_log_buffered(*, log_id: int, completed_event: Event, size_bytes: int | None = None, timeout: float = 3.0, chunk_size: int = 90, max_retries: int = 3, cancel_event: Event | None = None, buffer: bytearray | None = None, callback: Callable[[int], Awaitable[None]] | None = None) bytes[source]

Download one log file via repeated LOG_REQUEST_DATA / LOG_DATA exchanges.

Strategy:

  • For ofs = 0, chunk_size, 2*chunk_size, …

    • send LOG_REQUEST_DATA(log_id, ofs, chunk_size)

    • wait for LOG_DATA(log_id, ofs, …)

    • append data[:count]

    • stop when:

      • count == 0 (end-of-log by spec), or

      • ofs + count >= size_bytes (if known)

Parameters:
  • log_id – LOG_ENTRY.id of the log to download.

  • completed_event – threading.Event set when download is fully complete.

  • size_bytes – Optional total size of the log from LOG_ENTRY.size; if given, used as termination condition and sanity cap.

  • timeout – Timeout per chunk request (seconds).

  • chunk_size – Requested count per chunk. For MAVLink v1, LOG_DATA carries 90 bytes.

  • max_retries – Number of retries per chunk on timeout.

  • cancel_event – Optional threading.Event to cancel the download.

  • buffer – Optional bytearray; if None, a new one is created.

  • callback – Optional sync or async callback called as callback(total_bytes_received) after each successful chunk.

Returns:

Raw log data.

Return type:

bytes

async format_sd_card(storage_id: int = 0, timeout: float = 10.0) FormatStorageResponse[source]

Send a format storage command to the autopilot and wait for ACK.

Parameters:
  • storage_id (int) – Storage device ID (0 = primary SD card). Default is 0.

  • timeout (float) – Maximum time to wait for acknowledgment. Default is 10.0 seconds (formatting can take a few seconds).

Returns:

Structured response indicating success/failure and reason.

Return type:

FormatStorageResponse

async get_all_params(timeout: float = 10.0)[source]

Request entire parameter list and return: { “<NAME>”: {“value”: int|float, “raw”: float, “type”: int, “index”: int, “count”: int}, … }

async get_log_entries(*, msg_id: str, request_msg: MAVLink_message, timeout: float = 8.0, stale_timeout: float = 2.0) Dict[int, Dict[str, int]][source]

Send LOG_REQUEST_LIST and gather all LOG_ENTRY packets.

Handles two edge cases that PX4 firmware exhibits:

  1. No logs on the vehicle — PX4 responds with a single LOG_ENTRY where num_logs == 0. The MAVLink spec requires this sentinel packet; we detect it and return an empty dict immediately instead of waiting for the timeout.

  2. Corrupted / missing log entries — PX4 may advertise num_logs = N but only deliver fewer than N LOG_ENTRY packets (corrupted slots are silently skipped). A staleness timer (stale_timeout) detects when no new entry has arrived for a while and returns whatever was collected so far.

Parameters:
  • msg_id (str) – The MAVLink message ID string for LOG_ENTRY (e.g. "118").

  • request_msg (mavutil.mavlink.MAVLink_message) – The encoded LOG_REQUEST_LIST message to send.

  • timeout (float) – Hard upper-bound on how long to wait (seconds).

  • stale_timeout (float) – If no new LOG_ENTRY arrives within this many seconds after the first one, collection stops early and whatever entries were gathered are returned. Only applies when num_logs > 0 and fewer than num_logs entries have been received. Default 2.0 s.

Returns:

{log_id: {"size": int, "utc": int}, …}

Return type:

Dict[int, Dict[str, int]]

async get_param(name: str, timeout: float = 3.0) Dict[str, Any][source]

Request a single PARAM_VALUE for name and return a dict: {“name”: str, “value”: Union[int,float], “raw”: float, “type”: int, “count”: int, “index”: int} Raises TimeoutError if no reply within timeout.

async get_params_bulk_lossy(names: Iterable[str], *, timeout_total: float = 6.0, max_retries: int = 3, max_in_flight: int = 10, resend_interval: float = 0.7, inter_send_delay: float = 0.01) Dict[str, Dict[str, Any]][source]

Lossy-link bulk GET using PARAM_REQUEST_READ by name.

Strategy: - Register ONE PARAM_VALUE handler. - Send read requests in a window (max_in_flight). - Periodically resend still-pending names (resend_interval) up to max_retries. - Stop when all received or timeout_total.

Returns: { name: {“name”,”value”,”raw”,”type”,”count”,”index”}, … } (Partial results if timeout_total hits.)

async reboot_autopilot(reboot_onboard_computer: bool = False, timeout: float = 3.0) RebootAutopilotResponse[source]

Send a reboot command to the autopilot (PX4/ArduPilot).

This sends MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and waits for a COMMAND_ACK response.

Parameters:
  • reboot_onboard_computer (bool) – If True, also reboot the onboard computer. Default is False.

  • timeout (float) – Maximum time to wait for acknowledgment. Default is 3.0 seconds.

Returns:

Structured response indicating success/failure and reason.

Return type:

RebootAutopilotResponse

Raises:
  • RuntimeError – If MAVLink connection is not established.

  • TimeoutError – If no acknowledgment is received within the timeout.

Notes

After sending this command, the connection to the autopilot will be lost as it reboots. The proxy will attempt to reconnect automatically.

async reboot_autopilot_confirmed(reboot_onboard_computer: bool = False, ack_timeout: float = 3.0, hb_drop_window_s: float = 5.0, hb_drop_gap_s: float = 1.5, hb_return_window_s: float = 30.0, hb_poll_interval_s: float = 0.05, sitl_mode: bool = False) RebootAutopilotResponse[source]

Send a reboot command and confirm success via heartbeat drop + return.

Unlike reboot_autopilot(), heartbeat confirmation is the primary verification method, not a fallback. The flow is:

  1. Send MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and wait for COMMAND_ACK. - If the ACK is a rejection, return immediately with failure. - If the ACK is accepted or times out, proceed to step 2.

  2. Wait for heartbeat to drop (gap ≥ hb_drop_gap_s within hb_drop_window_s).

  3. Wait for heartbeat to return (new heartbeat after the drop within hb_return_window_s).

  4. Return success only when heartbeat returns, confirming the autopilot has rebooted and is alive again.

Parameters:
  • reboot_onboard_computer (bool) – If True, also reboot the onboard computer.

  • ack_timeout (float) – Maximum time (s) to wait for COMMAND_ACK.

  • hb_drop_window_s (float) – Time window (s) to detect heartbeat drop after command.

  • hb_drop_gap_s (float) – Minimum gap (s) without a heartbeat to consider it dropped.

  • hb_return_window_s (float) – Time window (s) to wait for heartbeat to return after drop.

  • hb_poll_interval_s (float) – Polling interval (s) for heartbeat checks.

  • sitl_mode (bool) – If True, treat a DENIED ACK as acceptable instead of failing immediately. This allows SITL testing where the user manually kills and restarts the px4_sitl process to trigger the heartbeat drop + return cycle.

Return type:

RebootAutopilotResponse

async send_and_wait(*, match_key: str, request_msg: MAVLink_message, collector: Callable[[MAVLink_message], bool], timeout: float = 3.0, queue_length: int | None = None, cancel_event: Event | None = None) None[source]

Transmit request_msg, register a handler on match_key and keep feeding incoming packets to collector until it returns True or timeout expires.

Parameters:
  • match_key – The key used when the proxy dispatches inbound messages (numeric ID as string, e.g. “147”).

  • request_msg – Encoded MAVLink message to send - COMMAND_LONG, LOG_REQUEST_LIST, …

  • collector – Callback that receives each matching packet. Must return True once the desired condition is satisfied; returning False keeps waiting.

  • timeout – Maximum seconds to block.

  • queue_length – Optional maximum queue length for the handler. If the queue exceeds this length, older packets will be dropped. If None, the default queue length is used.

  • cancel_event – Optional threading.Event that can be set to cancel the wait.

Raises:
  • RuntimeError – If MAVLink connection is not established.

  • TimeoutError – If no matching response is received within the timeout.

async send_heartbeat()[source]

Send a MAVLink heartbeat message.

async send_shell_command(command: str, settle_time: float = 0.5, timeout: float = 3.0) str[source]

Send a shell command to PX4 via MAVLink SERIAL_CONTROL and wait for PX4 to acknowledge it.

Uses send_and_wait() so the command is transmitted through the normal I/O queue and we block until PX4 replies with at least one SERIAL_CONTROL packet (flag REPLY), confirming the shell received the command. A settle_time pause is added afterwards so PX4 has time to act on it (e.g. release file handles).

Parameters:
  • command (str) – The shell command to send (e.g. "logger stop\n").

  • settle_time (float) – Extra time (seconds) to wait after PX4 acknowledges the command, giving it time to act (e.g. close file handles).

  • timeout (float) – Maximum time (seconds) to wait for a reply before raising TimeoutError.

Returns:

The text content of the first SERIAL_CONTROL reply from PX4, or an empty string if no reply was received.

Return type:

str

async set_param(name: str, value: Any, ptype: int | None = None, timeout: float = 3.0) Dict[str, Any][source]

Set a parameter and confirm by reading back. value can be int or float. Returns the confirmed PARAM_VALUE dict (same shape as get_param()).

Uses proper INT32 encoding where int32 values are encoded as float32 bits for wire transmission.

>>> ["MAV_PARAM_TYPE"] = {
>>>     [1] = "MAV_PARAM_TYPE_UINT8",
>>>     [2] = "MAV_PARAM_TYPE_INT8",
>>>     [3] = "MAV_PARAM_TYPE_UINT16",
>>>     [4] = "MAV_PARAM_TYPE_INT16",
>>>     [5] = "MAV_PARAM_TYPE_UINT32",
>>>     [6] = "MAV_PARAM_TYPE_INT32",
>>>     [7] = "MAV_PARAM_TYPE_UINT64",
>>>     [8] = "MAV_PARAM_TYPE_INT64",
>>>     [9] = "MAV_PARAM_TYPE_REAL32",
>>>     [10] = "MAV_PARAM_TYPE_REAL64",
>>> }
async set_params_bulk_lossy(params_to_set: Dict[str, Any | Tuple[Any, str | int] | Dict[str, Any]], *, timeout_total: float = 8.0, max_retries: int = 3, max_in_flight: int = 8, resend_interval: float = 0.8, inter_send_delay: float = 0.01, verify_ack_value: bool = True) Dict[str, Dict[str, Any]][source]

Lossy-link bulk PARAM_SET: - User can provide optional type per param as “UINT8”, “INT16”, “REAL32”, etc. - If type omitted -> auto: int -> INT32, float -> REAL32 - Windowed sends + periodic resend + retry cap - Confirms via echoed PARAM_VALUE

Returns: confirmed {name: meta_dict}

async start()[source]

Open the MAVLink connection then launch the worker thread.

async start_px4_logging(settle_time: float = 1.0, timeout: float = 5.0) None[source]

Restart PX4’s SD-card logger after file operations.

Sends logger start\n via the MAVLink shell.

async stop()[source]

Stop the worker and close the link.

async stop_px4_logging(settle_time: float = 2.0, timeout: float = 5.0) None[source]

Stop PX4’s SD-card logger so that log files can be deleted.

Sends logger stop\n via the MAVLink shell and waits for PX4 to acknowledge. The logger module releases its file handles, making files deletable via MAVFTP.

property target_component: int

Return the target component ID of the MAVLink connection.

property target_system: int

Return the target system ID of the MAVLink connection.

class petal_app_manager.proxies.external.MavLinkFTPProxy(mavlink_proxy: MavLinkExternalProxy)[source]

Bases: BaseProxy

Threaded MAVLink FTP driver using pymavlink.

async clear_error_logs(remote_path: str, connection_timeout: float = 3.0) Dict[str, Any][source]

Clear fail_*.log error logs under remote_path from the vehicle.

Uses MAVFTP only for listing, then deletes each file via the NuttX shell rm command (same approach as delete_all_logs).

Returns a summary dict {total, deleted, failed}.

async delete_all_logs(base: str = None, connection_timeout: float = 3.0, progress_callback: Callable[[int, int], Any] | None = None) Dict[str, Any][source]

Recursively delete every .ulg file under base on the vehicle.

Uses MAVFTP only for listing directories, then deletes each file via the NuttX shell rm command (MAVLink SERIAL_CONTROL). This bypasses PX4’s MAVFTP FileProtected (code 9) restriction which blocks cmd_rm on all log files.

Stops the PX4 logger before deletion and restarts it afterwards.

Parameters:
  • base – Root directory to scan for .ulg files.

  • connection_timeout – Seconds to wait for a MAVLink connection.

  • progress_callback – Optional callback(current_index, total) invoked after each file deletion attempt. May be a regular function or an async coroutine.

Returns a summary dict with deleted/failed counts.

async delete_file(remote_path: str, connection_timeout: float = 3.0, stop_logger: bool = True) None[source]

Delete a single file at remote_path on the vehicle.

Uses the NuttX shell rm command via MAVLink SERIAL_CONTROL, which bypasses PX4’s MAVFTP FileProtected restriction.

Parameters:
  • remote_path (str) – The path on the vehicle to delete.

  • connection_timeout (float) – Maximum time to wait for MAVLink connection.

  • stop_logger (bool) – If True, stop PX4 logging before deletion and restart it afterwards. Pass False when the caller has already stopped the logger (e.g. bulk-delete loop).

async detect_error_logs(remote_path: str, connection_timeout: float = 3.0) Dict[str, Any][source]

Detect fail_*.log error log files under remote_path on the vehicle.

Lists files via MAVFTP without deleting anything.

Returns a summary dict {total, files} where files is a list of {path, size_bytes} dicts.

async download_ulog(remote_path: str, local_path: Path, completed_event: Event, on_progress: Callable[[float], Awaitable[None]] | None = None, cancel_event: Event | None = None, connection_timeout: float = 3.0, n_attempts: int = 3) Path[source]

Fetch remote_path from the vehicle into local_path.

Returns the Path actually written on success or None if cancelled.

async list_directory(base: str = None, connection_timeout: float = 3.0) List[str][source]

List all files and directories under base on the vehicle.

async list_ulogs(base: str = None, connection_timeout: float = 3.0) List[ULogInfo][source]

Return metadata for every .ulg file on the vehicle.

async start()[source]

Open the MAVLink connection then launch the worker thread.

async stop()[source]

Stop the worker and close the link.

class petal_app_manager.proxies.external.ULogInfo(*, index: int, remote_path: str, size_bytes: int, utc: int)[source]

Bases: BaseModel

Metadata for a ULog that resides on the PX4 SD-card.

index: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

remote_path: str
size_bytes: int
utc: int
petal_app_manager.proxies.external.setup_file_only_logger(name: str, log_file: str, level: str = 'INFO') Logger[source]

Setup a logger that only writes to files, not console.

Local Database Proxy

LocalDBProxy

  • Provides access to the local DynamoDB instance through the controller dashboard API

  • Handles machine ID retrieval and authentication for requests

  • Abstracts the HTTP communication details away from petals

  • Provides async CRUD operations for DynamoDB tables

This proxy allows petals to interact with DynamoDB without worrying about the underlying HTTP communication details.

class petal_app_manager.proxies.localdb.LocalDBProxy(get_data_url: str, scan_data_url: str, update_data_url: str, set_data_url: str, host: str, port: int, debug: bool = False)[source]

Bases: BaseProxy

Proxy for communicating with a local DynamoDB instance through a custom API.

async delete_item(table_name: str, filter_key: str, filter_value: str) Dict[str, Any][source]

Soft delete an item from DynamoDB by setting deleted=True.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to delete

Returns:

Response from the update operation

async get_item(table_name: str, partition_key: str, partition_value: str) Dict[str, Any][source]

Retrieve a single item from DynamoDB using its partition key. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • partition_key – Name of the partition key (usually ‘id’)

  • partition_value – Value of the partition key to look up

Returns:

The item as a dictionary if found, not deleted, and belongs to this machine, or an error dictionary

property machine_id: str | None

Get the machine ID for this instance.

property organization_id: str | None

Get the organization ID from OrganizationManager.

property robot_type_id: str | None

Get the robot type ID for this instance.

async scan_items(table_name: str, filters: List[Dict[str, str]] | None = None) List[Dict[str, Any]][source]

Scan a DynamoDB table with optional filters. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filters – List of filter dictionaries, each with ‘filter_key_name’ and ‘filter_key_value’

Returns:

List of matching items that are not deleted and belong to this machine

async set_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]

Puts an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to update

  • data – The complete item data to update or insert

Returns:

Response from the set operation

async start()[source]

Initialize the connection to the local API service.

async stop()[source]

Clean up resources when shutting down.

async update_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]

Update or insert an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to update

  • data – The complete item data to update or insert

Returns:

Response from the update operation

Cloud Proxy

CloudDBProxy

  • Provides access to the cloud DynamoDB instance through authenticated API calls

  • Handles authentication token retrieval and management with caching

  • Abstracts the HTTP communication details away from petals

  • Provides async CRUD operations for DynamoDB tables in the cloud

This proxy allows petals to interact with cloud DynamoDB without worrying about the underlying authentication and HTTP communication details.

class petal_app_manager.proxies.cloud.CloudDBProxy(access_token_url: str, endpoint: str, session_token_url: str = None, s3_bucket_name: str = None, get_data_url: str = '/drone/onBoard/config/getData', scan_data_url: str = '/drone/onBoard/config/scanData', update_data_url: str = '/drone/onBoard/config/updateData', set_data_url: str = '/drone/onBoard/config/setData', debug: bool = False, request_timeout: int = 30)[source]

Bases: BaseProxy

Proxy for communicating with a cloud DynamoDB instance through authenticated API calls.

async delete_item(table_name: str, filter_key: str, filter_value: str) Dict[str, Any][source]

Soft delete an item from DynamoDB by setting deleted=True.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to delete

Returns:

Response from the update operation

async get_item(table_name: str, partition_key: str, partition_value: str) Dict[str, Any][source]

Retrieve a single item from DynamoDB using its partition key. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • partition_key – Name of the partition key (usually ‘id’)

  • partition_value – Value of the partition key to look up

Returns:

The item as a dictionary if found, not deleted, and belongs to this machine, or an error dictionary

async scan_items(table_name: str, filters: List[Dict[str, str]] | None = None) Dict[str, Any][source]

Scan a DynamoDB table with optional filters. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filters – List of filter dictionaries, each with ‘filter_key_name’ and ‘filter_key_value’

Returns:

Dictionary containing list of matching items that are not deleted and belong to this machine

async set_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]

Puts an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to update

  • data – The complete item data to update or insert

Returns:

Response from the set operation

async start()[source]

Initialize the cloud proxy and fetch initial credentials.

async stop()[source]

Clean up resources when shutting down.

async update_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]

Update or insert an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.

Parameters:
  • table_name – The DynamoDB table name

  • filter_key – Name of the key to filter on (usually ‘id’)

  • filter_value – Value of the key to update

  • data – The complete item data to update or insert

Returns:

Response from the update operation

Bucket Proxy

class petal_app_manager.proxies.bucket.S3BucketProxy(session_token_url: str, bucket_name: str, upload_prefix: str = 'flight_logs/', debug: bool = False, request_timeout: int = 30)[source]

Bases: BaseProxy

Proxy for communicating with an S3 bucket for flight log storage. Supports upload, download, and listing of .ulg and .bag files.

ALLOWED_EXTENSIONS = {'.bag', '.ulg'}
async delete_file(s3_key: str) Dict[str, Any][source]

Delete a file from S3.

Parameters:

s3_key – S3 key of the file to delete

Returns:

Dictionary with deletion results

async download_file(s3_key: str, local_path: Path) Dict[str, Any][source]

Download a file from S3.

Parameters:
  • s3_key – S3 key of the file to download

  • local_path – Local path where to save the file

Returns:

Dictionary with download results

async head_object(s3_key: str) Dict[str, Any][source]

Check if an object exists in S3 and retrieve its metadata.

Parameters:

s3_key – S3 key of the object to check

Returns:

Dictionary with head object results

async list_files(prefix: str | None = None, max_keys: int = 100) Dict[str, Any][source]

List files in the S3 bucket for the current machine.

Parameters:
  • prefix – Optional additional prefix to filter by

  • max_keys – Maximum number of files to return (default 100, max 1000)

Returns:

Dictionary with list of files

async move_file(source_key: str, dest_key: str) Dict[str, Any][source]

Move (rename) a file within the S3 bucket.

Parameters:
  • source_key – Current S3 key of the file

  • dest_key – New S3 key for the file

Returns:

Dictionary with move results

async start()[source]

Initialize the S3 proxy and fetch initial credentials.

async stop()[source]

Clean up resources when shutting down.

async upload_file(file_path: Path, custom_filename: str | None = None, custom_s3_key: str | None = None) Dict[str, Any][source]

Upload a flight log file to S3.

Parameters:
  • file_path – Path to the local file to upload

  • custom_filename – Optional custom filename (defaults to original)

  • custom_s3_key – Optional custom S3 key (overrides default key generation)

Returns:

Dictionary with upload results

MQTT Proxy

MQTTProxy

  • Provides access to AWS IoT MQTT broker through TypeScript client API calls

  • Handles callback server for receiving continuous message streams

  • Uses async callback dispatch for message processing

  • Abstracts MQTT communication details away from petals

  • Provides async pub/sub operations with callback-style message handling

This proxy allows petals to interact with MQTT without worrying about the underlying connection management and HTTP communication details.

class petal_app_manager.proxies.mqtt.MQTTProxy(ts_client_host: str = 'localhost', ts_client_port: int = 3004, callback_host: str = 'localhost', callback_port: int = 3005, enable_callbacks: bool = True, debug: bool = False, request_timeout: int = 30, max_seen_message_ids: int = 1000, command_edge_topic: str = 'command/edge', response_topic: str = 'response', test_topic: str = 'command', command_web_topic: str = 'command/web', health_check_interval: float = 10.0)[source]

Bases: BaseProxy

Proxy for communicating with AWS IoT MQTT through TypeScript client API calls. Uses async callback dispatch for message processing.

The callback endpoint is exposed as a FastAPI router that should be registered with the main application. The callback_port should be set to the main app’s port (e.g., 8000) since the callback router is now part of the main app.

Configuration note:

Set PETAL_CALLBACK_PORT to the main FastAPI app port (default: 8000) The callback URL will be: http://{callback_host}:{callback_port}/mqtt-callback/callback

async health_check() Dict[str, Any][source]

Check MQTT proxy health status.

property organization_id: str | None

Organization ID property for backward compatibility. Fetches organization_id on-demand from OrganizationManager.

Returns:

Organization ID if available, None otherwise

async publish_message(payload: Dict[str, Any], qos: int = 1) bool[source]

Publish a message to an MQTT topic via TypeScript client to ‘command/web’ topic. :param payload: Message payload as a dictionary :param qos: Quality of Service level (0, 1, or 2)

Returns:

True if published successfully, False otherwise

register_handler(handler: Callable[[str, Dict[str, Any]], Awaitable[None]], cpu_heavy: bool = False) str[source]

Register an async handler to the ‘command/edge’ topic.

Parameters:
  • handler – Async callback function to handle messages

  • cpu_heavy – If True, the handler’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.

Returns:

Subscription ID string, or None if registration failed

Raises:

TypeError – If handler is not an async function

async send_command_response(message_id: str, response_data: Dict[str, Any]) bool[source]

Send a command response to the response topic. :param message_id: Original message ID to correlate response :param response_data: Response payload data

Returns:

True if published successfully, False otherwise

async start()[source]

Initialize the MQTT proxy and start callback processing.

async stop()[source]

Clean up resources when shutting down.

unregister_handler(subscription_id: str) bool[source]

Unregister a handler from the ‘command/edge’ topic.

class petal_app_manager.proxies.mqtt.MessageCallback(*, topic: str, payload: Dict[str, Any], timestamp: str | None = None, qos: int | None = None)[source]

Bases: BaseModel

Model for incoming MQTT messages via callback

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

payload: Dict[str, Any]
qos: int | None
timestamp: str | None
topic: str