Source code for petal_app_manager.api.cloud_api


from fastapi import APIRouter, Depends
from typing import Dict, Any, Optional
from fastapi.exceptions import HTTPException
from pydantic import BaseModel
import logging

# Import proxy types for type hints
from ..proxies.localdb import LocalDBProxy
from ..proxies.cloud import CloudDBProxy
from ..api import get_proxies

# Router that the cloud database endpoints in this module register on; tagged "cloud".
router = APIRouter(tags=["cloud"])

# Logger shared by the endpoints. Stays None until _set_logger() installs one;
# get_logger() raises while it is unset.
_logger: Optional[logging.Logger] = None

def _set_logger(logger: logging.Logger):
    """Install the logger used by the cloud API endpoints.

    The candidate is validated *before* the module-level logger is replaced,
    so a rejected argument leaves any previously installed logger untouched.

    Args:
        logger: Logger to install. It must have a non-empty name and at least
            one handler attached.

    Raises:
        ValueError: If ``logger`` is not a ``logging.Logger``, has no name,
            or has no handlers configured.
    """
    global _logger
    if not isinstance(logger, logging.Logger):
        raise ValueError("Logger must be an instance of logging.Logger")
    if not logger.name:
        raise ValueError("Logger must have a name set")
    if not logger.handlers:
        raise ValueError("Logger must have at least one handler configured")
    # Assign only once every check has passed, so invalid input never leaks
    # into the module state that get_logger() hands out.
    _logger = logger

def get_logger() -> Optional[logging.Logger]:
    """Return the logger installed through ``_set_logger``.

    Raises:
        ValueError: If no logger has been installed yet.
    """
    if _logger:
        return _logger
    raise ValueError("Logger has not been set. Call _set_logger first.")
# Request models
class ScanTableRequest(BaseModel):
    # Request body accepted by the /scan-table endpoint.

    # Name of the cloud table to scan.
    table_name: str
    # Filters forwarded unchanged to the cloud proxy's scan_items call.
    filters: list = []
class GetItemRequest(BaseModel):
    # Request body identifying a single item by key; used by the /get-item
    # and /delete-item endpoints.

    # Name of the cloud table holding the item.
    table_name: str
    # Name of the key attribute used for the lookup.
    key_name: str
    # Value of that key for the item being addressed.
    key_value: str
class SetItemRequest(BaseModel):
    # Request body accepted by the /set-item endpoint.

    # Name of the cloud table to write into.
    table_name: str
    # Full item payload; the endpoint reads its "id" entry as the key value.
    item_data: Dict[str, Any]
class UpdateItemRequest(BaseModel):
    # Request body accepted by the /update-item endpoint.

    # Name of the cloud table holding the item.
    table_name: str
    # Name of the key attribute that selects the item.
    key_name: str
    # Value of that key for the item being updated.
    key_value: str
    # Fields to update, passed to the cloud proxy as its data argument.
    update_data: Dict[str, Any]
@router.post(
    "/scan-table",
    summary="Get all records from cloud for a specific table",
    description="Retrieves all records for the current organization from the cloud database.",
)
async def scan_table(request: ScanTableRequest) -> Dict[str, Any]:
    """Scan a cloud table and return its records together with their count.

    Args:
        request: Body carrying the table name and the filters handed to the
            cloud proxy.

    Returns:
        A dict with ``records`` (the ``data`` entry of the proxy result) and
        ``count`` (the number of records).

    Raises:
        HTTPException: 503 when no ``cloud`` proxy is registered, 400 when the
            table name is empty, 500 when the proxy result contains an
            ``error`` entry or an unexpected exception occurs.
    """
    available = get_proxies()
    log = get_logger()
    source = {"source": "scan_table"}

    if "cloud" not in available:
        disabled = "CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml."
        log.error(disabled)
        raise HTTPException(status_code=503, detail=disabled)

    cloud: CloudDBProxy = available["cloud"]

    table = request.table_name
    if not table:
        log.error("Table name is required to scan items in cloud")
        raise HTTPException(
            status_code=400,
            detail="Table name is required",
            headers=source,
        )

    try:
        # Filters come straight from the request; per the original note,
        # robot_instance_id filtering is handled automatically by the proxy.
        outcome = await cloud.scan_items(table_name=table, filters=request.filters)

        if "error" in outcome:
            failure = f"Failed to retrieve records from cloud: {outcome['error']}"
            log.error(failure)
            raise HTTPException(status_code=500, detail=failure, headers=source)

        rows = outcome.get("data", [])
        log.info(
            f"Retrieved {len(rows)} records from cloud table {table} "
            f"for machine {cloud._get_machine_id()}"
        )
        return {"records": rows, "count": len(rows)}
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        unexpected = f"Unexpected error retrieving cloud records: {str(e)}"
        log.error(unexpected)
        raise HTTPException(status_code=500, detail=unexpected, headers=source)
