Using Proxies in Petals
Proxies provide a standardized interface for petals to access backend services like databases, message queues, cloud services, and external protocols. This guide explains how to use each available proxy type in your petal development.
Overview
Proxies abstract complex backend integrations and provide:
Consistent API: Uniform interface across different services
Dependency Management: Automatic service availability checking
Error Handling: Standardized error responses and retry logic
Configuration: Centralized service configuration
Health Monitoring: Built-in health checks and status reporting
Available Proxy Types
| Proxy Type | Use Cases |
|---|---|
| RedisProxy | Caching, pub/sub messaging, session storage, real-time data |
| MavlinkExternalProxy | Drone communication, telemetry, command/control, MAVLink protocol |
| CloudDBProxy | Cloud database operations, persistent storage, data synchronization |
| LocalDBProxy | Local database operations, offline storage, edge computing |
| S3BucketProxy | File storage, log uploads, data archiving, content delivery |
| MQTTProxy | IoT messaging, device communication, event streaming |
Accessing Proxies in Your Petal
Basic Pattern:
from petal_app_manager.plugins.base import Petal
from petal_app_manager.proxies.redis import RedisProxy
class YourPetal(Petal):
    """Minimal petal showing how a proxy is declared and then used."""

    def get_required_proxies(self) -> List[str]:
        """Return the names of the proxies this petal depends on."""
        required = ["redis", "ext_mavlink"]
        return required

    async def your_method(self):
        """Write a key through the Redis proxy and read it back."""
        # Look the proxy up by the same name that was declared above.
        proxy: RedisProxy = self.get_proxy("redis")

        await proxy.set("key", "value")
        result = await proxy.get("key")
RedisProxy
Purpose: High-performance caching, pub/sub messaging, session management
Common Use Cases:
Caching telemetry data
Real-time message passing between petals
Session and state management
Temporary data storage
Scanning keys by pattern for job management
Basic Usage:
from petal_app_manager.proxies.redis import RedisProxy
class YourPetal(Petal):
    """Petal that keeps the latest telemetry snapshot in Redis."""

    def get_required_proxies(self) -> List[str]:
        """Declare the Redis proxy as the only dependency."""
        return ["redis"]

    async def cache_telemetry(self, data: dict):
        """Store ``data`` as JSON under ``telemetry:latest`` and read it back."""
        proxy: RedisProxy = self.get_proxy("redis")

        # ``ex`` is the expiration in seconds passed to ``set``.
        payload = json.dumps(data)
        await proxy.set("telemetry:latest", payload, ex=60)

        cached = await proxy.get("telemetry:latest")
