Using Proxies in Petals

Proxies provide a standardized interface for petals to access backend services like databases, message queues, cloud services, and external protocols. This guide explains how to use each available proxy type in your petal development.

Overview

Proxies abstract complex backend integrations and provide:

  • Consistent API: Uniform interface across different services

  • Dependency Management: Automatic service availability checking

  • Error Handling: Standardized error responses and retry logic

  • Configuration: Centralized service configuration

  • Health Monitoring: Built-in health checks and status reporting

Available Proxy Types

  • RedisProxy: Caching, pub/sub messaging, session storage, real-time data

  • MavlinkExternalProxy: Drone communication, telemetry, command/control, MAVLink protocol

  • CloudDBProxy: Cloud database operations, persistent storage, data synchronization

  • LocalDBProxy: Local database operations, offline storage, edge computing

  • S3BucketProxy: File storage, log uploads, data archiving, content delivery

  • MQTTProxy: IoT messaging, device communication, event streaming

Accessing Proxies in Your Petal

Basic Pattern:

from typing import List

from petal_app_manager.plugins.base import Petal
from petal_app_manager.proxies.redis import RedisProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["redis", "ext_mavlink"]  # Declare dependencies

    async def your_method(self):
        # Get proxy instance
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Use proxy methods
        await redis_proxy.set("key", "value")
        result = await redis_proxy.get("key")

RedisProxy

Purpose: High-performance caching, pub/sub messaging, session management

Common Use Cases:

  • Caching telemetry data

  • Real-time message passing between petals

  • Session and state management

  • Temporary data storage

  • Scanning keys by pattern for job management

Basic Usage:

import json
from typing import List

from petal_app_manager.proxies.redis import RedisProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["redis"]

    async def cache_telemetry(self, data: dict):
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Store data with expiration
        await redis_proxy.set("telemetry:latest", json.dumps(data), ex=60)

        # Retrieve data
        cached = await redis_proxy.get("telemetry:latest")

Methods:

# Basic key-value operations
await redis_proxy.set(key, value, ex=expiration_seconds)
await redis_proxy.get(key)
await redis_proxy.delete(key)
await redis_proxy.exists(key)

# Hash operations
await redis_proxy.hset(hash_key, field, value)
await redis_proxy.hget(hash_key, field)
await redis_proxy.hgetall(hash_key)

# List operations
await redis_proxy.lpush(list_key, value)
await redis_proxy.rpush(list_key, value)
await redis_proxy.lrange(list_key, start, end)

# Pub/Sub messaging
await redis_proxy.publish(channel, message)
await redis_proxy.subscribe(channel, callback)

# Key scanning (useful for job management)
await redis_proxy.scan_keys(pattern="job:*", count=100)

Scan Keys Example (Job Management):

import json
from typing import List

from petal_app_manager.proxies.redis import RedisProxy

class JobManagerPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["redis"]

    async def list_pending_jobs(self):
        """Find all pending jobs using key pattern scanning."""
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Scan for all job keys matching pattern
        job_keys = await redis_proxy.scan_keys(
            pattern="job:pending:*",
            count=100  # Keys per scan iteration
        )

        jobs = []
        for key in job_keys:
            job_data = await redis_proxy.get(key)
            if job_data:
                jobs.append(json.loads(job_data))

        return jobs

    async def cleanup_expired_jobs(self, max_age_hours: int = 24):
        """Clean up old job entries."""
        redis_proxy: RedisProxy = self.get_proxy("redis")

        # Find all completed job keys
        completed_keys = await redis_proxy.scan_keys(pattern="job:completed:*")

        deleted = 0
        for key in completed_keys:
            job_data = await redis_proxy.get(key)
            if job_data:
                job = json.loads(job_data)
                # Check if job is older than max_age
                if self._is_expired(job.get("completed_at"), max_age_hours):
                    await redis_proxy.delete(key)
                    deleted += 1

        self.logger.info(f"Cleaned up {deleted} expired jobs")
        return {"deleted": deleted}

    async def get_job_statistics(self):
        """Get counts of jobs by status."""
        redis_proxy: RedisProxy = self.get_proxy("redis")

        stats = {}
        for status in ["pending", "processing", "completed", "failed"]:
            keys = await redis_proxy.scan_keys(pattern=f"job:{status}:*")
            stats[status] = len(keys)

        return stats

Caching Pattern Example:

import json
from typing import List, Optional

class TelemetryCachePetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["redis"]

    async def cache_flight_data(self, flight_id: str, data: dict):
        """Cache flight telemetry with TTL."""
        redis_proxy: RedisProxy = self.get_proxy("redis")

        key = f"flight:{flight_id}:telemetry"
        await redis_proxy.set(key, json.dumps(data), ex=3600)  # 1 hour TTL

    async def get_cached_flight_data(self, flight_id: str) -> Optional[dict]:
        """Retrieve cached flight data."""
        redis_proxy: RedisProxy = self.get_proxy("redis")

        key = f"flight:{flight_id}:telemetry"
        cached = await redis_proxy.get(key)

        if cached:
            return json.loads(cached)
        return None

MavlinkExternalProxy

Purpose: Communication with drones and autopilots using MAVLink protocol

Common Use Cases:

  • Sending commands to drone

  • Receiving telemetry data

  • Mission management

  • Parameter configuration (including bulk operations over lossy links)

  • Real-time monitoring

  • Autopilot reboot and system control

Basic Usage:

from typing import List

from petal_app_manager.proxies.external import MavLinkExternalProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["ext_mavlink"]

    async def send_command(self):
        mavlink_proxy: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        # Reboot the autopilot
        result = await mavlink_proxy.reboot_autopilot()
        if result.success:
            print("Autopilot rebooting...")

Methods:

# Reboot autopilot (PX4/ArduPilot)
await mavlink_proxy.reboot_autopilot(
    reboot_onboard_computer=False,  # Also reboot companion computer
    timeout=3.0                      # Timeout for ACK response
)
# Returns: RebootAutopilotResponse with success, status_code, reason

# Bulk parameter setting over lossy links
await mavlink_proxy.set_params_bulk_lossy(
    params_to_set={
        "NAV_ACC_RAD": 2.0,
        "MPC_XY_VEL_MAX": (12.0, "REAL32"),  # With explicit type
        "COM_DISARM_LAND": {"value": 2, "type": "INT32"}
    },
    timeout_total=8.0,
    max_retries=3,
    max_in_flight=8
)
# Returns: Dict of confirmed parameters

# Bulk parameter retrieval over lossy links
await mavlink_proxy.get_params_bulk_lossy(
    names=["NAV_ACC_RAD", "MPC_XY_VEL_MAX", "COM_DISARM_LAND"],
    timeout_total=6.0,
    max_retries=3,
    max_in_flight=10
)
# Returns: Dict with name, value, raw, type, count, index for each param

Reboot Autopilot Example:

from typing import List

from petal_app_manager.proxies.external import MavLinkExternalProxy

class SystemControlPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["ext_mavlink"]

    async def reboot_flight_controller(self, include_companion: bool = False):
        """Reboot the autopilot with proper error handling."""
        mavlink_proxy: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        try:
            result = await mavlink_proxy.reboot_autopilot(
                reboot_onboard_computer=include_companion,
                timeout=3.0
            )

            if result.success:
                self.logger.info("Autopilot reboot initiated successfully")
                return {"status": "rebooting", "reason": result.reason}
            else:
                self.logger.warning(f"Reboot failed: {result.reason}")
                return {"status": "failed", "reason": result.reason}

        except RuntimeError as e:
            self.logger.error(f"MAVLink not connected: {e}")
            return {"status": "error", "reason": "MAVLink connection not established"}

Bulk Parameter Operations Example (Lossy Links):

from typing import List

class ParameterConfigPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["ext_mavlink"]

    async def configure_flight_parameters(self):
        """Set multiple parameters efficiently over unreliable links."""
        mavlink_proxy: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        # Define parameters to set with optional type hints
        params = {
            # Simple value (type auto-detected)
            "NAV_ACC_RAD": 2.0,

            # Tuple format: (value, type_string)
            "MPC_XY_VEL_MAX": (12.0, "REAL32"),
            "MPC_Z_VEL_MAX_UP": (3.0, "REAL32"),

            # Dict format with explicit type
            "COM_DISARM_LAND": {"value": 2, "type": "INT32"},
            "COM_ARM_WO_GPS": {"value": 1, "type": "INT32"}
        }

        try:
            confirmed = await mavlink_proxy.set_params_bulk_lossy(
                params_to_set=params,
                timeout_total=8.0,    # Total operation timeout
                max_retries=3,        # Retry count per parameter
                max_in_flight=8,      # Concurrent requests
                resend_interval=0.8,  # Time before resending unconfirmed
                verify_ack_value=True # Verify echoed value matches
            )

            self.logger.info(f"Confirmed {len(confirmed)}/{len(params)} parameters")

            # Check which parameters were NOT confirmed
            failed = set(params.keys()) - set(confirmed.keys())
            if failed:
                self.logger.warning(f"Failed to confirm: {failed}")

            return {"confirmed": list(confirmed.keys()), "failed": list(failed)}

        except RuntimeError as e:
            return {"error": str(e)}

    async def read_flight_parameters(self, param_names: List[str]):
        """Read multiple parameters efficiently over unreliable links."""
        mavlink_proxy: MavLinkExternalProxy = self.get_proxy("ext_mavlink")

        try:
            results = await mavlink_proxy.get_params_bulk_lossy(
                names=param_names,
                timeout_total=6.0,
                max_retries=3,
                max_in_flight=10,
                resend_interval=0.7
            )

            # Results contain detailed info for each parameter
            for name, info in results.items():
                self.logger.debug(
                    f"{name}: value={info['value']}, "
                    f"type={info['type']}, index={info['index']}"
                )

            return results

        except RuntimeError as e:
            return {"error": str(e)}
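
The three value formats accepted by set_params_bulk_lossy above (bare value, (value, type) tuple, and {"value": ..., "type": ...} dict) can be normalized into a uniform shape before calling the proxy. The helper below is illustrative petal-side code, not part of the proxy API:

```python
from typing import Any, Dict, Optional, Tuple

def normalize_param(spec: Any) -> Tuple[Any, Optional[str]]:
    """Collapse the three accepted formats into (value, type_or_None)."""
    if isinstance(spec, dict):
        return spec["value"], spec.get("type")
    if isinstance(spec, tuple):
        value, ptype = spec
        return value, ptype
    return spec, None  # bare value: type left to auto-detection

def normalize_params(params: Dict[str, Any]) -> Dict[str, Tuple[Any, Optional[str]]]:
    """Normalize a whole parameter dict for logging or validation."""
    return {name: normalize_param(spec) for name, spec in params.items()}
```

This is handy for validating or logging a parameter set in one format before handing the original dict to the proxy.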

CloudDBProxy

Purpose: Cloud database operations for persistent, scalable data storage

Common Use Cases:

  • Storing flight logs in cloud

  • Synchronizing configuration data

  • Multi-device data sharing

  • Analytics and reporting

  • Long-term data retention

Basic Usage:

from typing import List

from petal_app_manager.proxies.cloud import CloudDBProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["cloud"]

    async def store_flight_data(self, data: dict):
        cloud_proxy: CloudDBProxy = self.get_proxy("cloud")

        # TODO: Add CloudDBProxy methods and examples
        # - Database operations
        # - Table management
        # - Query operations
        # - Batch operations
        # - Synchronization patterns
        # - Authentication handling

Methods:

# TODO: Document CloudDBProxy methods:
# await cloud_proxy.scan_table(table_name, filters)
# await cloud_proxy.get_item(table_name, key)
# await cloud_proxy.put_item(table_name, item)
# await cloud_proxy.update_item(table_name, key, updates)
# await cloud_proxy.delete_item(table_name, key)
# await cloud_proxy.batch_write(table_name, items)

Example Patterns:

# TODO: Add practical cloud database usage examples:
# - Flight log storage
# - Configuration synchronization
# - User data management
# - Analytics data collection
# - Backup and recovery

LocalDBProxy

Purpose: Local database operations for offline-capable, edge computing scenarios

Common Use Cases:

  • Offline data storage

  • Edge computing applications

  • Local caching with persistence

  • Backup when cloud unavailable

  • Real-time local queries

Basic Usage:

from typing import List

from petal_app_manager.proxies.localdb import LocalDBProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["db"]

    async def store_local_data(self, data: dict):
        db_proxy: LocalDBProxy = self.get_proxy("db")

        # TODO: Add LocalDBProxy methods and examples
        # - Local database operations
        # - SQLite/embedded database usage
        # - Offline synchronization
        # - Local query patterns
        # - Data migration
        # - Backup strategies

Methods:

# TODO: Document LocalDBProxy methods:
# await db_proxy.execute_query(sql, params)
# await db_proxy.fetch_one(sql, params)
# await db_proxy.fetch_all(sql, params)
# await db_proxy.insert(table, data)
# await db_proxy.update(table, data, where)
# await db_proxy.delete(table, where)

Example Patterns:

# TODO: Add practical local database usage examples:
# - Offline flight data storage
# - Local configuration management
# - Edge analytics
# - Sync queue management
# - Local user sessions

S3BucketProxy

Purpose: File storage, uploads, downloads, and content management using AWS S3

Common Use Cases:

  • Upload flight logs and media

  • Store configuration files

  • Archive historical data

  • Content delivery

  • Backup storage

  • Moving/renaming files within buckets

Basic Usage:

from pathlib import Path
from typing import List

from petal_app_manager.proxies.bucket import S3BucketProxy

class YourPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["bucket"]

    async def upload_file(self, file_path: str):
        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        # Upload with auto-generated S3 key
        result = await bucket_proxy.upload_file(Path(file_path))

        # Upload with custom S3 key
        result = await bucket_proxy.upload_file(
            Path(file_path),
            custom_s3_key="custom/path/myfile.ulg"
        )

Methods:

# Upload file with auto-generated key
await bucket_proxy.upload_file(
    file_path=Path("/path/to/file.ulg"),
    custom_filename="renamed.ulg"  # Optional: rename file
)

# Upload file with custom S3 key (full control over path)
await bucket_proxy.upload_file(
    file_path=Path("/path/to/file.ulg"),
    custom_s3_key="flights/2026/01/07/flight_001.ulg"
)

# Move/rename a file within the bucket
await bucket_proxy.move_file(
    source_key="uploads/temp/file.ulg",
    dest_key="archive/2026/file.ulg"
)

# Delete a file
await bucket_proxy.delete_file(s3_key="path/to/file.ulg")

# List objects in bucket
await bucket_proxy.list_objects(prefix="flights/")

Upload with Custom S3 Key Example:

from pathlib import Path
from typing import List

from petal_app_manager.proxies.bucket import S3BucketProxy

class FlightLogPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["bucket"]

    async def upload_flight_log(self, file_path: Path, flight_date: str, flight_id: str):
        """Upload flight log with organized S3 path structure."""
        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        # Create organized S3 key: flights/YYYY/MM/DD/flight_id.ulg
        custom_key = f"flights/{flight_date.replace('-', '/')}/{flight_id}.ulg"

        result = await bucket_proxy.upload_file(
            file_path=file_path,
            custom_s3_key=custom_key
        )

        if result.get("success"):
            self.logger.info(f"Uploaded to: {result['s3_key']}")
            return {"status": "success", "url": result.get("url")}
        else:
            self.logger.error(f"Upload failed: {result.get('error')}")
            return {"status": "error", "reason": result.get("error")}
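
The key layout used above can be factored into a small pure helper, which keeps the path scheme in one place and makes it trivially unit-testable. The function name is illustrative:

```python
def build_flight_key(flight_date: str, flight_id: str) -> str:
    """Build an S3 key like flights/YYYY/MM/DD/<flight_id>.ulg
    from a YYYY-MM-DD date string."""
    return f"flights/{flight_date.replace('-', '/')}/{flight_id}.ulg"
```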

Move/Rename Files Example:

from typing import List

class FileOrganizerPetal(Petal):
    def get_required_proxies(self) -> List[str]:
        return ["bucket"]

    async def archive_processed_file(self, source_key: str):
        """Move processed file from uploads to archive."""
        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        # Move from uploads/ to archive/
        # e.g., "uploads/pending/file.ulg" -> "archive/2026/01/file.ulg"
        from datetime import datetime
        date_path = datetime.now().strftime("%Y/%m")
        filename = source_key.split("/")[-1]
        dest_key = f"archive/{date_path}/{filename}"

        result = await bucket_proxy.move_file(
            source_key=source_key,
            dest_key=dest_key
        )

        if result.get("success"):
            self.logger.info(f"Moved {source_key} -> {dest_key}")
            return {"status": "archived", "new_path": dest_key}
        else:
            self.logger.error(f"Move failed: {result.get('error')}")
            return {"status": "error", "reason": result.get("error")}

    async def batch_organize_files(self, source_prefix: str, dest_prefix: str):
        """Move all files from one prefix to another."""
        bucket_proxy: S3BucketProxy = self.get_proxy("bucket")

        # List all files in source prefix
        objects = await bucket_proxy.list_objects(prefix=source_prefix)

        results = {"moved": [], "failed": []}
        for obj in objects.get("objects", []):
            source_key = obj["key"]
            filename = source_key.split("/")[-1]
            dest_key = f"{dest_prefix}/{filename}"

            result = await bucket_proxy.move_file(source_key, dest_key)
            if result.get("success"):
                results["moved"].append(dest_key)
            else:
                results["failed"].append(source_key)

        return results

MQTTProxy

Purpose: IoT messaging, device communication, and MQTT command handling for web/mobile client communication.

Common Use Cases:

  • Receiving commands from web/mobile clients (via @mqtt_action)

  • Sending command responses back to clients

  • Publishing status updates and telemetry to the command/web topic

  • Real-time event streaming between edge device and cloud

Receiving Commands — the @mqtt_action Decorator

The recommended way to handle incoming MQTT commands is the @mqtt_action decorator. You do not need to call register_handler() manually — the framework does it automatically at startup.

See MQTT Command Handlers (@mqtt_action) in the Adding a New Petal guide for full details on @mqtt_action and the cpu_heavy parameter.

from typing import List

from petal_app_manager.plugins.base import Petal
from petal_app_manager.plugins.decorators import mqtt_action

class YourPetal(Petal):
    name = "petal-example"

    def get_required_proxies(self) -> List[str]:
        return ["mqtt"]

    @mqtt_action(command="do_something")
    async def _do_something(self, topic: str, message: dict):
        """Handles ``petal-example/do_something`` on ``command/edge``."""
        msg_id = message.get("messageId", "unknown")
        payload = message.get("payload", {})

        # ... business logic ...

        mqtt_proxy = self._proxies.get("mqtt")
        await mqtt_proxy.send_command_response(
            message_id=msg_id,
            response_data={"status": "success"},
        )

    @mqtt_action(command="heavy_compute", cpu_heavy=True)
    async def _heavy_compute(self, topic: str, message: dict):
        """CPU-intensive handler — offloaded to a thread-pool executor."""
        result = expensive_calculation(message.get("payload", {}))
        mqtt_proxy = self._proxies.get("mqtt")
        await mqtt_proxy.send_command_response(
            message_id=message.get("messageId", "unknown"),
            response_data={"status": "success", "result": result},
        )

Publishing Messages — Public API Methods:

mqtt_proxy = self._proxies.get("mqtt")

# Publish to the ``command/web`` topic (for web/mobile clients)
await mqtt_proxy.publish_message(
    payload={"key": "value", "status": "update"},
    qos=1,  # QoS 0, 1, or 2
)

# Send a direct response to a specific command (correlates via messageId)
await mqtt_proxy.send_command_response(
    message_id="original-msg-id",
    response_data={"status": "success", "data": {...}},
)

Method Reference:

  • publish_message(payload, qos=1): Publish a message to the command/web topic for web/mobile clients.

  • send_command_response(message_id, response_data): Send a correlated response to an incoming command (published to the response topic with the original messageId).

  • register_handler(handler, cpu_heavy=False): Register an async handler on the command/edge topic. Prefer @mqtt_action instead — manual registration is only needed for advanced cases.

  • unregister_handler(subscription_id): Unregister a previously registered handler by subscription ID.

  • health_check(): Return proxy health status including connection state.

The cpu_heavy Flag:

Both @mqtt_action(cpu_heavy=True) and register_handler(handler, cpu_heavy=True) cause the handler’s execution to be offloaded to a thread-pool executor. This prevents CPU-intensive work (e.g. NumPy, image processing, large JSON serialization) from blocking the asyncio event loop that services all other MQTT, HTTP, and WebSocket traffic.

The default is cpu_heavy=False — use it for typical I/O-bound handlers that await proxy calls, database queries, or network requests.
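
The offloading described above is the standard asyncio run_in_executor pattern. A minimal stand-alone sketch of the mechanism (an illustration, not the framework's actual implementation):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def expensive_calculation(payload: dict) -> int:
    # Stand-in for CPU-bound work (NumPy, image processing, ...)
    return sum(payload.get("values", []))

async def handle_heavy(message: dict) -> int:
    loop = asyncio.get_running_loop()
    # Run the blocking function in a worker thread so the event loop
    # stays free to service other MQTT/HTTP traffic meanwhile.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(
            pool, expensive_calculation, message.get("payload", {})
        )
```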

Example — Status Broadcasting:

from typing import List

class TelemetryPetal(Petal):
    name = "petal-telemetry"

    def get_required_proxies(self) -> List[str]:
        return ["mqtt", "ext_mavlink"]

    async def broadcast_position(self, lat: float, lon: float, alt: float):
        """Publish a position update to connected web clients."""
        mqtt_proxy = self._proxies.get("mqtt")
        await mqtt_proxy.publish_message(
            payload={
                "type": "position_update",
                "latitude": lat,
                "longitude": lon,
                "altitude": alt,
            }
        )

Error Handling and Best Practices

Proxy Availability:

class YourPetal(Petal):
    async def safe_proxy_usage(self):
        try:
            redis_proxy = self.get_proxy("redis")
            if redis_proxy is None:
                self.logger.warning("Redis proxy not available")
                return

            result = await redis_proxy.get("key")
        except Exception as e:
            self.logger.error(f"Proxy operation failed: {e}")

Dependency Declaration:

def get_required_proxies(self) -> List[str]:
    # Always declare required proxies
    return ["redis", "ext_mavlink"]

def get_optional_proxies(self) -> List[str]:
    # Declare optional proxies for graceful degradation
    return ["cloud", "bucket"]

Health Checks:

async def check_proxy_health(self):
    for proxy_name in self.get_required_proxies():
        proxy = self.get_proxy(proxy_name)
        if proxy and hasattr(proxy, 'health_check'):
            health = await proxy.health_check()
            self.logger.info(f"{proxy_name} health: {health}")

Configuration Management:

# Proxies automatically use configuration from .env file
# PETAL_REDIS_HOST, PETAL_REDIS_PORT, PETAL_MAVLINK_ENDPOINT, etc.
# No manual configuration needed in petal code
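
A .env fragment might look like the following. The variable names echo the comment above; the values are purely illustrative and depend on your deployment:

```shell
# Illustrative values only
PETAL_REDIS_HOST=localhost
PETAL_REDIS_PORT=6379
PETAL_MAVLINK_ENDPOINT=udp:127.0.0.1:14550
```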

Next Steps

  • Review the Adding a New Petal guide for complete petal development

  • Check Contribution Guidelines for release and versioning

  • Explore existing petals for real-world proxy usage examples

  • Use the Admin Dashboard to monitor proxy health and status

Note

Documentation Status: The MavlinkExternalProxy, S3BucketProxy, RedisProxy, and MQTTProxy sections include comprehensive examples. CloudDBProxy and LocalDBProxy sections contain placeholders that will be populated with detailed examples in future updates.