@router.post(
    "/get-item",
    summary="Get a specific item from cloud database",
    description="Retrieves a specific item by key from the cloud database.",
)
async def get_item(request: GetItemRequest) -> Dict[str, Any]:
    """Fetch one item from a cloud table by its key.

    Args:
        request: Body carrying the table name plus the key name and value.

    Returns:
        A dict with a single ``item`` entry holding the ``data`` of the proxy
        result.

    Raises:
        HTTPException: 503 when no ``cloud`` proxy is registered, 400 when any
            request field is empty, 404 when the proxy returns no data, 500
            when the proxy result contains an ``error`` entry or an
            unexpected exception occurs.
    """
    available = get_proxies()
    log = get_logger()
    source = {"source": "get_item"}

    if "cloud" not in available:
        disabled = "CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml."
        log.error(disabled)
        raise HTTPException(status_code=503, detail=disabled)

    cloud: CloudDBProxy = available["cloud"]

    if not (request.table_name and request.key_name and request.key_value):
        missing = "Table name, key name, and key value are required"
        log.error(missing)
        raise HTTPException(status_code=400, detail=missing, headers=source)

    try:
        outcome = await cloud.get_item(
            table_name=request.table_name,
            partition_key=request.key_name,
            partition_value=request.key_value,
        )

        if "error" in outcome:
            failure = f"Failed to retrieve item from cloud: {outcome['error']}"
            log.error(failure)
            raise HTTPException(status_code=500, detail=failure, headers=source)

        found = outcome.get("data")
        if not found:
            # A missing or empty payload is reported as "not found".
            raise HTTPException(
                status_code=404,
                detail="Item not found in cloud database",
                headers=source,
            )

        log.info(
            f"Retrieved item {request.key_value} from cloud table {request.table_name}"
        )
        return {"item": found}
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        unexpected = f"Unexpected error retrieving cloud item: {str(e)}"
        log.error(unexpected)
        raise HTTPException(status_code=500, detail=unexpected, headers=source)
@router.post(
    "/set-item",
    summary="Create or overwrite an item in cloud database",
    description="Creates a new item or overwrites an existing item in the cloud database.",
)
async def set_item(request: SetItemRequest) -> Dict[str, Any]:
    """Create or overwrite an item in the cloud database.

    Args:
        request: Body carrying the table name and the full item payload. The
            payload's ``id`` entry is used as the key value.

    Returns:
        ``{"success": True, "message": "Item created/updated successfully"}``
        when the proxy reports no error.

    Raises:
        HTTPException: 503 when no ``cloud`` proxy is registered, 400 when the
            table name or item data is empty, 500 when the proxy result
            contains an ``error`` entry or an unexpected exception occurs.
    """
    proxies = get_proxies()
    logger = get_logger()

    if "cloud" not in proxies:
        logger.error("CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml.")
        raise HTTPException(
            status_code=503,
            detail="CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml.",
        )

    cloud_proxy: CloudDBProxy = proxies["cloud"]

    if not request.table_name or not request.item_data:
        logger.error("Table name and item data are required")
        raise HTTPException(
            status_code=400,
            detail="Table name and item data are required",
            headers={"source": "set_item"},
        )

    try:
        # Work on a copy so the request model's own dict is never handed out.
        # Per the original note, robot_instance_id is added by the proxy.
        item_data = request.item_data.copy()

        # NOTE(review): a payload without "id" is sent with an empty filter
        # value - confirm the proxy handles that as intended.
        result = await cloud_proxy.set_item(
            table_name=request.table_name,
            filter_key="id",  # "id" is the primary key, not organization_id
            filter_value=item_data.get("id", ""),
            data=item_data,
        )

        if "error" in result:
            logger.error(f"Failed to set item in cloud: {result['error']}")
            # HTTPException has no ``extra`` keyword; passing one raised a
            # TypeError that masked the real proxy error. Carry the error in
            # ``detail`` instead, as the sibling endpoints do.
            raise HTTPException(
                status_code=500,
                detail=f"Failed to set item in cloud: {result['error']}",
                headers={"source": "set_item"},
            )

        logger.info(f"Successfully set item in cloud table {request.table_name}")
        return {"success": True, "message": "Item created/updated successfully"}

    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Unexpected error setting cloud item: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected error setting cloud item: {str(e)}",
            headers={"source": "set_item"},
        ) from e