Methods:
# Basic key-value operations
await redis_proxy.set(key, value, ex=expiration_seconds)
await redis_proxy.get(key)
await redis_proxy.delete(key)
await redis_proxy.exists(key)
# Hash operations
await redis_proxy.hset(hash_key, field, value)
await redis_proxy.hget(hash_key, field)
await redis_proxy.hgetall(hash_key)
# List operations
await redis_proxy.lpush(list_key, value)
await redis_proxy.rpush(list_key, value)
await redis_proxy.lrange(list_key, start, end)
# Pub/Sub messaging
await redis_proxy.publish(channel, message)
await redis_proxy.subscribe(channel, callback)
# Key scanning (useful for job management)
await redis_proxy.scan_keys(pattern="job:*", count=100)
Scan Keys Example (Job Management):
from petal_app_manager.proxies.redis import RedisProxy
class JobManagerPetal(Petal):
    """Example petal that manages jobs stored as JSON under ``job:<status>:*`` keys."""

    def get_required_proxies(self) -> List[str]:
        """Declare the Redis proxy as the only dependency."""
        return ["redis"]

    @staticmethod
    def _is_expired(completed_at, max_age_hours: int) -> bool:
        """Return True when ``completed_at`` is more than ``max_age_hours`` old.

        Accepts epoch seconds (int/float) or an ISO-8601 string; naive
        timestamps are treated as UTC. Missing or unparseable values are
        reported as *not* expired so malformed entries are never deleted.

        NOTE(review): the stored ``completed_at`` format is an assumption --
        adjust the parsing to match whatever the job producer writes.
        """
        from datetime import datetime, timedelta, timezone

        if completed_at is None:
            return False
        try:
            if isinstance(completed_at, (int, float)):
                finished = datetime.fromtimestamp(completed_at, tz=timezone.utc)
            else:
                text = str(completed_at)
                # ``fromisoformat`` only understands a trailing "Z" on 3.11+.
                if text.endswith("Z"):
                    text = text[:-1] + "+00:00"
                finished = datetime.fromisoformat(text)
        except (ValueError, OverflowError, OSError):
            return False
        if finished.tzinfo is None:
            finished = finished.replace(tzinfo=timezone.utc)
        age = datetime.now(timezone.utc) - finished
        return age > timedelta(hours=max_age_hours)

    async def list_pending_jobs(self):
        """Find all pending jobs using key pattern scanning.

        Returns:
            A list with the decoded JSON payload of every ``job:pending:*``
            key that still holds a value.
        """
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Scan for all job keys matching the pattern.
        job_keys = await redis_proxy.scan_keys(
            pattern="job:pending:*",
            count=100,  # Keys per scan iteration
        )

        jobs = []
        for key in job_keys:
            job_data = await redis_proxy.get(key)
            if job_data:  # skip keys that no longer hold a value
                jobs.append(json.loads(job_data))
        return jobs

    async def cleanup_expired_jobs(self, max_age_hours: int = 24):
        """Delete completed jobs older than ``max_age_hours``.

        Returns:
            ``{"deleted": <count>}`` with the number of keys removed.
        """
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Find all completed job keys.
        completed_keys = await redis_proxy.scan_keys(pattern="job:completed:*")

        deleted = 0
        for key in completed_keys:
            job_data = await redis_proxy.get(key)
            if not job_data:
                continue
            job = json.loads(job_data)
            # Only entries with a parseable, old-enough timestamp are removed.
            if self._is_expired(job.get("completed_at"), max_age_hours):
                await redis_proxy.delete(key)
                deleted += 1

        self.logger.info(f"Cleaned up {deleted} expired jobs")
        return {"deleted": deleted}

    async def get_job_statistics(self):
        """Get counts of jobs by status.

        Returns:
            A dict mapping each status name to the number of matching keys.
        """
        redis_proxy: RedisProxy = self.get_proxy("redis")

        stats = {}
        for status in ["pending", "processing", "completed", "failed"]:
            keys = await redis_proxy.scan_keys(pattern=f"job:{status}:*")
            stats[status] = len(keys)
        return stats
Caching Pattern Example:
class TelemetryCachePetal(Petal):
    """Caches per-flight telemetry in Redis as JSON strings."""

    def get_required_proxies(self) -> List[str]:
        """Declare the Redis proxy as the only dependency."""
        return ["redis"]

    async def cache_flight_data(self, flight_id: str, data: dict):
        """Serialize ``data`` and store it under the flight's telemetry key."""
        proxy: RedisProxy = self.get_proxy("redis")
        serialized = json.dumps(data)
        # An expiration of 3600 seconds keeps the entry for one hour.
        await proxy.set(f"flight:{flight_id}:telemetry", serialized, ex=3600)

    async def get_cached_flight_data(self, flight_id: str) -> Optional[dict]:
        """Return the decoded telemetry for ``flight_id``, or ``None`` if no value is cached."""
        proxy: RedisProxy = self.get_proxy("redis")
        raw = await proxy.get(f"flight:{flight_id}:telemetry")
        return json.loads(raw) if raw else None
MavlinkExternalProxy
Purpose: Communication with drones and autopilots using MAVLink protocol
Common Use Cases:
Sending commands to drone
Receiving telemetry data
Mission management
Parameter configuration (including bulk operations over lossy links)
Real-time monitoring
Autopilot reboot and system control
Basic Usage:
from petal_app_manager.proxies.external import MavLinkExternalProxy
class YourPetal(Petal):
    """Petal that issues a reboot request through the MAVLink proxy."""

    def get_required_proxies(self) -> List[str]:
        """Declare the external MAVLink proxy as the only dependency."""
        return ["ext_mavlink"]

    async def send_command(self):
        """Ask the autopilot to reboot and print a notice when the request succeeds."""
        proxy: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        response = await proxy.reboot_autopilot()
        if not response.success:
            return
        print("Autopilot rebooting...")
