# Source code for petal_app_manager.proxies.localdb

"""
LocalDBProxy
============

• Provides access to the local DynamoDB instance through the controller dashboard API
• Handles machine ID retrieval and authentication for requests
• Abstracts the HTTP communication details away from petals
• Provides async CRUD operations for DynamoDB tables

This proxy allows petals to interact with DynamoDB without worrying about
the underlying HTTP communication details.
"""

from __future__ import annotations
from typing import List, Dict, Any, Optional
import asyncio
import concurrent.futures
import http.client
import json
import logging
import platform
import subprocess
from pathlib import Path
import os

from .base import BaseProxy
from ..organization_manager import get_organization_manager

class LocalDBProxy(BaseProxy):
    """
    Proxy for communicating with a local DynamoDB instance through a custom API.

    Every request is a JSON ``POST`` sent with :mod:`http.client` to
    ``host:port``.  Blocking work (the HTTP round trip and the machine-ID
    executable) runs on a dedicated single-worker thread pool, so requests
    issued through this proxy are executed one at a time.

    Result convention used by the request helpers:

    * success: ``{"data": <decoded JSON>, "success": True}``
    * failure: ``{"error": "<message>"}`` (no ``"success"`` key)
    """

    def __init__(
        self,
        get_data_url: str,
        scan_data_url: str,
        update_data_url: str,
        set_data_url: str,
        host: str,
        port: int,
        debug: bool = False,
    ):
        """
        Store connection settings; no I/O happens until :meth:`start`.

        Args:
            get_data_url: Request path used by :meth:`get_item` and :meth:`delete_item`
            scan_data_url: Request path used by :meth:`scan_items`
            update_data_url: Request path used by :meth:`update_item`
            set_data_url: Request path used by :meth:`set_item`
            host: Host name of the local API service
            port: TCP port of the local API service
            debug: When True, each outgoing request is logged at DEBUG level
        """
        self.host = host
        self.port = port
        self.get_data_url = get_data_url
        self.scan_data_url = scan_data_url
        self.update_data_url = update_data_url
        self.set_data_url = set_data_url
        self.debug = debug

        # The event loop is captured in start(); until then no request can run.
        self._loop = None
        # One worker only: blocking calls are serialized on this thread.
        self._exe = concurrent.futures.ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="LocalDBProxy"
        )
        self.log = logging.getLogger("LocalDBProxy")

        self._machine_id = None
        self._organization_id = None
        self._robot_type_id = None

    @property
    def machine_id(self) -> Optional[str]:
        """Get the machine ID for this instance."""
        return self._machine_id

    @property
    def organization_id(self) -> Optional[str]:
        """Get the organization ID from OrganizationManager (None on any error)."""
        try:
            org_manager = get_organization_manager()
            return org_manager.organization_id
        except Exception as e:
            self.log.error("Error getting organization ID from OrganizationManager: %s", e)
            return None

    @property
    def robot_type_id(self) -> Optional[str]:
        """Get the robot type ID for this instance."""
        return self._robot_type_id

    async def start(self):
        """
        Initialize the connection to the local API service.

        Captures the running event loop, resolves the machine ID, then looks up
        the current robot instance to learn the robot type ID.  A failure in
        any of these steps is logged as a warning; it does not raise.
        """
        self._loop = asyncio.get_running_loop()
        self.log.info("Initializing LocalDBProxy connection to %s:%s", self.host, self.port)

        # Get machine ID for use in all requests
        self._machine_id = await self._loop.run_in_executor(
            self._exe, self._get_machine_id
        )
        if not self._machine_id:
            self.log.warning("Failed to get machine ID, some operations may fail")
        else:
            self.log.info("LocalDBProxy initialized with machine ID: %s", self._machine_id)

        current_instance = await self._get_current_instance()
        if current_instance is not None:
            self.log.info("Current robot instance: %s", current_instance)
        else:
            self.log.warning("No current robot instance found")

        # Get robot type ID from current instance data.  current_instance is
        # None when the lookup failed and "data" may be missing or null, so
        # both levels are type-checked instead of chaining .get() blindly.
        instance_data = (
            current_instance.get("data") if isinstance(current_instance, dict) else None
        )
        if isinstance(instance_data, dict):
            self._robot_type_id = instance_data.get("robot_type_id", None)
        else:
            self._robot_type_id = None

        if self._robot_type_id is None:
            self.log.warning("Robot Type ID not found in current instance")
        else:
            self.log.info("Robot Type ID: %s", self._robot_type_id)

        self.log.info("LocalDBProxy started successfully")

    async def stop(self):
        """Clean up resources when shutting down (does not wait for a running request)."""
        self._exe.shutdown(wait=False)
        self.log.info("LocalDBProxy stopped")

    def _get_machine_id(self) -> Optional[str]:
        """
        Get the machine ID using the machine ID executable.

        Picks ``machineid_arm`` when ``platform.machine()`` contains
        ``aarch64`` and ``machineid_x86`` otherwise, both looked up in the
        ``utils`` directory next to this package.

        Returns:
            The executable's stripped stdout, or None if the executable is
            missing or fails to run.
        """
        try:
            # Determine architecture
            arch = platform.machine().lower()
            is_arm = "aarch64" in arch

            # Build path to executable
            utils_dir = Path(__file__).parent.parent / "utils"
            machine_id_exe = utils_dir / ("machineid_arm" if is_arm else "machineid_x86")

            if not machine_id_exe.exists():
                self.log.error("Machine ID executable not found at %s", machine_id_exe)
                return None

            # check=True turns a non-zero exit status into an exception,
            # which is reported by the handler below.
            result = subprocess.run(
                [str(machine_id_exe)],
                capture_output=True,
                text=True,
                check=True,
            )
            return result.stdout.strip()
        except Exception as e:
            self.log.error("Failed to get machine ID: %s", e)
            return None

    def _remote_file_request(self, body: Dict[str, Any], path: str, method: str) -> Any:
        """
        Make a blocking request to the local API service.

        Args:
            body: JSON-serializable request payload
            path: Request path on the API service
            method: HTTP method to use

        Returns:
            ``{"data": <decoded JSON>, "success": True}`` when the response
            body parses as JSON, otherwise ``{"error": "<message>"}``.  The
            HTTP status code is not inspected.
        """
        conn = None
        try:
            conn = http.client.HTTPConnection(self.host, self.port)
            headers = {'Content-Type': 'application/json'}
            body_json = json.dumps(body)

            if self.debug:
                self.log.debug("Making %s request to %s with body: %s", method, path, body)

            conn.request(method, path, body=body_json, headers=headers)
            response = conn.getresponse()
            # read() with no size drains the whole response body.
            raw_data = response.read()

            # Parse the JSON response
            try:
                data = json.loads(raw_data.decode('utf-8'))
            except json.JSONDecodeError as e:
                self.log.error("Failed to parse response: %s", e)
                return {"error": f"Failed to parse response: {e}"}
            return {"data": data, "success": True}
        except Exception as e:
            self.log.error("Request failed: %s", e)
            return {"error": f"Request failed: {e}"}
        finally:
            # Close on every path so a failed request does not leak the socket.
            if conn is not None:
                conn.close()

    async def _post(self, path: str, body: Dict[str, Any]) -> Any:
        """Run a blocking POST of *body* to *path* on the proxy's worker thread."""
        return await self._loop.run_in_executor(
            self._exe, self._remote_file_request, body, path, 'POST'
        )

    async def _get_current_instance(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve the current robot instance data from the local database.

        Returns:
            The result of :meth:`get_item` on ``config-robot_instances`` for
            this machine ID, or None if the machine ID is unavailable or the
            lookup raises.
        """
        if not self._machine_id:
            self.log.warning("Machine ID not available, cannot get current instance")
            return None

        try:
            return await self.get_item(
                table_name="config-robot_instances",
                partition_key="id",
                partition_value=self._machine_id,
            )
        except Exception as e:
            self.log.error("Failed to get current instance: %s", e)
            return None

    # ------ Public API methods ------ #

    async def get_item(
        self,
        table_name: str,
        partition_key: str,
        partition_value: str
    ) -> Dict[str, Any]:
        """
        Retrieve a single item from DynamoDB using its partition key.
        Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

        Args:
            table_name: The DynamoDB table name
            partition_key: Name of the partition key (usually 'id')
            partition_value: Value of the partition key to look up

        Returns:
            ``{"data": item, "success": True}`` if the item is found, not
            deleted, and its ``robot_instance_id`` equals this machine ID;
            otherwise an ``{"error": ...}`` dictionary.
        """
        if not self._machine_id:
            return {"error": "Machine ID not available"}

        body = {
            "onBoardId": self._machine_id,
            "table_name": table_name,
            "partition_key": partition_key,
            "partition_value": partition_value,
        }
        result = await self._post(self.get_data_url, body)

        # Filter out soft-deleted items and items not belonging to this machine
        if result.get("success") and result.get("data"):
            item_data = result["data"]
            if not isinstance(item_data, dict):
                # A list/str payload cannot be ownership-checked; report it
                # instead of failing with AttributeError on .get().
                return {"error": "Unexpected response format"}
            if item_data.get("deleted") is True:
                return {"error": "Item not found or has been deleted"}
            # Check if robot_instance_id matches machine_id
            if item_data.get("robot_instance_id") != self._machine_id:
                return {"error": "Item not found or access denied"}

        return result

    async def scan_items(
        self,
        table_name: str,
        filters: Optional[List[Dict[str, str]]] = None
    ) -> List[Dict[str, Any]]:
        """
        Scan a DynamoDB table with optional filters.
        Only returns items that are not soft-deleted (deleted != True) and belong to this machine.

        Args:
            table_name: The DynamoDB table name
            filters: List of filter dictionaries, each with 'filter_key_name' and
                'filter_key_value'. The caller's list is not modified.

        Returns:
            NOTE: the actual shapes differ from the annotation, which is kept
            unchanged for backward compatibility:

            * list payload: ``{"data": [matching items], "success": True}``
            * single-item payload that is deleted or foreign:
              ``{"data": [], "success": True}``
            * machine ID unavailable: ``[{"error": "Machine ID not available"}]``
            * anything else: the request result dictionary as received
        """
        if not self._machine_id:
            return [{"error": "Machine ID not available"}]

        # Always add robot_instance_id filter to ensure only this machine's
        # records are returned; build a new list so the caller's is untouched.
        scan_filters = list(filters) if filters is not None else []
        scan_filters.append({
            "filter_key_name": "robot_instance_id",
            "filter_key_value": self._machine_id,
        })

        body = {
            "table_name": table_name,
            "onBoardId": self._machine_id,
            "scanFilter": scan_filters,
        }
        result = await self._post(self.scan_data_url, body)

        # Filter out soft-deleted items and double-check robot_instance_id
        if result.get("success") and result.get("data"):
            payload = result["data"]
            if isinstance(payload, list):
                # Non-dict entries cannot belong to this machine, so they are
                # dropped rather than crashing on .get().
                filtered_items = [
                    item for item in payload
                    if isinstance(item, dict)
                    and item.get("deleted") is not True
                    and item.get("robot_instance_id") == self._machine_id
                ]
                return {"data": filtered_items, "success": True}
            if not isinstance(payload, dict):
                return {"error": "Unexpected response format"}
            # Single item response, check if it's deleted or doesn't belong to this machine
            if (payload.get("deleted") is True
                    or payload.get("robot_instance_id") != self._machine_id):
                return {"data": [], "success": True}

        return result

    async def _write_item(
        self,
        path: str,
        table_name: str,
        filter_key: str,
        filter_value: str,
        data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """POST *data*, stamped with this machine's robot_instance_id, to *path*."""
        if not self._machine_id:
            return {"error": "Machine ID not available"}

        # Work on a copy so the caller's dictionary is never mutated.
        data_with_robot_id = {**data, "robot_instance_id": self._machine_id}
        body = {
            "onBoardId": self._machine_id,
            "table_name": table_name,
            "filter_key": filter_key,
            "filter_value": filter_value,
            "data": data_with_robot_id,
        }
        return await self._post(path, body)

    async def update_item(
        self,
        table_name: str,
        filter_key: str,
        filter_value: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Update or insert an item in DynamoDB.
        Automatically adds robot_instance_id to ensure item belongs to this machine.

        Args:
            table_name: The DynamoDB table name
            filter_key: Name of the key to filter on (usually 'id')
            filter_value: Value of the key to update
            data: The complete item data to update or insert

        Returns:
            Response from the update operation
        """
        return await self._write_item(
            self.update_data_url, table_name, filter_key, filter_value, data
        )

    async def set_item(
        self,
        table_name: str,
        filter_key: str,
        filter_value: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Puts an item in DynamoDB.
        Automatically adds robot_instance_id to ensure item belongs to this machine.

        Args:
            table_name: The DynamoDB table name
            filter_key: Name of the key to filter on (usually 'id')
            filter_value: Value of the key to update
            data: The complete item data to update or insert

        Returns:
            Response from the set operation
        """
        return await self._write_item(
            self.set_data_url, table_name, filter_key, filter_value, data
        )

    async def delete_item(
        self,
        table_name: str,
        filter_key: str,
        filter_value: str
    ) -> Dict[str, Any]:
        """
        Soft delete an item from DynamoDB by setting deleted=True.

        The item is fetched with a raw request (not :meth:`get_item`) so that
        an already soft-deleted item can still be found, then written back
        through :meth:`update_item` with ``deleted`` set to True.

        Args:
            table_name: The DynamoDB table name
            filter_key: Name of the key to filter on (usually 'id')
            filter_value: Value of the key to delete

        Returns:
            Response from the update operation, or an ``{"error": ...}``
            dictionary if the item cannot be fetched.
        """
        if not self._machine_id:
            return {"error": "Machine ID not available"}

        # Get the existing item directly from the database (bypassing soft delete filter)
        body = {
            "onBoardId": self._machine_id,
            "table_name": table_name,
            "partition_key": filter_key,
            "partition_value": filter_value,
        }
        existing_item_result = await self._post(self.get_data_url, body)

        if not existing_item_result.get("success") or not existing_item_result.get("data"):
            return {"error": "Item not found"}

        existing_item = existing_item_result["data"]
        if not isinstance(existing_item, dict):
            # Only a dict can be copied and re-submitted as an item.
            return {"error": "Unexpected response format"}

        # NOTE(review): unlike get_item, ownership (robot_instance_id) is not
        # checked here, and update_item re-stamps the item with this machine's
        # ID. Confirm whether the API's onBoardId scoping makes that safe.

        # Update the item with deleted=True while preserving existing data
        item_data = existing_item.copy()
        item_data["deleted"] = True

        return await self.update_item(
            table_name=table_name,
            filter_key=filter_key,
            filter_value=filter_value,
            data=item_data,
        )