from fastapi import APIRouter, HTTPException
from typing import Dict, Any, Optional
from pydantic import BaseModel
import logging
# Import proxy types for type hints
from ..proxies.mqtt import MQTTProxy
from ..api import get_proxies
router = APIRouter(tags=["mqtt"])
_logger: Optional[logging.Logger] = None
def _set_logger(logger: logging.Logger):
"""Set the logger for api endpoints."""
global _logger
_logger = logger
if not isinstance(_logger, logging.Logger):
raise ValueError("Logger must be an instance of logging.Logger")
if not _logger.name:
raise ValueError("Logger must have a name set")
if not _logger.handlers:
raise ValueError("Logger must have at least one handler configured")
[docs]
def get_logger() -> Optional[logging.Logger]:
"""Get the logger instance."""
global _logger
if not _logger:
raise ValueError("Logger has not been set. Call _set_logger first.")
return _logger
# Request models
[docs]
class PublishMessageRequest(BaseModel):
topic: str
payload: Dict[str, Any]
qos: int = 1
[docs]
class SubscribeTopicRequest(BaseModel):
topic: str
[docs]
class UnsubscribeTopicRequest(BaseModel):
topic: str
[docs]
class SubscribePatternRequest(BaseModel):
pattern: str
# Response models
[docs]
class MQTTResponse(BaseModel):
status: str
message: Optional[str] = None
[docs]
@router.post(
"/publish",
summary="Publish message to MQTT topic",
description="Publish a message to a specified MQTT topic.",
)
async def publish_message(request: PublishMessageRequest) -> MQTTResponse:
"""Publish a message to an MQTT topic."""
proxies = get_proxies()
logger = get_logger()
if "mqtt" not in proxies:
logger.error("MQTT proxy not available")
raise HTTPException(
status_code=503,
detail="MQTT proxy not available",
headers={"source": "publish_message"}
)
mqtt_proxy: MQTTProxy = proxies["mqtt"]
try:
success = await mqtt_proxy.publish_message(
topic=request.topic,
payload=request.payload,
qos=request.qos
)
if success:
logger.info(f"Published message to topic: {request.topic}")
return MQTTResponse(status="success", message=f"Message published to {request.topic}")
else:
logger.error(f"Failed to publish message to topic: {request.topic}")
raise HTTPException(
status_code=500,
detail=f"Failed to publish message to topic: {request.topic}",
headers={"source": "publish_message"}
)
except Exception as e:
logger.error(f"Error publishing message: {e}")
raise HTTPException(
status_code=500,
detail=f"Error publishing message: {str(e)}",
headers={"source": "publish_message"}
)
[docs]
@router.post(
"/subscribe",
summary="Subscribe to MQTT topic",
description="Subscribe to a specified MQTT topic.",
)
async def subscribe_topic(request: SubscribeTopicRequest) -> MQTTResponse:
"""Subscribe to an MQTT topic."""
proxies = get_proxies()
logger = get_logger()
if "mqtt" not in proxies:
logger.error("MQTT proxy not available")
raise HTTPException(
status_code=503,
detail="MQTT proxy not available",
headers={"source": "subscribe_topic"}
)
mqtt_proxy: MQTTProxy = proxies["mqtt"]
try:
success = await mqtt_proxy.subscribe_to_topic(topic=request.topic)
if success:
logger.info(f"Subscribed to topic: {request.topic}")
return MQTTResponse(status="success", message=f"Subscribed to {request.topic}")
else:
logger.error(f"Failed to subscribe to topic: {request.topic}")
raise HTTPException(
status_code=500,
detail=f"Failed to subscribe to topic: {request.topic}",
headers={"source": "subscribe_topic"}
)
except Exception as e:
logger.error(f"Error subscribing to topic: {e}")
raise HTTPException(
status_code=500,
detail=f"Error subscribing to topic: {str(e)}",
headers={"source": "subscribe_topic"}
)
[docs]
@router.post(
"/unsubscribe",
summary="Unsubscribe from MQTT topic",
description="Unsubscribe from a specified MQTT topic.",
)
async def unsubscribe_topic(request: UnsubscribeTopicRequest) -> MQTTResponse:
"""Unsubscribe from an MQTT topic."""
proxies = get_proxies()
logger = get_logger()
if "mqtt" not in proxies:
logger.error("MQTT proxy not available")
raise HTTPException(
status_code=503,
detail="MQTT proxy not available",
headers={"source": "unsubscribe_topic"}
)
mqtt_proxy: MQTTProxy = proxies["mqtt"]
try:
success = await mqtt_proxy.unsubscribe_from_topic(topic=request.topic)
if success:
logger.info(f"Unsubscribed from topic: {request.topic}")
return MQTTResponse(status="success", message=f"Unsubscribed from {request.topic}")
else:
logger.error(f"Failed to unsubscribe from topic: {request.topic}")
raise HTTPException(
status_code=500,
detail=f"Failed to unsubscribe from topic: {request.topic}",
headers={"source": "unsubscribe_topic"}
)
except Exception as e:
logger.error(f"Error unsubscribing from topic: {e}")
raise HTTPException(
status_code=500,
detail=f"Error unsubscribing from topic: {str(e)}",
headers={"source": "unsubscribe_topic"}
)
[docs]
@router.get(
"/status",
summary="Get MQTT proxy status",
description="Get the current status and health of the MQTT proxy.",
)
async def get_mqtt_status() -> Dict[str, Any]:
"""Get MQTT proxy status and health information."""
proxies = get_proxies()
logger = get_logger()
if "mqtt" not in proxies:
logger.error("MQTT proxy not available")
raise HTTPException(
status_code=503,
detail="MQTT proxy not available",
headers={"source": "get_mqtt_status"}
)
mqtt_proxy: MQTTProxy = proxies["mqtt"]
try:
health_status = await mqtt_proxy.health_check()
logger.debug("Retrieved MQTT proxy health status")
return health_status
except Exception as e:
logger.error(f"Error getting MQTT status: {e}")
raise HTTPException(
status_code=500,
detail=f"Error getting MQTT status: {str(e)}",
headers={"source": "get_mqtt_status"}
)
[docs]
@router.get(
"/subscriptions",
summary="List current MQTT subscriptions",
description="Get list of currently active MQTT topic subscriptions.",
)
async def list_subscriptions() -> Dict[str, Any]:
"""List current MQTT subscriptions."""
proxies = get_proxies()
logger = get_logger()
if "mqtt" not in proxies:
logger.error("MQTT proxy not available")
raise HTTPException(
status_code=503,
detail="MQTT proxy not available",
headers={"source": "list_subscriptions"}
)
mqtt_proxy: MQTTProxy = proxies["mqtt"]
try:
subscriptions = {
"topics": list(mqtt_proxy._subscriptions.keys()),
"patterns": list(mqtt_proxy._subscription_patterns.keys()),
"count": len(mqtt_proxy._subscriptions) + len(mqtt_proxy._subscription_patterns)
}
logger.debug(f"Retrieved {subscriptions['count']} MQTT subscriptions")
return subscriptions
except Exception as e:
logger.error(f"Error listing subscriptions: {e}")
raise HTTPException(
status_code=500,
detail=f"Error listing subscriptions: {str(e)}",
headers={"source": "list_subscriptions"}
)