Methods:
# Reboot autopilot (PX4/ArduPilot)
await mavlink_proxy.reboot_autopilot(
reboot_onboard_computer=False, # Also reboot companion computer
timeout=3.0 # Timeout for ACK response
)
# Returns: RebootAutopilotResponse with success, status_code, reason
# Bulk parameter setting over lossy links
await mavlink_proxy.set_params_bulk_lossy(
params_to_set={
"NAV_ACC_RAD": 2.0,
"MPC_XY_VEL_MAX": (12.0, "REAL32"), # With explicit type
"COM_DISARM_LAND": {"value": 2, "type": "INT32"}
},
timeout_total=8.0,
max_retries=3,
max_in_flight=8
)
# Returns: Dict of confirmed parameters
# Bulk parameter retrieval over lossy links
await mavlink_proxy.get_params_bulk_lossy(
names=["NAV_ACC_RAD", "MPC_XY_VEL_MAX", "COM_DISARM_LAND"],
timeout_total=6.0,
max_retries=3,
max_in_flight=10
)
# Returns: Dict with name, value, raw, type, count, index for each param
Reboot Autopilot Example:
from petal_app_manager.proxies.external import MavLinkExternalProxy
class SystemControlPetal(Petal):
    """Example petal exposing an autopilot reboot with status reporting."""

    def get_required_proxies(self) -> List[str]:
        """Declare the external MAVLink proxy as the only dependency."""
        return ["ext_mavlink"]

    async def reboot_flight_controller(self, include_companion: bool = False):
        """Request an autopilot reboot and translate the outcome into a status dict.

        Args:
            include_companion: Forwarded as ``reboot_onboard_computer``.

        Returns:
            A dict with ``status`` (``rebooting``, ``failed`` or ``error``)
            and a ``reason`` string.
        """
        link: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        try:
            outcome = await link.reboot_autopilot(
                reboot_onboard_computer=include_companion,
                timeout=3.0,
            )
            # Handle the rejected case first so the success path reads straight through.
            if not outcome.success:
                self.logger.warning(f"Reboot failed: {outcome.reason}")
                return {"status": "failed", "reason": outcome.reason}

            self.logger.info("Autopilot reboot initiated successfully")
            return {"status": "rebooting", "reason": outcome.reason}
        except RuntimeError as e:
            self.logger.error(f"MAVLink not connected: {e}")
            return {"status": "error", "reason": "MAVLink connection not established"}
Bulk Parameter Operations Example (Lossy Links):
class ParameterConfigPetal(Petal):
    """Example petal that writes and reads autopilot parameters in bulk."""

    def get_required_proxies(self) -> List[str]:
        """Declare the external MAVLink proxy as the only dependency."""
        return ["ext_mavlink"]

    async def configure_flight_parameters(self):
        """Push a fixed parameter set and report which entries were confirmed.

        Returns:
            ``{"confirmed": [...], "failed": [...]}`` with parameter names,
            or ``{"error": <message>}`` when the proxy raises ``RuntimeError``.
        """
        link: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        # Three value shapes are shown: a bare value, a (value, type) tuple,
        # and a {"value", "type"} dict.
        requested = {
            "NAV_ACC_RAD": 2.0,
            "MPC_XY_VEL_MAX": (12.0, "REAL32"),
            "MPC_Z_VEL_MAX_UP": (3.0, "REAL32"),
            "COM_DISARM_LAND": {"value": 2, "type": "INT32"},
            "COM_ARM_WO_GPS": {"value": 1, "type": "INT32"},
        }

        try:
            acked = await link.set_params_bulk_lossy(
                params_to_set=requested,
                timeout_total=8.0,       # Total operation timeout
                max_retries=3,           # Retry count per parameter
                max_in_flight=8,         # Concurrent requests
                resend_interval=0.8,     # Time before resending unconfirmed
                verify_ack_value=True,   # Verify echoed value matches
            )
            self.logger.info(f"Confirmed {len(acked)}/{len(requested)} parameters")

            # Anything requested but absent from the reply was not confirmed.
            unconfirmed = set(requested.keys()) - set(acked.keys())
            if unconfirmed:
                self.logger.warning(f"Failed to confirm: {unconfirmed}")

            return {"confirmed": list(acked.keys()), "failed": list(unconfirmed)}
        except RuntimeError as e:
            return {"error": str(e)}

    async def read_flight_parameters(self, param_names: List[str]):
        """Fetch the named parameters in bulk and log each one at debug level.

        Returns:
            The mapping returned by ``get_params_bulk_lossy``, or
            ``{"error": <message>}`` when the proxy raises ``RuntimeError``.
        """
        link: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        try:
            fetched = await link.get_params_bulk_lossy(
                names=param_names,
                timeout_total=6.0,
                max_retries=3,
                max_in_flight=10,
                resend_interval=0.7,
            )
            # Each entry is indexed by "value", "type" and "index" below.
            for param, details in fetched.items():
                self.logger.debug(
                    f"{param}: value={details['value']}, "
                    f"type={details['type']}, index={details['index']}"
                )
            return fetched
        except RuntimeError as e:
            return {"error": str(e)}
