# Source code for petal_app_manager.proxies.bucket

import asyncio
import concurrent.futures
import json
import logging
import time
import uuid
from pathlib import Path
from typing import Dict, Any, Optional, List
from urllib.parse import urlparse
from datetime import datetime

import boto3
import requests
from botocore.exceptions import ClientError, NoCredentialsError

from .base import BaseProxy
from ..organization_manager import get_organization_manager

_ULOG_MAGIC   = b"ULog\x01\x12\x35"     # 7‑byte magic    :contentReference[oaicite:0]{index=0}
_ULOG_VERSION = 1                       # current spec
_ROSBAG_MAGIC = b"#ROSBAG"              # first 7 bytes   :contentReference[oaicite:1]{index=1}
_SIZE_LIMIT   = 2 * 1024**3             # 2 GiB

class S3BucketProxy(BaseProxy):
    """
    Proxy for communicating with an S3 bucket for flight log storage.

    Supports upload, download, and listing of .ulg and .bag files, plus
    delete, move and metadata lookup of stored objects. Temporary AWS
    credentials are fetched from ``session_token_url`` and cached; the
    credential HTTP request and the S3 calls run on a small private thread
    pool so the event loop is not blocked by them.

    Every public operation returns a dictionary: ``{"success": True, ...}``
    on success, or ``{"error": "<message>"}`` on failure.
    """

    # Allowed file extensions for flight logs
    ALLOWED_EXTENSIONS = {'.ulg', '.bag'}

    # Content type sent to S3 for each allowed extension.
    _CONTENT_TYPES = {
        '.ulg': 'application/x-ulog',
        '.bag': 'application/x-rosbag',
    }

    # Error codes that mean "the object does not exist". GetObject reports
    # 'NoSuchKey', while HeadObject (also issued by boto3's managed
    # download_file before transferring) reports a bare '404'.
    _NOT_FOUND_CODES = frozenset({'NoSuchKey', '404', 'NotFound'})

    def __init__(
        self,
        session_token_url: str,
        bucket_name: str,
        upload_prefix: str = 'flight_logs/',
        debug: bool = False,
        request_timeout: int = 30
    ):
        """
        Store the configuration; no network access happens here.

        Args:
            session_token_url: URL that returns temporary AWS credentials
                in response to a POST request
            bucket_name: Name of the S3 bucket to operate on
            upload_prefix: Key prefix, normalised to end with exactly one '/'
            debug: Debug flag (stored on the instance)
            request_timeout: Timeout in seconds for the credential request
        """
        self.session_token_url = session_token_url
        self.bucket_name = bucket_name
        self.upload_prefix = upload_prefix.rstrip('/') + '/'
        self.debug = debug
        self.request_timeout = request_timeout

        self._loop = None
        self._exe = concurrent.futures.ThreadPoolExecutor(
            max_workers=2,
            thread_name_prefix="S3BucketProxy"
        )
        self.log = logging.getLogger("S3BucketProxy")

        # Session management
        self._session_cache = {
            'credentials': None,
            'expires_at': 0
        }

        # S3 client (will be initialized in start())
        self.s3_client = None
        # Credentials object the current client was built from; used to
        # avoid rebuilding the client while the cached credentials are reused.
        self._client_credentials = None

    async def start(self):
        """
        Initialize the S3 proxy and fetch initial credentials.

        Raises:
            ValueError: If the session token URL or bucket name is missing.

        A failure to obtain credentials or build the client is logged but
        not raised; the client is then created lazily by the first operation.
        """
        self._loop = asyncio.get_running_loop()
        self.log.info("Initializing S3BucketProxy connection")

        # Validate configuration
        if not self.session_token_url or not self.bucket_name:
            raise ValueError("SESSION_TOKEN_URL and BUCKET_NAME must be configured")

        # Fetch initial credentials and create S3 client
        try:
            credentials = await self._get_session_credentials()
            self.s3_client = self._create_s3_client(credentials)
            self._client_credentials = credentials
            self.log.info("S3BucketProxy started successfully")
        except Exception as e:
            # Best-effort start-up: the proxy stays usable and retries the
            # credential fetch on the next operation.
            self.log.error(f"Failed to initialize S3BucketProxy: {e}")

    async def stop(self):
        """Clean up resources when shutting down."""
        self._exe.shutdown(wait=False)
        self.s3_client = None
        self._client_credentials = None
        self.log.info("S3BucketProxy stopped")

    def _get_machine_id(self) -> Optional[str]:
        """
        Get the machine ID from the OrganizationManager.

        Returns:
            The machine ID if available, None otherwise
        """
        try:
            org_manager = get_organization_manager()
            machine_id = org_manager.machine_id
            if not machine_id:
                self.log.error("Machine ID not available from OrganizationManager")
                return None
            return machine_id
        except Exception as e:
            self.log.error(f"Error getting machine ID from OrganizationManager: {e}")
            return None

    def _get_organization_id(self) -> Optional[str]:
        """
        Get the organization ID from the OrganizationManager.

        Returns:
            The organization ID if available, None otherwise
        """
        try:
            org_manager = get_organization_manager()
            org_id = org_manager.organization_id
            if not org_id:
                self.log.debug("Organization ID not yet available from OrganizationManager")
                return None
            return org_id
        except Exception as e:
            self.log.error(f"Error getting organization ID from OrganizationManager: {e}")
            return None

    def _validate_file_extension(self, filename: str) -> bool:
        """
        Validate that the file has an allowed extension for flight logs.

        Args:
            filename: The filename to validate

        Returns:
            True if the file extension is allowed, False otherwise
        """
        if not filename:
            return False
        return Path(filename).suffix.lower() in self.ALLOWED_EXTENSIONS

    def _validate_file_content(self, file_path: Path) -> Dict[str, Any]:
        """
        Basic validation of PX4 ULog (*.ulg) and ROS1 bag (*.bag) flight-log files.

        Checks that the file exists, is non-empty, does not exceed the size
        limit, and starts with the magic bytes matching its extension.

        Args:
            file_path: Path of the local file to inspect

        Returns:
            {"valid": True, "file_size": ..., "extension": ...} on success,
            otherwise {"valid": False, "error": ...}.
        """
        try:
            # ---------- existence & size ----------
            if not file_path.exists():
                return {"valid": False, "error": "File does not exist"}

            size = file_path.stat().st_size
            if size == 0:
                return {"valid": False, "error": "File is empty"}
            if size > _SIZE_LIMIT:
                return {"valid": False, "error": "File too large (max 2 GiB)"}

            # ---------- content sniff ----------
            ext = file_path.suffix.lower()
            if ext == ".ulg":
                with file_path.open("rb") as f:
                    header = f.read(8)  # 7-byte magic + 1-byte version
                # The length check keeps header[7] from raising IndexError
                # on a file that ends right after the magic bytes.
                if (
                    len(header) < 8
                    or header[:7] != _ULOG_MAGIC
                    or header[7] != _ULOG_VERSION
                ):
                    return {"valid": False,
                            "error": "Invalid ULog header (magic/version mismatch)"}
            elif ext == ".bag":
                with file_path.open("rb") as f:
                    header = f.read(8)  # '#ROSBAG'
                if not header.startswith(_ROSBAG_MAGIC):
                    return {"valid": False,
                            "error": "Invalid ROS bag header (expected '#ROSBAG')"}
            else:
                return {"valid": False, "error": f"Unsupported extension '{ext}'"}

            # ---------- success ----------
            return {"valid": True, "file_size": size, "extension": ext}

        except Exception as e:
            # log & hand back a clean error object instead of raising
            self.log.error("File validation error: %s", e)
            return {"valid": False, "error": f"Validation failed: {e}"}

    async def _get_session_credentials(self) -> Dict[str, Any]:
        """
        Fetch AWS session credentials from the session manager with caching.

        Cached credentials are returned until 50 minutes after the moment
        they were requested; after that a new POST is made to
        ``session_token_url`` on the worker pool.

        Returns:
            Dictionary containing AWS session credentials

        Raises:
            Exception: If the request fails or the response lacks a
                required field.
        """
        current_time = time.time()

        # Check if we have cached credentials that haven't expired
        cached = self._session_cache['credentials']
        if cached and current_time < self._session_cache['expires_at']:
            return cached

        def _fetch_credentials():
            try:
                self.log.debug("Fetching new session credentials")
                response = requests.post(
                    self.session_token_url,
                    timeout=self.request_timeout
                )
                response.raise_for_status()
                credentials = response.json()

                # Validate required fields
                for field in ('accessKeyId', 'secretAccessKey', 'sessionToken'):
                    if field not in credentials:
                        raise ValueError(f"Missing required field: {field}")

                # Cache credentials for 50 minutes (assume 1-hour expiry)
                self._session_cache['credentials'] = credentials
                self._session_cache['expires_at'] = current_time + (50 * 60)

                self.log.info("Session credentials updated successfully")
                return credentials

            except requests.exceptions.Timeout as e:
                self.log.debug("Session service timeout")
                raise Exception("Session service timeout") from e
            except requests.exceptions.RequestException as e:
                self.log.debug(f"Session service unreachable: {type(e).__name__}")
                raise Exception(f"Failed to fetch session credentials: {str(e)}") from e
            except ValueError as e:
                self.log.debug(f"Invalid credentials response: {type(e).__name__}")
                raise Exception(f"Invalid credentials response: {str(e)}") from e
            except Exception as e:
                self.log.debug(f"Session service error: {type(e).__name__}")
                raise Exception(f"Authentication service error: {str(e)}") from e

        # start() normally records the loop; fall back to the running loop
        # so the proxy also works if an operation is awaited before start().
        loop = self._loop or asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(self._exe, _fetch_credentials)
        except Exception as e:
            self.log.debug(f"Credential fetch failed: {type(e).__name__}")
            raise Exception(f"Credential fetch operation failed: {str(e)}") from e

    def _create_s3_client(self, credentials: Dict[str, Any]) -> boto3.client:
        """
        Create S3 client with session credentials.

        Args:
            credentials: Mapping with 'accessKeyId', 'secretAccessKey' and
                'sessionToken' entries

        Raises:
            Exception: If the client cannot be constructed.
        """
        try:
            return boto3.client(
                's3',
                aws_access_key_id=credentials['accessKeyId'],
                aws_secret_access_key=credentials['secretAccessKey'],
                aws_session_token=credentials['sessionToken']
            )
        except Exception as e:
            self.log.error(f"Failed to create S3 client: {str(e)}")
            raise Exception("Failed to initialize storage client") from e

    def _generate_s3_key(self, filename: str, machine_id: str) -> str:
        """
        Generate a unique S3 key for the file.

        Args:
            filename: Original filename (any directory part is dropped)
            machine_id: Machine ID used as the leading key component

        Returns:
            S3 key of the form
            ``<machine_id>/flight-logs/<YYYYMMDD_HHMMSS>_<uuid4>_<filename>``
        """
        # Local import: the module-level import only brings in `datetime`.
        from datetime import timezone

        # UTC timestamp; datetime.utcnow() is deprecated, the aware
        # equivalent formats to the same string.
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')

        # Clean filename and add UUID for uniqueness
        clean_filename = Path(filename).name
        unique_filename = f"{uuid.uuid4()}_{clean_filename}"

        return f"{machine_id}/flight-logs/{timestamp}_{unique_filename}"

    async def _refresh_s3_client(self):
        """
        Refresh the S3 client with new credentials if needed.

        The client is rebuilt only when none exists yet or when the
        credential cache handed out a new credentials object. Failures are
        logged and swallowed so that an existing client stays in place.
        """
        try:
            credentials = await self._get_session_credentials()
            if self.s3_client is None or credentials is not self._client_credentials:
                self.s3_client = self._create_s3_client(credentials)
                self._client_credentials = credentials
        except Exception as e:
            self.log.error(f"Failed to refresh S3 client: {e}")

    def _require_client(self):
        """Return the S3 client, raising RuntimeError if none is available."""
        client = self.s3_client
        if client is None:
            raise RuntimeError("S3 client is not initialized")
        return client

    async def _run_in_executor(self, operation) -> Dict[str, Any]:
        """
        Refresh the S3 client, then run ``operation`` on the worker pool.

        Args:
            operation: Zero-argument callable returning the result dictionary

        Returns:
            The dictionary produced by ``operation``, or an error dictionary
            if refreshing or scheduling raised.
        """
        try:
            await self._refresh_s3_client()
            loop = self._loop or asyncio.get_running_loop()
            return await loop.run_in_executor(self._exe, operation)
        except Exception as e:
            return {"error": f"Client initialization failed: {str(e)}"}

    # ------ Public API methods ------ #

    async def upload_file(
        self,
        file_path: Path,
        custom_filename: Optional[str] = None,
        custom_s3_key: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Upload a flight log file to S3.

        Args:
            file_path: Path to the local file to upload
            custom_filename: Optional custom filename (defaults to original)
            custom_s3_key: Optional custom S3 key (overrides default key generation)

        Returns:
            Dictionary with upload results
        """
        # Get machine ID from the OrganizationManager
        machine_id = self._get_machine_id()
        if not machine_id:
            return {"error": "Machine ID not available"}

        def _upload():
            try:
                # Accept plain string paths as well as Path objects.
                source = Path(file_path)
                filename = custom_filename or source.name

                # Validate file extension
                if not self._validate_file_extension(filename):
                    # Sorted so the message and list are deterministic.
                    allowed = sorted(self.ALLOWED_EXTENSIONS)
                    return {
                        "error": f"Invalid file extension. Only {', '.join(allowed)} files are allowed",
                        "allowed_extensions": allowed
                    }

                # Validate file content
                validation_result = self._validate_file_content(source)
                if not validation_result["valid"]:
                    return {"error": f"File validation failed: {validation_result['error']}"}

                # Generate S3 key
                s3_key = custom_s3_key or self._generate_s3_key(filename, machine_id)

                # Determine content type
                content_type = self._CONTENT_TYPES.get(
                    Path(filename).suffix.lower(),
                    'application/octet-stream'
                )

                # Upload to S3 using boto3
                self._require_client().upload_file(
                    str(source),
                    self.bucket_name,
                    s3_key,
                    ExtraArgs={
                        'ContentType': content_type,
                        'ServerSideEncryption': 'AES256',
                        'Metadata': {
                            'original-filename': filename,
                            'upload-timestamp': str(int(time.time())),
                            'machine-id': machine_id
                        }
                    }
                )

                file_url = f"https://{self.bucket_name}.s3.amazonaws.com/{s3_key}"
                self.log.info(f"Successfully uploaded {filename} to {s3_key}")

                return {
                    "success": True,
                    "s3_key": s3_key,
                    "file_url": file_url,
                    "file_size": validation_result["file_size"],
                    "content_type": content_type
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                self.log.error(f"S3 upload failed: {error_code}")
                return {"error": f"Upload failed: {error_code}"}
            except Exception as e:
                self.log.error(f"Upload error: {e}")
                return {"error": f"Upload failed: {str(e)}"}

        return await self._run_in_executor(_upload)

    async def list_files(self, prefix: Optional[str] = None, max_keys: int = 100) -> Dict[str, Any]:
        """
        List files in the S3 bucket for the current machine.

        Args:
            prefix: Optional additional prefix, appended to ``<machine_id>/``
            max_keys: Maximum number of files to return (default 100, max 1000)

        Returns:
            Dictionary with list of files
        """
        # Get machine ID from the OrganizationManager
        machine_id = self._get_machine_id()
        if not machine_id:
            return {"error": "Machine ID not available"}

        def _list():
            try:
                # Everything for this machine lives under "<machine_id>/"
                list_prefix = f"{machine_id}/"
                if prefix:
                    list_prefix += prefix

                # Limit max_keys to prevent excessive results
                limited_max_keys = min(max_keys, 1000)

                # List objects using boto3
                response = self._require_client().list_objects_v2(
                    Bucket=self.bucket_name,
                    Prefix=list_prefix,
                    MaxKeys=limited_max_keys
                )

                # 'Contents' is absent when nothing matches the prefix.
                files = [
                    {
                        'key': obj['Key'],
                        'size': obj['Size'],
                        'last_modified': obj['LastModified'].isoformat(),
                        'url': f"https://{self.bucket_name}.s3.amazonaws.com/{obj['Key']}"
                    }
                    for obj in response.get('Contents', [])
                ]

                self.log.info(f"Listed {len(files)} files with prefix: {list_prefix}")

                return {
                    "success": True,
                    "files": files,
                    "total_count": len(files),
                    "is_truncated": response.get('IsTruncated', False),
                    "prefix": list_prefix
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                self.log.error(f"S3 list failed: {error_code}")
                return {"error": f"List failed: {error_code}"}
            except Exception as e:
                self.log.error(f"List error: {e}")
                return {"error": f"List failed: {str(e)}"}

        return await self._run_in_executor(_list)

    async def download_file(self, s3_key: str, local_path: Path) -> Dict[str, Any]:
        """
        Download a file from S3.

        Args:
            s3_key: S3 key of the file to download
            local_path: Local path where to save the file

        Returns:
            Dictionary with download results
        """
        def _download():
            try:
                # Validate that the key is for an allowed file type
                if not self._validate_file_extension(s3_key):
                    return {"error": "File type not allowed for download"}

                # Accept plain string paths as well as Path objects.
                target = Path(local_path)

                # Create parent directories if they don't exist
                target.parent.mkdir(parents=True, exist_ok=True)

                # Download from S3 using boto3
                self._require_client().download_file(
                    self.bucket_name,
                    s3_key,
                    str(target)
                )

                # Get file size after download
                file_size = target.stat().st_size

                self.log.info(f"Successfully downloaded {s3_key} to {target}")

                return {
                    "success": True,
                    "local_path": str(target),
                    "file_size": file_size,
                    "s3_key": s3_key
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                # A missing key may surface as '404' (from the HeadObject
                # issued by the managed transfer) rather than 'NoSuchKey'.
                if error_code in self._NOT_FOUND_CODES:
                    self.log.warning(f"File not found: {s3_key}")
                    return {"error": "File not found"}
                self.log.error(f"S3 download failed: {error_code}")
                return {"error": f"Download failed: {error_code}"}
            except Exception as e:
                self.log.error(f"Download error: {e}")
                return {"error": f"Download failed: {str(e)}"}

        return await self._run_in_executor(_download)

    async def delete_file(self, s3_key: str) -> Dict[str, Any]:
        """
        Delete a file from S3.

        Args:
            s3_key: S3 key of the file to delete

        Returns:
            Dictionary with deletion results
        """
        def _delete():
            try:
                # Delete file from S3 using boto3
                self._require_client().delete_object(
                    Bucket=self.bucket_name,
                    Key=s3_key
                )

                self.log.info(f"Successfully deleted {s3_key}")

                return {
                    "success": True,
                    "s3_key": s3_key,
                    "message": "File deleted successfully"
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code in self._NOT_FOUND_CODES:
                    self.log.warning(f"File not found: {s3_key}")
                    return {"error": "File not found"}
                self.log.error(f"S3 delete failed: {error_code}")
                return {"error": f"Delete failed: {error_code}"}
            except Exception as e:
                self.log.error(f"Delete error: {e}")
                return {"error": f"Delete failed: {str(e)}"}

        return await self._run_in_executor(_delete)

    async def move_file(self, source_key: str, dest_key: str) -> Dict[str, Any]:
        """
        Move (rename) a file within the S3 bucket.

        Implemented as copy-then-delete; the original is removed only after
        the copy call returned without error.

        Args:
            source_key: Current S3 key of the file
            dest_key: New S3 key for the file

        Returns:
            Dictionary with move results
        """
        def _move():
            try:
                # Copy-then-delete onto the same key must never reach the
                # delete step, or the only copy of the object would be lost.
                if source_key == dest_key:
                    return {"error": "Move failed: source and destination keys are identical"}

                client = self._require_client()

                # Copy the object to the new key
                client.copy_object(
                    Bucket=self.bucket_name,
                    CopySource={
                        'Bucket': self.bucket_name,
                        'Key': source_key
                    },
                    Key=dest_key
                )

                # Delete the original object
                client.delete_object(
                    Bucket=self.bucket_name,
                    Key=source_key
                )

                self.log.info(f"Successfully moved {source_key} to {dest_key}")

                return {
                    "success": True,
                    "source_key": source_key,
                    "dest_key": dest_key,
                    "message": "File moved successfully"
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                self.log.error(f"S3 move failed: {error_code}")
                return {"error": f"Move failed: {error_code}"}
            except Exception as e:
                self.log.error(f"Move error: {e}")
                return {"error": f"Move failed: {str(e)}"}

        return await self._run_in_executor(_move)

    async def head_object(self, s3_key: str) -> Dict[str, Any]:
        """
        Check if an object exists in S3 and retrieve its metadata.

        Args:
            s3_key: S3 key of the object to check

        Returns:
            Dictionary with head object results
        """
        def _head():
            try:
                response = self._require_client().head_object(
                    Bucket=self.bucket_name,
                    Key=s3_key
                )

                self.log.info(f"Successfully retrieved metadata for {s3_key}")

                last_modified = response.get('LastModified')
                return {
                    "success": True,
                    "s3_key": s3_key,
                    "metadata": response.get('Metadata', {}),
                    "content_length": response.get('ContentLength', 0),
                    "content_type": response.get('ContentType', ''),
                    "last_modified": last_modified.isoformat() if last_modified else None
                }

            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code in self._NOT_FOUND_CODES:
                    self.log.warning(f"File not found: {s3_key}")
                    return {"error": "File not found"}
                self.log.error(f"S3 head object failed: {error_code}")
                return {"error": f"Head object failed: {error_code}"}
            except Exception as e:
                self.log.error(f"Head object error: {e}")
                return {"error": f"Head object failed: {str(e)}"}

        return await self._run_in_executor(_head)