Source code for petal_app_manager.plugins.base

import asyncio
import inspect
import logging
from abc import ABC
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Tuple

from fastapi.templating import Jinja2Templates

from ..proxies.base import BaseProxy

logger = logging.getLogger("PluginsLoader")

class Petal(ABC):
    """
    Petal authors only inherit this; NO FastAPI import, no routers.

    The class offers overridable lifecycle hooks (``startup``, ``shutdown``,
    ``async_startup``, ``async_shutdown``), injection points for proxies and
    templates, and the MQTT scaffolding that routes incoming commands to
    methods carrying an ``__mqtt_action__`` attribute.

    ``name`` and ``version`` are only declared here, not assigned; the
    lifecycle hooks and the MQTT scaffolding read them.
    """

    name: str
    version: str

    # Event loop used to offload ``cpu_heavy`` MQTT handlers to an executor.
    # Nothing in this class assigns it, so the class-level ``None`` default
    # keeps the master handler from raising ``AttributeError`` when no loop
    # was provided; in that case handlers are simply awaited in place.
    _loop: Optional[asyncio.AbstractEventLoop] = None

    def __init__(self) -> None:
        self._proxies: Mapping[str, BaseProxy] = {}
        # Both tables stay ``None`` until _setup_mqtt_actions() fills them;
        # the master handler treats ``None`` as "not initialised yet".
        self._mqtt_command_handlers: Optional[Dict[str, Callable]] = None
        self._mqtt_cpu_heavy_commands: Optional[Dict[str, bool]] = None

    # define a startup method that can be overridden
    def startup(self) -> None:
        """
        Called when the petal is started.
        """
        logger.info("Starting petal %s (%s)", self.name, self.version)

    def shutdown(self) -> None:
        """
        Called when the petal is stopped.
        """
        logger.info("Shutting down petal %s (%s)", self.name, self.version)

    async def async_startup(self) -> None:
        """
        Called after startup to handle async operations like MQTT subscriptions.
        """
        logger.info(
            "Starting async operations for petal %s (%s)", self.name, self.version
        )

    async def async_shutdown(self) -> None:
        """
        Called before shutdown to handle async operations like MQTT unsubscriptions.
        """
        logger.info(
            "Shutting down async operations for petal %s (%s)",
            self.name,
            self.version,
        )

    def inject_proxies(self, proxies: Mapping[str, BaseProxy]) -> None:
        """
        Store the mapping of proxy name to proxy instance on the petal.

        The mapping is stored as given, without validating the values.
        """
        # Skip isinstance check for now due to import issues
        # TODO: Debug why isinstance(proxy, BaseProxy) fails during app startup
        self._proxies = proxies

    def inject_templates(self, templates: Mapping[str, Jinja2Templates]) -> None:
        """
        Inject Jinja2 templates into the petal.
        """
        self.templates = templates

    # ────────────────────────── MQTT command handler scaffolding ──────────────

    def _iter_mqtt_actions(self) -> Iterator[Tuple[Any, Dict[str, Any]]]:
        """Yield ``(attribute, metadata)`` for every attribute of ``self``
        that carries a non-``None`` ``__mqtt_action__`` attribute.

        Shared by ``_collect_mqtt_actions`` and ``has_mqtt_actions`` so both
        apply exactly the same discovery rule.
        """
        for attr_name in dir(self):
            try:
                attr = getattr(self, attr_name)
            except Exception:
                # Reading an attribute can run arbitrary code (e.g. a
                # property); anything that fails to resolve is skipped.
                continue
            meta = getattr(attr, "__mqtt_action__", None)
            if meta is not None:
                yield attr, meta

    def _collect_mqtt_actions(self) -> Tuple[Dict[str, Callable], Dict[str, bool]]:
        """Scan ``self`` for methods decorated with ``@mqtt_action`` and build
        two parallel dicts keyed by the fully-qualified command string
        (``"{petal_name}/{command_suffix}"``):

        * **handlers** – the handler reference as found on ``self``
        * **cpu_heavy** – whether the handler should be offloaded

        Returns ``(handlers, cpu_heavy_flags)``.
        """
        handlers: Dict[str, Callable] = {}
        cpu_heavy_flags: Dict[str, bool] = {}
        for attr, meta in self._iter_mqtt_actions():
            full_command = f"{self.name}/{meta['command']}"
            handlers[full_command] = attr
            cpu_heavy_flags[full_command] = meta.get("cpu_heavy", False)
        return handlers, cpu_heavy_flags

    def has_mqtt_actions(self) -> bool:
        """Return ``True`` if any method is decorated with ``@mqtt_action``."""
        # The generator is lazy, so scanning stops at the first match.
        return any(True for _ in self._iter_mqtt_actions())

    async def _send_mqtt_error_response(
        self,
        mqtt_proxy: Any,
        message: Dict[str, Any],
        error_msg: str,
        error_code: str,
        **extra: Any,
    ) -> None:
        """Send an error response for *message* through *mqtt_proxy*.

        Nothing is sent unless the message has a truthy ``waitResponse``
        field and a proxy is available. ``extra`` entries are appended to
        the response payload. Exceptions raised by the proxy propagate to
        the caller.
        """
        if not (message.get("waitResponse", False) and mqtt_proxy):
            return
        await mqtt_proxy.send_command_response(
            message_id=message.get("messageId", "unknown"),
            response_data={
                "status": "error",
                "message": error_msg,
                "error_code": error_code,
                **extra,
            },
        )

    async def _mqtt_master_command_handler(
        self, topic: str, message: Dict[str, Any]
    ) -> None:
        """Auto-generated master command handler that dispatches to
        ``@mqtt_action``-decorated methods.

        This is the single handler registered with the MQTT proxy. It reads
        the ``command`` field from the incoming message and looks up the
        matching handler in ``self._mqtt_command_handlers``.

        If the target handler is marked ``cpu_heavy`` and ``self._loop`` is
        an open event loop, the handler is run through
        ``loop.run_in_executor`` (inside ``asyncio.run``); otherwise it is
        awaited in place.

        Commands that are not strings or do not start with
        ``"{self.name}/"`` are ignored. Unknown commands under this petal's
        prefix, and any exception raised while handling, are logged and
        reported back through the proxy when the message asks for a
        response.
        """
        mqtt_proxy = self._proxies.get("mqtt")

        try:
            if self._mqtt_command_handlers is None:
                logger.warning(
                    "Petal %s not fully initialized yet, MQTT command handlers not available",
                    self.name,
                )
                return

            # Guard: ensure MQTT topics are fully initialised (org ID available)
            if (
                mqtt_proxy is not None
                and getattr(mqtt_proxy, "organization_id", None) is None
            ):
                logger.warning(
                    "Petal %s MQTT topics not yet initialized (organization_id missing), "
                    "cannot process commands",
                    self.name,
                )
                return

            command = message.get("command", "")
            logger.info(
                "Petal %s master handler received command: %s", self.name, command
            )

            # A non-string command (e.g. JSON null) cannot address this
            # petal; ignore it instead of failing on the lookup below.
            if not isinstance(command, str):
                logger.debug(
                    "Ignoring command not meant for petal %s: %s", self.name, command
                )
                return

            handler = self._mqtt_command_handlers.get(command)
            if handler is not None:
                is_cpu_heavy = (self._mqtt_cpu_heavy_commands or {}).get(
                    command, False
                )
                loop = self._loop
                if is_cpu_heavy and loop is not None and not loop.is_closed():
                    # Offload CPU-heavy handler to thread pool
                    await loop.run_in_executor(
                        None, lambda: asyncio.run(handler(topic, message))
                    )
                else:
                    await handler(topic, message)
                return

            # Ignore commands not meant for this petal
            if not command.startswith(f"{self.name}/"):
                logger.debug(
                    "Ignoring command not meant for petal %s: %s", self.name, command
                )
                return

            error_msg = f"Unknown command: {command}"
            logger.error(error_msg)
            await self._send_mqtt_error_response(
                mqtt_proxy,
                message,
                error_msg,
                "UNKNOWN_COMMAND",
                available_commands=list(self._mqtt_command_handlers),
            )

        except Exception as exc:
            error_msg = f"Master command handler error: {exc}"
            # logger.exception keeps the traceback alongside the message.
            logger.exception(error_msg)
            try:
                await self._send_mqtt_error_response(
                    mqtt_proxy, message, error_msg, "HANDLER_ERROR"
                )
            except Exception as resp_exc:
                logger.error("Failed to send error response: %s", resp_exc)

    async def _setup_mqtt_actions(self) -> Optional[str]:
        """Collect ``@mqtt_action``-decorated methods, build the dispatch
        table, and register the master handler with the MQTT proxy.

        The dispatch tables are stored on ``self`` before the proxy lookup,
        so they are populated even when no MQTT proxy is available.

        Returns the subscription ID on success, ``None`` on failure.
        """
        handlers, cpu_heavy_flags = self._collect_mqtt_actions()

        if not handlers:
            logger.debug("Petal %s has no @mqtt_action handlers", self.name)
            return None

        self._mqtt_command_handlers = handlers
        self._mqtt_cpu_heavy_commands = cpu_heavy_flags

        logger.info(
            "Petal %s registered %d MQTT command(s): %s",
            self.name,
            len(handlers),
            ", ".join(handlers),
        )

        mqtt_proxy = self._proxies.get("mqtt")
        if mqtt_proxy is None:
            logger.warning("MQTT proxy not available for petal %s", self.name)
            return None

        # Any handler marked cpu_heavy means the *master* handler itself should
        # be marked cpu_heavy=False — the offloading happens *inside* the
        # master handler per-command, not at the proxy level.
        subscription_id = mqtt_proxy.register_handler(
            self._mqtt_master_command_handler
        )

        if subscription_id is None:
            logger.error(
                "Failed to register MQTT master handler for petal %s", self.name
            )
            return None

        logger.info(
            "Petal %s MQTT master handler registered with subscription ID: %s",
            self.name,
            subscription_id,
        )
        return subscription_id