CloudDBProxy
Purpose: Cloud database operations for persistent, scalable data storage
Common Use Cases:
Storing flight logs in cloud
Synchronizing configuration data
Multi-device data sharing
Analytics and reporting
Long-term data retention
Basic Usage:
from petal_app_manager.proxies.cloud import CloudDBProxy
class YourPetal(Petal):
    """Skeleton petal that depends on the cloud database proxy."""

    def get_required_proxies(self) -> List[str]:
        """Declare the cloud proxy as the only dependency."""
        return ["cloud"]

    async def store_flight_data(self, data: dict):
        """Placeholder: resolves the proxy only; storage calls are not documented yet."""
        proxy: CloudDBProxy = self.get_proxy("cloud")
        # TODO: Add CloudDBProxy methods and examples
        #   - Database operations
        #   - Table management
        #   - Query operations
        #   - Batch operations
        #   - Synchronization patterns
        #   - Authentication handling
Methods:
# TODO: Document CloudDBProxy methods:
# await cloud_proxy.scan_table(table_name, filters)
# await cloud_proxy.get_item(table_name, key)
# await cloud_proxy.put_item(table_name, item)
# await cloud_proxy.update_item(table_name, key, updates)
# await cloud_proxy.delete_item(table_name, key)
# await cloud_proxy.batch_write(table_name, items)
Example Patterns:
# TODO: Add practical cloud database usage examples:
# - Flight log storage
# - Configuration synchronization
# - User data management
# - Analytics data collection
# - Backup and recovery
LocalDBProxy
Purpose: Local database operations for offline-capable, edge computing scenarios
Common Use Cases:
Offline data storage
Edge computing applications
Local caching with persistence
Backup when cloud unavailable
Real-time local queries
Basic Usage:
from petal_app_manager.proxies.localdb import LocalDBProxy
class YourPetal(Petal):
    """Skeleton petal that depends on the local database proxy."""

    def get_required_proxies(self) -> List[str]:
        """Declare the local database proxy as the only dependency."""
        return ["db"]

    async def store_local_data(self, data: dict):
        """Placeholder: resolves the proxy only; storage calls are not documented yet."""
        proxy: LocalDBProxy = self.get_proxy("db")
        # TODO: Add LocalDBProxy methods and examples
        #   - Local database operations
        #   - SQLite/embedded database usage
        #   - Offline synchronization
        #   - Local query patterns
        #   - Data migration
        #   - Backup strategies