@router.post(
    "/update-item",
    summary="Update an existing item in cloud database",
    description="Updates specific fields of an existing item in the cloud database.",
)
async def update_item(request: UpdateItemRequest) -> Dict[str, Any]:
    """Update an existing item in the cloud database.

    Args:
        request: Body carrying the table name, the key name and value that
            select the item, and the fields to update.

    Returns:
        ``{"success": True, "message": "Item updated successfully"}`` when the
        proxy reports no error.

    Raises:
        HTTPException: 503 when no ``cloud`` proxy is registered, 400 when any
            request field is empty, 500 when the proxy result contains an
            ``error`` entry or an unexpected exception occurs.
    """
    proxies = get_proxies()
    logger = get_logger()

    if "cloud" not in proxies:
        logger.error("CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml.")
        raise HTTPException(
            status_code=503,
            detail="CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml.",
        )

    cloud_proxy: CloudDBProxy = proxies["cloud"]

    if not request.table_name or not request.key_name or not request.key_value or not request.update_data:
        logger.error("Table name, key name, key value, and update data are required")
        raise HTTPException(
            status_code=400,
            detail="Table name, key name, key value, and update data are required",
            headers={"source": "update_item"},
        )

    try:
        result = await cloud_proxy.update_item(
            table_name=request.table_name,
            filter_key=request.key_name,
            filter_value=request.key_value,
            data=request.update_data,
        )

        if "error" in result:
            logger.error(f"Failed to update item in cloud: {result['error']}")
            # The colon before the error text was missing here, unlike the
            # log line above and the other endpoints' responses.
            raise HTTPException(
                status_code=500,
                detail=f"Failed to update item in cloud: {result['error']}",
                headers={"source": "update_item"},
            )

        logger.info(
            f"Successfully updated item {request.key_value} in cloud table {request.table_name}"
        )
        return {"success": True, "message": "Item updated successfully"}

    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Unexpected error updating cloud item: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected error updating cloud item: {str(e)}",
            headers={"source": "update_item"},
        ) from e
@router.delete(
    "/delete-item",
    summary="Delete an item from cloud database",
    description="Deletes a specific item from the cloud database.",
)
async def delete_item(request: GetItemRequest) -> Dict[str, Any]:
    """Remove one item from a cloud table, selected by its key.

    Args:
        request: Body carrying the table name plus the key name and value.

    Returns:
        ``{"success": True, "message": "Item deleted successfully"}`` when the
        proxy reports no error.

    Raises:
        HTTPException: 503 when no ``cloud`` proxy is registered, 400 when any
            request field is empty, 500 when the proxy result contains an
            ``error`` entry or an unexpected exception occurs.
    """
    available = get_proxies()
    log = get_logger()
    source = {"source": "delete_item"}

    if "cloud" not in available:
        disabled = "CloudDBProxy is not enabled. Please enable 'cloud' in proxies.yaml."
        log.error(disabled)
        raise HTTPException(status_code=503, detail=disabled)

    cloud: CloudDBProxy = available["cloud"]

    if not (request.table_name and request.key_name and request.key_value):
        missing = "Table name, key name, and key value are required"
        log.error(missing)
        raise HTTPException(status_code=400, detail=missing, headers=source)

    try:
        outcome = await cloud.delete_item(
            table_name=request.table_name,
            filter_key=request.key_name,
            filter_value=request.key_value,
        )

        if "error" in outcome:
            failure = f"Failed to delete item from cloud: {outcome['error']}"
            log.error(failure)
            raise HTTPException(status_code=500, detail=failure, headers=source)

        log.info(
            f"Successfully deleted item {request.key_value} from cloud table {request.table_name}"
        )
        return {"success": True, "message": "Item deleted successfully"}
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        unexpected = f"Unexpected error deleting cloud item: {str(e)}"
        log.error(unexpected)
        raise HTTPException(status_code=500, detail=unexpected, headers=source)