Proxies
Base Proxy
- class petal_app_manager.proxies.base.BaseProxy[source]
Bases:
ABCAbstract base for every proxy.
Subclasses must implement
start()andstop().The optional
is_ready/wait_until_ready()pair lets petals wait until a proxy has finished its startup handshake (e.g. MQTT connected, MAVLink heartbeat received) before calling its public API.
Redis Proxy
RedisProxy
Simple Redis proxy for basic pub/sub messaging and key-value operations. Supports Unix socket connections.
- class petal_app_manager.proxies.redis.RedisProxy(host: str = 'localhost', port: int = 6379, db: int = 0, password: str | None = None, debug: bool = False, unix_socket_path: str | None = None)[source]
Bases:
BaseProxySimple Redis proxy for pub/sub messaging and key-value operations. Supports Unix socket connections.
- async list_online_applications() List[str][source]
List online applications by checking Redis keys for app registrations.
- async publish(channel: str, message: str) int[source]
Publish a message to a channel (async, non-blocking).
- register_pattern_channel_callback(channel: str, callback: Callable[[str, str], Awaitable[None]], cpu_heavy: bool = False)[source]
Register a callback for a specific channel for pattern subscriptions.
The callback must be an async function (coroutine). Sync callbacks are not supported and will raise TypeError.
- Parameters:
channel – The channel to register the callback for
callback – Async callback function that receives (channel, data)
cpu_heavy – If True, the callback’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.
- async scan_keys(pattern: str, count: int = 100) List[str][source]
Scan Redis keys matching a pattern.
- Parameters:
pattern – Key pattern to match (e.g.,
job:*)count – Number of keys to return per scan iteration
- Returns:
List of matching keys
- subscribe(channel: str, callback: Callable[[str, str], Awaitable[None]], cpu_heavy: bool = False)[source]
Subscribe to a channel with an async callback function.
- Parameters:
channel – The Redis channel to subscribe to
callback – Async callback function that receives (channel, data)
cpu_heavy – If True, the callback’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.
- Raises:
TypeError – If callback is not an async function
External MAVLink Proxy
petalappmanager.proxies.external
Thread-based proxies for long-running I/O back-ends (MAVLink, ROS 1, …).
Key changes vs. the first draft:
All per-key buffers are now
collections.dequewithmaxlen. New data silently overwrites the oldest entry → bounded memory.Public API (
send,register_handler) is unchanged for petals.Docstrings preserved / expanded for clarity.
- exception petal_app_manager.proxies.external.DownloadCancelledException[source]
Bases:
ExceptionRaised when a download is cancelled by the user.
- class petal_app_manager.proxies.external.ExternalProxy(maxlen: int, sleep_time_ms: float = 1.0)[source]
Bases:
BaseProxyBase class for I/O drivers that must poll or listen continuously.
A dedicated thread calls
_io_read_once()/_io_write_once()in a tight loop while the FastAPI event-loop thread stays unblocked.Send buffers -
self._send[key](deque, newest → right side) Outbound messages are enqueued here viasend(). The I/O thread drains these buffers by calling_io_write_once()with all pending messages.Handlers -
self._handlers[key]Callbacks registered viaregister_handler()are stored here. When new messages arrive via_io_read_once(), they are processed directly in the I/O recv thread, which schedules async handlers on the event loop.
- register_handler(key: str, fn: Callable[[Any], Awaitable[None]], duplicate_filter_interval: float | None = None, queue_length: int | None = None, cpu_heavy: bool = False) None[source]
Attach an async callback fn so it fires for every message appended to
_recv[key].The callback is scheduled on the main event loop for thread-safe execution.
- Parameters:
key (str) – The key to register the handler for.
fn (Callable[[Any], Awaitable[None]]) – The async handler function to call for each message.
duplicate_filter_interval (Optional[float]) – If specified, duplicate messages received within this interval (in seconds) will be filtered out and the handler will not be called. None disables filtering.
queue_length (Optional[int]) – If specified, sets the maximum length of the message buffer for key. New messages overwrite the oldest when the buffer is full. If None, the default maxlen from proxy initialization is used.
cpu_heavy (bool) – If True, the handler’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy, preventing event-loop starvation. The handler must still be
async def, but it may callawait loop.run_in_executor(...)internally—or simply set this flag and the proxy will wrap the entire handler invocation inrun_in_executorautomatically.
- Raises:
TypeError – If fn is not an async function (coroutine).
- send(key: str, msg: Any, burst_count: int | None = None, burst_interval: float | None = None) None[source]
Enqueue msg for transmission. The newest message is kept if the buffer is already full.
- Parameters:
key (str) – The key to send the message on.
msg (Any) – The message to send.
burst_count (Optional[int]) – If specified, send the message this many times in a burst.
burst_interval (Optional[float]) – If burst_count is specified, wait this many seconds between each message. If None, all messages are sent immediately.
- exception petal_app_manager.proxies.external.FTPDeleteError(path: str, ftp_code: int, message: str)[source]
Bases:
ExceptionRaised when a MAVFTP delete operation fails with a known FTP error code.
- class petal_app_manager.proxies.external.MavLinkExternalProxy(endpoint: str, baud: int, source_system_id: int, source_component_id: int, maxlen: int, mavlink_worker_sleep_ms: float = 1.0, mavlink_heartbeat_send_frequency: float = 5.0, root_sd_path: str = 'fs/microsd/log')[source]
Bases:
ExternalProxyThreaded MAVLink driver using pymavlink.
Buffers used
send["mav"]- outboundMAVLink_messagerecv["mav"]- any inbound messagerecv[str(msg.get_msgId())]- by numeric IDrecv[msg.get_type()]- by string type
- build_format_storage_command(storage_id: int = 0) MAVLink_command_long_message[source]
Build a MAVLink command to format the SD card (MAV_CMD_PREFLIGHT_STORAGE).
- Parameters:
storage_id (int) – Storage device ID (0 = primary SD card). Default is 0.
- Returns:
The MAVLink COMMAND_LONG message for format storage.
- Return type:
mavutil.mavlink.MAVLink_command_long_message
- Raises:
RuntimeError – If MAVLink connection is not established.
- build_motor_value_command(motor_idx: int, motor_value: float, timeout: float) MAVLink_command_long_message[source]
Build MAV_CMD_ACTUATOR_TEST command for a motor.
- build_param_request_read(name: str, index: int = -1)[source]
Build MAVLink PARAM_REQUEST_READ for a named or indexed parameter. If index == -1, the ‘name’ is used; otherwise PX4 will ignore name.
- build_param_set(name: str, value: Any, param_type: int)[source]
Build MAVLink PARAM_SET for setting a parameter. Handles INT32 encoding where int32 values are encoded as float32 bits for wire transmission.
- build_reboot_command(reboot_autopilot: bool = True, reboot_onboard_computer: bool = False) MAVLink_command_long_message[source]
Build a MAVLink command to reboot the autopilot and/or onboard computer.
- Parameters:
- Returns:
The MAVLink COMMAND_LONG message for reboot.
- Return type:
mavutil.mavlink.MAVLink_command_long_message
- Raises:
RuntimeError – If MAVLink connection is not established.
- build_req_msg_log_data(log_id: int, ofs: int = 0, count: int = 4294967295) MAVLink_log_request_data_message[source]
Build LOG_REQUEST_DATA for a given log.
- build_req_msg_log_request(message_id: int) MAVLink_log_request_list_message[source]
Build a MAVLink command to request a specific log message.
- Parameters:
message_id (int) – The numeric ID of the log message to request.
- Returns:
The MAVLink command message to request the specified log.
- Return type:
mavutil.mavlink.MAVLink_log_request_list_message
- Raises:
RuntimeError – If MAVLink connection is not established.
- build_req_msg_long(message_id: int) MAVLink_command_long_message[source]
Build a MAVLink command to request a specific message type.
- Parameters:
message_id (int) – The numeric ID of the MAVLink message to request.
- Returns:
The MAVLink command message to request the specified message.
- Return type:
mavutil.mavlink.MAVLink_command_long_message
- Raises:
RuntimeError – If MAVLink connection is not established.
- build_request_message_command() MAVLink_command_long_message[source]
Build a MAVLink command to request a specific message once using MAV_CMD_REQUEST_MESSAGE (common.xml).
- Returns:
The MAVLink COMMAND_LONG message requesting the given message.
- Return type:
mavutil.mavlink.MAVLink_command_long_message
- Raises:
RuntimeError – If MAVLink connection is not established.
- build_shell_serial_control_msgs(text: str, device: int = 10, respond: bool = True, exclusive: bool = True) list[MAVLink_serial_control_message][source]
Build SERIAL_CONTROL messages that write text to the PX4 MAVLink shell. Splits into <=70 byte chunks (MAVLink SERIAL_CONTROL data field).
- async delete_file_via_shell(remote_path: str, timeout: float = 5.0, command: str = 'rm') bool[source]
Delete a file (or empty directory) on PX4 using a NuttX shell command.
This is a fallback for when MAVFTP
cmd_rmfails withFileProtected(code 9). The shellrmbypasses the MAVFTP server’s protection checks and deletes the file directly via NuttXunlink().- Parameters:
- Returns:
Trueif the command completed (no error detected in the reply text),Falseotherwise.- Return type:
- async download_log(*, log_id: int, completed_event: Event, timeout: float = 60.0, buffer: bytearray | None = None, callback: Callable[[int], Awaitable[None]] | None = None, end_of_buffer_timeout: float = 10.0, size_bytes: int | None = None) bytes[source]
Download one log file via LOG_REQUEST_DATA / LOG_DATA.
- Parameters:
log_id – The LOG_ENTRY.id of the log to download.
completed_event – threading.Event that will be set when download completes.
timeout – Total time allowed for the whole transfer.
buffer – Optional bytearray to use as the download buffer.
callback – Optional async function called as callback(received_bytes) after each LOG_DATA packet is processed.
end_of_buffer_timeout – Timeout in seconds to wait for new data before aborting.
size_bytes – Optional total size of the log in bytes., if known.
- Returns:
Raw log bytes (ULog/Dataflash).
- Return type:
- Raises:
RuntimeError – If MAVLink connection is not established.
TimeoutError – If no complete log is received within the timeout.
- async download_log_buffered(*, log_id: int, completed_event: Event, size_bytes: int | None = None, timeout: float = 3.0, chunk_size: int = 90, max_retries: int = 3, cancel_event: Event | None = None, buffer: bytearray | None = None, callback: Callable[[int], Awaitable[None]] | None = None) bytes[source]
Download one log file via repeated LOG_REQUEST_DATA / LOG_DATA exchanges.
Strategy:
For ofs = 0, chunk_size, 2*chunk_size, …
send LOG_REQUEST_DATA(log_id, ofs, chunk_size)
wait for LOG_DATA(log_id, ofs, …)
append data[:count]
stop when:
count == 0 (end-of-log by spec), or
ofs + count >= size_bytes (if known)
- Parameters:
log_id – LOG_ENTRY.id of the log to download.
completed_event – threading.Event set when download is fully complete.
size_bytes – Optional total size of the log from LOG_ENTRY.size; if given, used as termination condition and sanity cap.
timeout – Timeout per chunk request (seconds).
chunk_size – Requested count per chunk. For MAVLink v1, LOG_DATA carries 90 bytes.
max_retries – Number of retries per chunk on timeout.
cancel_event – Optional threading.Event to cancel the download.
buffer – Optional bytearray; if None, a new one is created.
callback – Optional sync or async callback called as callback(total_bytes_received) after each successful chunk.
- Returns:
Raw log data.
- Return type:
- async format_sd_card(storage_id: int = 0, timeout: float = 10.0) FormatStorageResponse[source]
Send a format storage command to the autopilot and wait for ACK.
- Parameters:
- Returns:
Structured response indicating success/failure and reason.
- Return type:
FormatStorageResponse
- async get_all_params(timeout: float = 10.0)[source]
Request entire parameter list and return: { “<NAME>”: {“value”: int|float, “raw”: float, “type”: int, “index”: int, “count”: int}, … }
- async get_log_entries(*, msg_id: str, request_msg: MAVLink_message, timeout: float = 8.0, stale_timeout: float = 2.0) Dict[int, Dict[str, int]][source]
Send LOG_REQUEST_LIST and gather all LOG_ENTRY packets.
Handles two edge cases that PX4 firmware exhibits:
No logs on the vehicle — PX4 responds with a single
LOG_ENTRYwherenum_logs == 0. The MAVLink spec requires this sentinel packet; we detect it and return an empty dict immediately instead of waiting for the timeout.Corrupted / missing log entries — PX4 may advertise
num_logs = Nbut only deliver fewer than NLOG_ENTRYpackets (corrupted slots are silently skipped). A staleness timer (stale_timeout) detects when no new entry has arrived for a while and returns whatever was collected so far.
- Parameters:
msg_id (str) – The MAVLink message ID string for LOG_ENTRY (e.g.
"118").request_msg (mavutil.mavlink.MAVLink_message) – The encoded LOG_REQUEST_LIST message to send.
timeout (float) – Hard upper-bound on how long to wait (seconds).
stale_timeout (float) – If no new LOG_ENTRY arrives within this many seconds after the first one, collection stops early and whatever entries were gathered are returned. Only applies when
num_logs > 0and fewer thannum_logsentries have been received. Default 2.0 s.
- Returns:
{log_id: {"size": int, "utc": int}, …}- Return type:
- async get_param(name: str, timeout: float = 3.0) Dict[str, Any][source]
Request a single PARAM_VALUE for name and return a dict: {“name”: str, “value”: Union[int,float], “raw”: float, “type”: int, “count”: int, “index”: int} Raises TimeoutError if no reply within timeout.
- async get_params_bulk_lossy(names: Iterable[str], *, timeout_total: float = 6.0, max_retries: int = 3, max_in_flight: int = 10, resend_interval: float = 0.7, inter_send_delay: float = 0.01) Dict[str, Dict[str, Any]][source]
Lossy-link bulk GET using PARAM_REQUEST_READ by name.
Strategy: - Register ONE PARAM_VALUE handler. - Send read requests in a window (max_in_flight). - Periodically resend still-pending names (resend_interval) up to max_retries. - Stop when all received or timeout_total.
Returns: { name: {“name”,”value”,”raw”,”type”,”count”,”index”}, … } (Partial results if timeout_total hits.)
- async reboot_autopilot(reboot_onboard_computer: bool = False, timeout: float = 3.0) RebootAutopilotResponse[source]
Send a reboot command to the autopilot (PX4/ArduPilot).
This sends MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and waits for a COMMAND_ACK response.
- Parameters:
- Returns:
Structured response indicating success/failure and reason.
- Return type:
RebootAutopilotResponse
- Raises:
RuntimeError – If MAVLink connection is not established.
TimeoutError – If no acknowledgment is received within the timeout.
Notes
After sending this command, the connection to the autopilot will be lost as it reboots. The proxy will attempt to reconnect automatically.
- async reboot_autopilot_confirmed(reboot_onboard_computer: bool = False, ack_timeout: float = 3.0, hb_drop_window_s: float = 5.0, hb_drop_gap_s: float = 1.5, hb_return_window_s: float = 30.0, hb_poll_interval_s: float = 0.05, sitl_mode: bool = False) RebootAutopilotResponse[source]
Send a reboot command and confirm success via heartbeat drop + return.
Unlike
reboot_autopilot(), heartbeat confirmation is the primary verification method, not a fallback. The flow is:Send MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN and wait for COMMAND_ACK. - If the ACK is a rejection, return immediately with failure. - If the ACK is accepted or times out, proceed to step 2.
Wait for heartbeat to drop (gap ≥
hb_drop_gap_swithinhb_drop_window_s).Wait for heartbeat to return (new heartbeat after the drop within
hb_return_window_s).Return success only when heartbeat returns, confirming the autopilot has rebooted and is alive again.
- Parameters:
reboot_onboard_computer (bool) – If True, also reboot the onboard computer.
ack_timeout (float) – Maximum time (s) to wait for COMMAND_ACK.
hb_drop_window_s (float) – Time window (s) to detect heartbeat drop after command.
hb_drop_gap_s (float) – Minimum gap (s) without a heartbeat to consider it dropped.
hb_return_window_s (float) – Time window (s) to wait for heartbeat to return after drop.
hb_poll_interval_s (float) – Polling interval (s) for heartbeat checks.
sitl_mode (bool) – If True, treat a DENIED ACK as acceptable instead of failing immediately. This allows SITL testing where the user manually kills and restarts the
px4_sitlprocess to trigger the heartbeat drop + return cycle.
- Return type:
RebootAutopilotResponse
- async send_and_wait(*, match_key: str, request_msg: MAVLink_message, collector: Callable[[MAVLink_message], bool], timeout: float = 3.0, queue_length: int | None = None, cancel_event: Event | None = None) None[source]
Transmit request_msg, register a handler on match_key and keep feeding incoming packets to collector until it returns True or timeout expires.
- Parameters:
match_key – The key used when the proxy dispatches inbound messages (numeric ID as string, e.g. “147”).
request_msg – Encoded MAVLink message to send - COMMAND_LONG, LOG_REQUEST_LIST, …
collector – Callback that receives each matching packet. Must return True once the desired condition is satisfied; returning False keeps waiting.
timeout – Maximum seconds to block.
queue_length – Optional maximum queue length for the handler. If the queue exceeds this length, older packets will be dropped. If None, the default queue length is used.
cancel_event – Optional threading.Event that can be set to cancel the wait.
- Raises:
RuntimeError – If MAVLink connection is not established.
TimeoutError – If no matching response is received within the timeout.
- async send_shell_command(command: str, settle_time: float = 0.5, timeout: float = 3.0) str[source]
Send a shell command to PX4 via MAVLink SERIAL_CONTROL and wait for PX4 to acknowledge it.
Uses
send_and_wait()so the command is transmitted through the normal I/O queue and we block until PX4 replies with at least oneSERIAL_CONTROLpacket (flagREPLY), confirming the shell received the command. A settle_time pause is added afterwards so PX4 has time to act on it (e.g. release file handles).- Parameters:
- Returns:
The text content of the first SERIAL_CONTROL reply from PX4, or an empty string if no reply was received.
- Return type:
- async set_param(name: str, value: Any, ptype: int | None = None, timeout: float = 3.0) Dict[str, Any][source]
Set a parameter and confirm by reading back. value can be int or float. Returns the confirmed PARAM_VALUE dict (same shape as get_param()).
Uses proper INT32 encoding where int32 values are encoded as float32 bits for wire transmission.
>>> ["MAV_PARAM_TYPE"] = { >>> [1] = "MAV_PARAM_TYPE_UINT8", >>> [2] = "MAV_PARAM_TYPE_INT8", >>> [3] = "MAV_PARAM_TYPE_UINT16", >>> [4] = "MAV_PARAM_TYPE_INT16", >>> [5] = "MAV_PARAM_TYPE_UINT32", >>> [6] = "MAV_PARAM_TYPE_INT32", >>> [7] = "MAV_PARAM_TYPE_UINT64", >>> [8] = "MAV_PARAM_TYPE_INT64", >>> [9] = "MAV_PARAM_TYPE_REAL32", >>> [10] = "MAV_PARAM_TYPE_REAL64", >>> }
- async set_params_bulk_lossy(params_to_set: Dict[str, Any | Tuple[Any, str | int] | Dict[str, Any]], *, timeout_total: float = 8.0, max_retries: int = 3, max_in_flight: int = 8, resend_interval: float = 0.8, inter_send_delay: float = 0.01, verify_ack_value: bool = True) Dict[str, Dict[str, Any]][source]
Lossy-link bulk PARAM_SET: - User can provide optional type per param as “UINT8”, “INT16”, “REAL32”, etc. - If type omitted -> auto: int -> INT32, float -> REAL32 - Windowed sends + periodic resend + retry cap - Confirms via echoed PARAM_VALUE
Returns: confirmed {name: meta_dict}
- async start_px4_logging(settle_time: float = 1.0, timeout: float = 5.0) None[source]
Restart PX4’s SD-card logger after file operations.
Sends
logger start\nvia the MAVLink shell.
- async stop_px4_logging(settle_time: float = 2.0, timeout: float = 5.0) None[source]
Stop PX4’s SD-card logger so that log files can be deleted.
Sends
logger stop\nvia the MAVLink shell and waits for PX4 to acknowledge. The logger module releases its file handles, making files deletable via MAVFTP.
- class petal_app_manager.proxies.external.MavLinkFTPProxy(mavlink_proxy: MavLinkExternalProxy)[source]
Bases:
BaseProxyThreaded MAVLink FTP driver using pymavlink.
- async clear_error_logs(remote_path: str, connection_timeout: float = 3.0) Dict[str, Any][source]
Clear
fail_*.logerror logs under remote_path from the vehicle.Uses MAVFTP only for listing, then deletes each file via the NuttX shell
rmcommand (same approach asdelete_all_logs).Returns a summary dict
{total, deleted, failed}.
- async delete_all_logs(base: str = None, connection_timeout: float = 3.0, progress_callback: Callable[[int, int], Any] | None = None) Dict[str, Any][source]
Recursively delete every
.ulgfile under base on the vehicle.Uses MAVFTP only for listing directories, then deletes each file via the NuttX shell
rmcommand (MAVLink SERIAL_CONTROL). This bypasses PX4’s MAVFTPFileProtected(code 9) restriction which blockscmd_rmon all log files.Stops the PX4 logger before deletion and restarts it afterwards.
- Parameters:
base – Root directory to scan for .ulg files.
connection_timeout – Seconds to wait for a MAVLink connection.
progress_callback – Optional
callback(current_index, total)invoked after each file deletion attempt. May be a regular function or anasynccoroutine.
Returns a summary dict with deleted/failed counts.
- async delete_file(remote_path: str, connection_timeout: float = 3.0, stop_logger: bool = True) None[source]
Delete a single file at remote_path on the vehicle.
Uses the NuttX shell
rmcommand via MAVLink SERIAL_CONTROL, which bypasses PX4’s MAVFTPFileProtectedrestriction.- Parameters:
remote_path (str) – The path on the vehicle to delete.
connection_timeout (float) – Maximum time to wait for MAVLink connection.
stop_logger (bool) – If True, stop PX4 logging before deletion and restart it afterwards. Pass
Falsewhen the caller has already stopped the logger (e.g. bulk-delete loop).
- async detect_error_logs(remote_path: str, connection_timeout: float = 3.0) Dict[str, Any][source]
Detect
fail_*.logerror log files under remote_path on the vehicle.Lists files via MAVFTP without deleting anything.
Returns a summary dict
{total, files}where files is a list of{path, size_bytes}dicts.
- async download_ulog(remote_path: str, local_path: Path, completed_event: Event, on_progress: Callable[[float], Awaitable[None]] | None = None, cancel_event: Event | None = None, connection_timeout: float = 3.0, n_attempts: int = 3) Path[source]
Fetch remote_path from the vehicle into local_path.
Returns the Path actually written on success or None if cancelled.
- async list_directory(base: str = None, connection_timeout: float = 3.0) List[str][source]
List all files and directories under base on the vehicle.
- class petal_app_manager.proxies.external.ULogInfo(*, index: int, remote_path: str, size_bytes: int, utc: int)[source]
Bases:
BaseModelMetadata for a ULog that resides on the PX4 SD-card.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Local Database Proxy
LocalDBProxy
Provides access to the local DynamoDB instance through the controller dashboard API
Handles machine ID retrieval and authentication for requests
Abstracts the HTTP communication details away from petals
Provides async CRUD operations for DynamoDB tables
This proxy allows petals to interact with DynamoDB without worrying about the underlying HTTP communication details.
- class petal_app_manager.proxies.localdb.LocalDBProxy(get_data_url: str, scan_data_url: str, update_data_url: str, set_data_url: str, host: str, port: int, debug: bool = False)[source]
Bases:
BaseProxyProxy for communicating with a local DynamoDB instance through a custom API.
- async delete_item(table_name: str, filter_key: str, filter_value: str) Dict[str, Any][source]
Soft delete an item from DynamoDB by setting deleted=True.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to delete
- Returns:
Response from the update operation
- async get_item(table_name: str, partition_key: str, partition_value: str) Dict[str, Any][source]
Retrieve a single item from DynamoDB using its partition key. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.
- Parameters:
table_name – The DynamoDB table name
partition_key – Name of the partition key (usually ‘id’)
partition_value – Value of the partition key to look up
- Returns:
The item as a dictionary if found, not deleted, and belongs to this machine, or an error dictionary
- async scan_items(table_name: str, filters: List[Dict[str, str]] | None = None) List[Dict[str, Any]][source]
Scan a DynamoDB table with optional filters. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.
- Parameters:
table_name – The DynamoDB table name
filters – List of filter dictionaries, each with ‘filter_key_name’ and ‘filter_key_value’
- Returns:
List of matching items that are not deleted and belong to this machine
- async set_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]
Puts an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to update
data – The complete item data to update or insert
- Returns:
Response from the set operation
- async update_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]
Update or insert an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to update
data – The complete item data to update or insert
- Returns:
Response from the update operation
Cloud Proxy
CloudDBProxy
Provides access to the cloud DynamoDB instance through authenticated API calls
Handles authentication token retrieval and management with caching
Abstracts the HTTP communication details away from petals
Provides async CRUD operations for DynamoDB tables in the cloud
This proxy allows petals to interact with cloud DynamoDB without worrying about the underlying authentication and HTTP communication details.
- class petal_app_manager.proxies.cloud.CloudDBProxy(access_token_url: str, endpoint: str, session_token_url: str = None, s3_bucket_name: str = None, get_data_url: str = '/drone/onBoard/config/getData', scan_data_url: str = '/drone/onBoard/config/scanData', update_data_url: str = '/drone/onBoard/config/updateData', set_data_url: str = '/drone/onBoard/config/setData', debug: bool = False, request_timeout: int = 30)[source]
Bases:
BaseProxyProxy for communicating with a cloud DynamoDB instance through authenticated API calls.
- async delete_item(table_name: str, filter_key: str, filter_value: str) Dict[str, Any][source]
Soft delete an item from DynamoDB by setting deleted=True.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to delete
- Returns:
Response from the update operation
- async get_item(table_name: str, partition_key: str, partition_value: str) Dict[str, Any][source]
Retrieve a single item from DynamoDB using its partition key. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.
- Parameters:
table_name – The DynamoDB table name
partition_key – Name of the partition key (usually ‘id’)
partition_value – Value of the partition key to look up
- Returns:
The item as a dictionary if found, not deleted, and belongs to this machine, or an error dictionary
- async scan_items(table_name: str, filters: List[Dict[str, str]] | None = None) Dict[str, Any][source]
Scan a DynamoDB table with optional filters. Only returns items that are not soft-deleted (deleted != True) and belong to this machine.
- Parameters:
table_name – The DynamoDB table name
filters – List of filter dictionaries, each with ‘filter_key_name’ and ‘filter_key_value’
- Returns:
Dictionary containing list of matching items that are not deleted and belong to this machine
- async set_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]
Puts an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to update
data – The complete item data to update or insert
- Returns:
Response from the set operation
- async update_item(table_name: str, filter_key: str, filter_value: str, data: Dict[str, Any]) Dict[str, Any][source]
Update or insert an item in DynamoDB. Automatically adds robot_instance_id to ensure item belongs to this machine.
- Parameters:
table_name – The DynamoDB table name
filter_key – Name of the key to filter on (usually ‘id’)
filter_value – Value of the key to update
data – The complete item data to update or insert
- Returns:
Response from the update operation
Bucket Proxy
- class petal_app_manager.proxies.bucket.S3BucketProxy(session_token_url: str, bucket_name: str, upload_prefix: str = 'flight_logs/', debug: bool = False, request_timeout: int = 30)[source]
Bases:
BaseProxyProxy for communicating with an S3 bucket for flight log storage. Supports upload, download, and listing of .ulg and .bag files.
- ALLOWED_EXTENSIONS = {'.bag', '.ulg'}
- async delete_file(s3_key: str) Dict[str, Any][source]
Delete a file from S3.
- Parameters:
s3_key – S3 key of the file to delete
- Returns:
Dictionary with deletion results
- async download_file(s3_key: str, local_path: Path) Dict[str, Any][source]
Download a file from S3.
- Parameters:
s3_key – S3 key of the file to download
local_path – Local path where to save the file
- Returns:
Dictionary with download results
- async head_object(s3_key: str) Dict[str, Any][source]
Check if an object exists in S3 and retrieve its metadata.
- Parameters:
s3_key – S3 key of the object to check
- Returns:
Dictionary with head object results
- async list_files(prefix: str | None = None, max_keys: int = 100) Dict[str, Any][source]
List files in the S3 bucket for the current machine.
- Parameters:
prefix – Optional additional prefix to filter by
max_keys – Maximum number of files to return (default 100, max 1000)
- Returns:
Dictionary with list of files
- async move_file(source_key: str, dest_key: str) Dict[str, Any][source]
Move (rename) a file within the S3 bucket.
- Parameters:
source_key – Current S3 key of the file
dest_key – New S3 key for the file
- Returns:
Dictionary with move results
- async upload_file(file_path: Path, custom_filename: str | None = None, custom_s3_key: str | None = None) Dict[str, Any][source]
Upload a flight log file to S3.
- Parameters:
file_path – Path to the local file to upload
custom_filename – Optional custom filename (defaults to original)
custom_s3_key – Optional custom S3 key (overrides default key generation)
- Returns:
Dictionary with upload results
MQTT Proxy
MQTTProxy
Provides access to AWS IoT MQTT broker through TypeScript client API calls
Handles callback server for receiving continuous message streams
Uses async callback dispatch for message processing
Abstracts MQTT communication details away from petals
Provides async pub/sub operations with callback-style message handling
This proxy allows petals to interact with MQTT without worrying about the underlying connection management and HTTP communication details.
- class petal_app_manager.proxies.mqtt.MQTTProxy(ts_client_host: str = 'localhost', ts_client_port: int = 3004, callback_host: str = 'localhost', callback_port: int = 3005, enable_callbacks: bool = True, debug: bool = False, request_timeout: int = 30, max_seen_message_ids: int = 1000, command_edge_topic: str = 'command/edge', response_topic: str = 'response', test_topic: str = 'command', command_web_topic: str = 'command/web', health_check_interval: float = 10.0)[source]
Bases:
BaseProxyProxy for communicating with AWS IoT MQTT through TypeScript client API calls. Uses async callback dispatch for message processing.
The callback endpoint is exposed as a FastAPI router that should be registered with the main application. The callback_port should be set to the main app’s port (e.g., 8000) since the callback router is now part of the main app.
- Configuration note:
Set PETAL_CALLBACK_PORT to the main FastAPI app port (default: 8000) The callback URL will be: http://{callback_host}:{callback_port}/mqtt-callback/callback
- property organization_id: str | None
Organization ID property for backward compatibility. Fetches organization_id on-demand from OrganizationManager.
- Returns:
Organization ID if available, None otherwise
- async publish_message(payload: Dict[str, Any], qos: int = 1) bool[source]
Publish a message to an MQTT topic via TypeScript client to ‘command/web’ topic. :param payload: Message payload as a dictionary :param qos: Quality of Service level (0, 1, or 2)
- Returns:
True if published successfully, False otherwise
- register_handler(handler: Callable[[str, Dict[str, Any]], Awaitable[None]], cpu_heavy: bool = False) str[source]
Register an async handler to the ‘command/edge’ topic.
- Parameters:
handler – Async callback function to handle messages
cpu_heavy – If True, the handler’s CPU-bound work will be offloaded to a thread-pool executor managed by the proxy.
- Returns:
Subscription ID string, or None if registration failed
- Raises:
TypeError – If handler is not an async function
- class petal_app_manager.proxies.mqtt.MessageCallback(*, topic: str, payload: Dict[str, Any], timestamp: str | None = None, qos: int | None = None)[source]
Bases:
BaseModelModel for incoming MQTT messages via callback
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].