Methods:
# TODO: Document LocalDBProxy methods:
# await db_proxy.execute_query(sql, params)
# await db_proxy.fetch_one(sql, params)
# await db_proxy.fetch_all(sql, params)
# await db_proxy.insert(table, data)
# await db_proxy.update(table, data, where)
# await db_proxy.delete(table, where)
Example Patterns:
# TODO: Add practical local database usage examples:
# - Offline flight data storage
# - Local configuration management
# - Edge analytics
# - Sync queue management
# - Local user sessions
S3BucketProxy
Purpose: File storage, uploads, downloads, and content management using AWS S3
Common Use Cases:
Upload flight logs and media
Store configuration files
Archive historical data
Content delivery
Backup storage
Moving/renaming files within buckets
Basic Usage:
from petal_app_manager.proxies.bucket import S3BucketProxy
class YourPetal(Petal):
    """Petal demonstrating the two ways of choosing an S3 key on upload."""

    def get_required_proxies(self) -> List[str]:
        """Declare the bucket proxy as the only dependency."""
        return ["bucket"]

    async def upload_file(self, file_path: str):
        """Upload ``file_path`` twice: first with a generated key, then with an explicit one."""
        proxy: S3BucketProxy = self.get_proxy("bucket")
        local_file = Path(file_path)

        # No key given: the proxy picks the S3 key.
        result = await proxy.upload_file(local_file)

        # Explicit key: the caller controls the full object path.
        result = await proxy.upload_file(
            local_file,
            custom_s3_key="custom/path/myfile.ulg",
        )
Methods:
# Upload file with auto-generated key
await bucket_proxy.upload_file(
file_path=Path("/path/to/file.ulg"),
custom_filename="renamed.ulg" # Optional: rename file
)
# Upload file with custom S3 key (full control over path)
await bucket_proxy.upload_file(
file_path=Path("/path/to/file.ulg"),
custom_s3_key="flights/2026/01/07/flight_001.ulg"
)
# Move/rename a file within the bucket
await bucket_proxy.move_file(
source_key="uploads/temp/file.ulg",
dest_key="archive/2026/file.ulg"
)
# Delete a file
await bucket_proxy.delete_file(s3_key="path/to/file.ulg")
# List objects in bucket
await bucket_proxy.list_objects(prefix="flights/")
Upload with Custom S3 Key Example:
from pathlib import Path
from petal_app_manager.proxies.bucket import S3BucketProxy
class FlightLogPetal(Petal):
    """Example petal that files flight logs under a date-based S3 layout."""

    def get_required_proxies(self) -> List[str]:
        """Declare the bucket proxy as the only dependency."""
        return ["bucket"]

    async def upload_flight_log(self, file_path: Path, flight_date: str, flight_id: str):
        """Upload a log to ``flights/<date with slashes>/<flight_id>.ulg``.

        Args:
            file_path: Local file to upload.
            flight_date: Date string whose ``-`` separators become ``/``.
            flight_id: Used as the object's base name.

        Returns:
            ``{"status": "success", "url": ...}`` or
            ``{"status": "error", "reason": ...}``.
        """
        bucket: S3BucketProxy = self.get_proxy("bucket")

        # "2026-01-07" style dates turn into "2026/01/07" path segments.
        date_path = flight_date.replace("-", "/")
        s3_key = f"flights/{date_path}/{flight_id}.ulg"

        outcome = await bucket.upload_file(file_path=file_path, custom_s3_key=s3_key)

        if not outcome.get("success"):
            self.logger.error(f"Upload failed: {outcome.get('error')}")
            return {"status": "error", "reason": outcome.get("error")}

        self.logger.info(f"Uploaded to: {outcome['s3_key']}")
        return {"status": "success", "url": outcome.get("url")}
Move/Rename Files Example:
class FileOrganizerPetal(Petal):
    """Example petal that moves objects between prefixes of the bucket."""

    def get_required_proxies(self) -> List[str]:
        """Declare the bucket proxy as the only dependency."""
        return ["bucket"]

    async def archive_processed_file(self, source_key: str):
        """Move processed file from uploads to archive.

        The destination is ``archive/<YYYY>/<MM>/<file name>`` using the
        current local date, e.g. ``uploads/pending/file.ulg`` ->
        ``archive/2026/01/file.ulg``.

        Returns:
            ``{"status": "archived", "new_path": ...}`` or
            ``{"status": "error", "reason": ...}``.
        """
        from datetime import datetime

        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        filename = source_key.rsplit("/", 1)[-1]
        if not filename:
            # A key ending in "/" has no file name; moving it would target
            # the bare "archive/<date>/" prefix instead of a file.
            reason = f"source key has no file name: {source_key!r}"
            self.logger.error(f"Move failed: {reason}")
            return {"status": "error", "reason": reason}

        date_path = datetime.now().strftime("%Y/%m")
        dest_key = f"archive/{date_path}/{filename}"

        result = await bucket_proxy.move_file(
            source_key=source_key,
            dest_key=dest_key,
        )

        if result.get("success"):
            self.logger.info(f"Moved {source_key} -> {dest_key}")
            return {"status": "archived", "new_path": dest_key}

        self.logger.error(f"Move failed: {result.get('error')}")
        return {"status": "error", "reason": result.get("error")}

    async def batch_organize_files(self, source_prefix: str, dest_prefix: str):
        """Move all files from one prefix to another.

        Only the last path segment of each source key is kept, so objects
        from nested sub-prefixes that share a file name map to the same
        destination key.

        Args:
            source_prefix: Prefix passed to ``list_objects``.
            dest_prefix: Destination prefix, with or without a trailing ``/``.

        Returns:
            ``{"moved": [dest keys], "failed": [source keys]}``.
        """
        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        objects = await bucket_proxy.list_objects(prefix=source_prefix)

        # Normalise once so "archive/" and "archive" both yield "archive/<file>"
        # rather than "archive//<file>".
        dest_root = dest_prefix.rstrip("/")

        results = {"moved": [], "failed": []}
        for obj in objects.get("objects", []):
            source_key = obj["key"]
            filename = source_key.rsplit("/", 1)[-1]
            if not filename:
                # Keys ending in "/" carry no file name; nothing to move.
                continue

            dest_key = f"{dest_root}/{filename}" if dest_root else filename
            result = await bucket_proxy.move_file(
                source_key=source_key,
                dest_key=dest_key,
            )

            if result.get("success"):
                results["moved"].append(dest_key)
            else:
                results["failed"].append(source_key)

        return results
MQTTProxy
Purpose: IoT messaging, device communication, and MQTT command handling for web/mobile client communication.
Common Use Cases:
Receiving commands from web/mobile clients (via @mqtt_action)
Sending command responses back to clients
Publishing status updates and telemetry to the command/web topic
Real-time event streaming between edge device and cloud
Receiving Commands — the ``@mqtt_action`` Decorator
The recommended way to handle incoming MQTT commands is the @mqtt_action
decorator. You do not need to call register_handler() manually — the
framework does it automatically at startup.
See MQTT Command Handlers (@mqtt_action) in the Adding a New Petal guide for full
details on @mqtt_action and the cpu_heavy parameter.
from petal_app_manager.plugins.base import Petal
from petal_app_manager.plugins.decorators import http_action, mqtt_action
class YourPetal(Petal):
    """Petal with one ordinary and one CPU-heavy MQTT command handler."""

    name = "petal-example"

    def get_required_proxies(self) -> List[str]:
        """Declare the MQTT proxy as the only dependency."""
        return ["mqtt"]

    @mqtt_action(command="do_something")
    async def _do_something(self, topic: str, message: dict):
        """Handles ``petal-example/do_something`` on ``command/edge``."""
        correlation_id = message.get("messageId", "unknown")
        body = message.get("payload", {})

        # ... business logic ...

        # Reply with the incoming messageId so the response can be correlated.
        proxy = self._proxies.get("mqtt")
        reply = {"status": "success"}
        await proxy.send_command_response(
            message_id=correlation_id,
            response_data=reply,
        )

    @mqtt_action(command="heavy_compute", cpu_heavy=True)
    async def _heavy_compute(self, topic: str, message: dict):
        """CPU-intensive handler — offloaded to a thread-pool executor."""
        outcome = expensive_calculation(message.get("payload", {}))

        proxy = self._proxies.get("mqtt")
        correlation_id = message.get("messageId", "unknown")
        await proxy.send_command_response(
            message_id=correlation_id,
            response_data={"status": "success", "result": outcome},
        )
Publishing Messages — Public API Methods:
mqtt_proxy = self._proxies.get("mqtt")
# Publish to the ``command/web`` topic (for web/mobile clients)
await mqtt_proxy.publish_message(
payload={"key": "value", "status": "update"},
qos=1, # QoS 0, 1, or 2
)
# Send a direct response to a specific command (correlates via messageId)
await mqtt_proxy.send_command_response(
message_id="original-msg-id",
response_data={"status": "success", "data": {...}},
)
Method Reference:
| Method | Description |
|---|---|
| publish_message() | Publish a message to the command/web topic. |
| send_command_response() | Send a correlated response to an incoming command (published to the response topic with the original messageId). |
| register_handler() | Register an async handler on the command/edge topic. |
| unregister_handler() | Unregister a previously registered handler by subscription ID. |
| health_check() | Return proxy health status including connection state. |
The ``cpu_heavy`` Flag:
Both @mqtt_action(cpu_heavy=True) and register_handler(handler, cpu_heavy=True)
cause the handler’s execution to be offloaded to a thread-pool executor.
This prevents CPU-intensive work (e.g. NumPy, image processing, large JSON
serialization) from blocking the asyncio event loop that services all other
MQTT, HTTP, and WebSocket traffic.
The default is cpu_heavy=False — use it for typical I/O-bound handlers
that await proxy calls, database queries, or network requests.
Example — Status Broadcasting:
class TelemetryPetal(Petal):
    """Example petal that broadcasts position updates over MQTT."""

    name = "petal-telemetry"

    def get_required_proxies(self) -> List[str]:
        """Declare the MQTT and external MAVLink proxies as dependencies."""
        return ["mqtt", "ext_mavlink"]

    async def broadcast_position(self, lat: float, lon: float, alt: float):
        """Publish a ``position_update`` message carrying the given coordinates."""
        proxy = self._proxies.get("mqtt")
        update = {
            "type": "position_update",
            "latitude": lat,
            "longitude": lon,
            "altitude": alt,
        }
        await proxy.publish_message(payload=update)
Error Handling and Best Practices
Proxy Availability:
class YourPetal(Petal):
    """Petal showing defensive access to a proxy that may be unavailable."""

    async def safe_proxy_usage(self):
        """Read ``key`` through the Redis proxy without letting failures escape.

        Returns:
            The value returned by the proxy, or ``None`` when the proxy is
            unavailable or the call raises.
        """
        try:
            redis_proxy = self.get_proxy("redis")
            if redis_proxy is None:
                # Use the petal's own logger, as the other examples do.
                self.logger.warning("Redis proxy not available")
                return None
            return await redis_proxy.get("key")
        except Exception as e:
            # Deliberately broad: this is the example's outermost boundary,
            # and the failure is logged rather than swallowed.
            self.logger.error(f"Proxy operation failed: {e}")
            return None
Dependency Declaration:
def get_required_proxies(self) -> List[str]:
    """Return the proxies this petal cannot run without."""
    # Always declare required proxies.
    required = ["redis", "ext_mavlink"]
    return required
def get_optional_proxies(self) -> List[str]:
    """Return the proxies this petal can use but does not require."""
    # Optional proxies allow graceful degradation when they are missing.
    optional = ["cloud", "bucket"]
    return optional
Health Checks:
async def check_proxy_health(self):
    """Log the health of every required proxy that offers a ``health_check``.

    Proxies that are unavailable (falsy) or that have no ``health_check``
    attribute are skipped.
    """
    for proxy_name in self.get_required_proxies():
        proxy = self.get_proxy(proxy_name)
        if proxy and hasattr(proxy, 'health_check'):
            health = await proxy.health_check()
            # Use the petal's own logger, as the other examples do.
            self.logger.info(f"{proxy_name} health: {health}")
Configuration Management:
# Proxies automatically use configuration from .env file
# PETAL_REDIS_HOST, PETAL_REDIS_PORT, PETAL_MAVLINK_ENDPOINT, etc.
# No manual configuration needed in petal code
Next Steps
Review the Adding a New Petal guide for complete petal development
Check Contribution Guidelines for release and versioning
Explore existing petals for real-world proxy usage examples
Use the Admin Dashboard to monitor proxy health and status
Note
Documentation Status: The MavlinkExternalProxy, S3BucketProxy, RedisProxy, and MQTTProxy sections include comprehensive examples. CloudDBProxy and LocalDBProxy sections contain placeholders that will be populated with detailed examples in future updates.