Laravel-style MQTT routing for Python: controllers, middleware, jobs, and shared-subscription scaling without callback spaghetti.
Status: Beta. Use RouteMQ in test/staging while we harden the v1 surface. The framework ships with 581 unit tests and is supply-chain hardened, but APIs may shift before 1.0.
Documentation: docs/ · Source: github.com/ardzz/RouteMQ · PyPI: routemq
RouteMQ is a Python 3.12+ MQTT application framework that turns topics into async controller methods through middleware chains, with optional background jobs and shared-subscription workers.
RouteMQ gives you:
- Route topics like web routes. Declare devices/{id}/status once; receive id as a typed handler argument.
- Controllers and middleware. Keep handlers in app/controllers; layer auth, logging, rate limiting, and validation as reusable middleware.
- Async by default. Use async Redis, MySQL (SQLAlchemy), ClickHouse, and job dispatch naturally inside handlers. RouteMQ bridges paho-mqtt's sync callbacks for you.
- Shared-subscription scaling. Flip shared=True on a high-volume route; RouteMQ spawns worker processes against $share/<group>/<topic> without you wiring multiple clients.
- Background jobs. Laravel-style Job classes with retries, delays, timeouts, and Redis or MySQL queue backends.
- Built-in observability. Optional /health, /ready, and /metrics HTTP endpoints, lifecycle counters, latency histograms, and OpenTelemetry-shaped spans. No mandatory vendor backend.
- Optional integrations. Redis, MySQL, ClickHouse for time-series telemetry, and a Prometheus client adapter. All opt-in extras.
- Supply-chain hardened. OpenSSF Scorecard, SLSA L3 provenance, signed CycloneDX SBOMs, Bandit, pip-audit, and Dependabot on every release.
Install the mode you need:
| Install | Use it for |
|---|---|
| routemq | Runtime engine: routing, middleware, jobs, MySQL queue, app boot. |
| routemq[cli] | Runtime plus the routemq new scaffolder. Start here for a new project. |
| routemq[redis] | Runtime plus Redis support for queues, cache, rate limits, and shared state. |
| routemq[all] | CLI, Redis, Prometheus, and ClickHouse extras in one install. |
uv add "routemq[cli]"       # add to an existing uv project
pip install "routemq[cli]"  # install into the active Python environment

Create a project and one route:

uvx --from "routemq[cli]" routemq new sensor-demo
# or, after pip install "routemq[cli]": routemq new sensor-demo
cd sensor-demo

# app/controllers/device_controller.py
from routemq.controller import Controller

class DeviceController(Controller):
    @staticmethod
    async def status(device_id, payload, client):
        print(f"device {device_id}: {payload}")
        return {"ok": True}

# app/routers/devices.py
from routemq.router import Router
from app.controllers.device_controller import DeviceController

router = Router()
router.on("devices/{device_id}/status", DeviceController.status, qos=1)

Point .env at a broker and run the app:

MQTT_BROKER=test.mosquitto.org
MQTT_PORT=1883

uv run routemq run
mosquitto_pub -h test.mosquitto.org -t devices/42/status -m '{"temp":21}'

RouteMQ imports app.routers.*, subscribes to devices/+/status, and calls DeviceController.status(device_id="42", payload={"temp": 21}, ...).
| If you need... | Use |
|---|---|
| Low-level MQTT protocol control, custom session/QoS handling | paho-mqtt |
| A web-framework-style structure for an MQTT-first app | RouteMQ |
| Multi-broker streaming across Kafka, RabbitMQ, NATS, Redis, MQTT | FastStream |
| General distributed task queues independent of a broker protocol | Celery |
RouteMQ sits on top of paho-mqtt. You keep proven protocol behavior and add structure, async handlers, and scaling.
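One common way to run async handlers on top of a callback-driven client is to hand each message from the client's network thread to an asyncio event loop. The sketch below shows that general technique with the standard library only; how RouteMQ actually implements its bridge is not shown in this README. The callback signature is simplified for illustration (paho-mqtt's real on_message receives client, userdata, and a message object), and handle and make_on_message are hypothetical names.

```python
import asyncio
import threading

received = []


async def handle(topic: str, payload: bytes) -> None:
    """Async handler that runs on the event loop, not on the network thread."""
    await asyncio.sleep(0)  # stand-in for async I/O such as a Redis call
    received.append((topic, payload.decode()))


def make_on_message(loop: asyncio.AbstractEventLoop):
    """Build a sync callback that schedules the async handler on the given loop."""
    def on_message(topic: str, payload: bytes):
        # Safe to call from a non-loop thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(handle(topic, payload), loop)
    return on_message


async def main() -> None:
    on_message = make_on_message(asyncio.get_running_loop())
    futures = []

    def network_thread() -> None:
        # Simulates the MQTT client thread delivering one message.
        futures.append(on_message("devices/42/status", b'{"temp":21}'))

    thread = threading.Thread(target=network_thread)
    thread.start()
    await asyncio.to_thread(thread.join)    # wait without blocking the loop
    await asyncio.wrap_future(futures[0])   # wait for the handler to finish


asyncio.run(main())
print(received)
```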
The minimal example above scales up cleanly:
from routemq.router import Router
from app.middleware.rate_limit import RateLimit
from app.controllers.device_controller import DeviceController

router = Router()

with router.group(prefix="devices", middleware=[RateLimit(60)]) as devices:
    devices.on(
        "{device_id}/status",
        DeviceController.handle_status,
        qos=1,
        shared=True,
        worker_count=3,
    )

- The {device_id} token compiles to a regex with a named group and to a + wildcard for the MQTT subscription.
- shared=True switches the subscription to $share/<group>/devices/+/status and spawns three worker processes.
- RateLimit(60) runs as middleware before the handler. Auth, logging, and validation work the same way.
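The "middleware runs before the handler" idea can be sketched as a chain of async callables, each deciding whether to call the next one. This is a generic illustration, not RouteMQ's middleware interface: build_chain, max_calls, log_step, and the dict-based context are all hypothetical, and max_calls is a plain call counter rather than the framework's RateLimit.

```python
import asyncio
from typing import Any, Awaitable, Callable

Context = dict[str, Any]
Handler = Callable[[Context], Awaitable[Any]]
Middleware = Callable[[Context, Handler], Awaitable[Any]]


def build_chain(middleware: list[Middleware], handler: Handler) -> Handler:
    """Wrap the handler so the first middleware in the list runs outermost."""
    chain = handler
    for mw in reversed(middleware):
        def wrap(ctx: Context, mw: Middleware = mw, nxt: Handler = chain):
            return mw(ctx, nxt)
        chain = wrap
    return chain


def max_calls(limit: int) -> Middleware:
    """Hypothetical limiter: reject every call after the first `limit` calls."""
    calls = 0

    async def middleware(ctx: Context, nxt: Handler) -> Any:
        nonlocal calls
        calls += 1
        if calls > limit:
            return {"ok": False, "error": "rate limited"}  # short-circuit the chain
        return await nxt(ctx)

    return middleware


async def log_step(ctx: Context, nxt: Handler) -> Any:
    ctx.setdefault("trace", []).append("log")
    return await nxt(ctx)


async def status_handler(ctx: Context) -> Any:
    ctx.setdefault("trace", []).append("handler")
    return {"ok": True, "device_id": ctx["params"]["device_id"]}


async def demo() -> list[Any]:
    chain = build_chain([log_step, max_calls(2)], status_handler)
    results = []
    for _ in range(3):
        ctx = {"topic": "devices/42/status", "params": {"device_id": "42"}}
        results.append(await chain(ctx))
    return results


results = asyncio.run(demo())
print(results)
```

The third call is rejected by the limiter and never reaches the handler, which is the property that makes middleware reusable across routes.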
Push slow work out of the MQTT handler. Register concrete jobs so workers can deserialize them safely:
# app/jobs/send_alert_job.py
from routemq.job import Job

@Job.register
class SendAlertJob(Job):
    queue = "alerts"
    max_tries = 3
    retry_after = 10

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        print(f"alert for {self.device_id}: {self.payload}")

Dispatch the job from a controller or handler:

from routemq.queue import dispatch
from app.jobs.send_alert_job import SendAlertJob

async def handler(device_id, payload, client):
    if payload.get("status") == "critical":
        job = SendAlertJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
    return {"ok": True}

Run a worker:

routemq queue-work --queue alerts --connection redis

Queue backends: Redis with routemq[redis], or MySQL with base routemq when ENABLE_MYSQL=true.
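Why registration matters for safe deserialization, and what max_tries implies for a worker, can be sketched with a small in-memory model. This is an assumption-laden illustration rather than RouteMQ's queue code: REGISTRY, register, BaseJob, FlakyAlertJob, and run_with_retries are hypothetical names, and the JSON wire format is invented for the example.

```python
import asyncio
import json

REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Hypothetical decorator: only registered classes may be rebuilt by a worker."""
    REGISTRY[cls.__name__] = cls
    return cls


class BaseJob:
    max_tries = 1

    def serialize(self) -> str:
        return json.dumps({"class": type(self).__name__, "data": vars(self)})

    @staticmethod
    def deserialize(raw: str) -> "BaseJob":
        record = json.loads(raw)
        cls = REGISTRY[record["class"]]  # KeyError for unregistered class names
        job = cls()
        job.__dict__.update(record["data"])
        return job


@register
class FlakyAlertJob(BaseJob):
    max_tries = 3

    def __init__(self) -> None:
        self.device_id = None
        self.attempts = 0

    async def handle(self) -> str:
        self.attempts += 1
        if self.attempts < 3:  # fail twice, then succeed
            raise RuntimeError("transient failure")
        return f"alert sent for {self.device_id}"


async def run_with_retries(raw: str, retry_after: float = 0.0) -> str:
    """Rebuild the job from its payload and retry handle() up to max_tries times."""
    job = BaseJob.deserialize(raw)
    for attempt in range(1, job.max_tries + 1):
        try:
            return await job.handle()
        except Exception:
            if attempt == job.max_tries:
                raise
            await asyncio.sleep(retry_after)
    raise RuntimeError("max_tries must be at least 1")


job = FlakyAlertJob()
job.device_id = "pump-7"
raw = job.serialize()
result = asyncio.run(run_with_retries(raw))
print(result)
```

Looking the class up in a registry, instead of importing whatever name the payload carries, means a worker refuses payloads that name classes it was never told about.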
A sensor pipeline usually has three parts: MQTT routing, queued processing, and a local stack with a broker plus Redis.
# app/routers/sensors.py
from routemq.router import Router
from app.controllers.sensor_controller import SensorController

router = Router()
router.on("sensors/{device_id}/telemetry", SensorController.ingest, qos=1)

# app/controllers/sensor_controller.py
from routemq.controller import Controller
from routemq.queue import dispatch
from app.jobs.store_telemetry_job import StoreTelemetryJob

class SensorController(Controller):
    @staticmethod
    async def ingest(device_id, payload, client):
        job = StoreTelemetryJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
        return {"accepted": True, "device_id": device_id}

# app/jobs/store_telemetry_job.py
from routemq.job import Job

@Job.register
class StoreTelemetryJob(Job):
    queue = "telemetry"
    max_tries = 5

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        temperature = self.payload.get("temperature")
        print(f"store {self.device_id}: temperature={temperature}")

Run it against a local broker and Redis queue:

# docker-compose.yml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports: ["1883:1883"]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]

MQTT_BROKER=localhost
MQTT_PORT=1883
ENABLE_REDIS=true
QUEUE_CONNECTION=redis

docker compose up -d
uv run routemq run
uv run routemq queue-work --queue telemetry --connection redis
mosquitto_pub -h localhost -t sensors/pump-7/telemetry -m '{"temperature":31.2}'

RouteMQ ships health, readiness, and OpenMetrics endpoints, off by default. Set METRICS_HTTP_ENABLED=true to expose them:

curl http://localhost:8080/health   # liveness
curl http://localhost:8080/ready    # MQTT readiness
curl http://localhost:8080/metrics  # OpenMetrics / Prometheus text

Built-in metric families include mqtt_messages_*, router_dispatch_*, queue_job_*, queue-depth gauges, tsdb_write_*, and latency histograms for each. Spans follow OpenTelemetry-shaped semantics (db.system, db.operation, messaging.system, kind=client|consumer|producer|internal).
For details: Metrics · Health checks · Pool tuning evidence
uv add routemq # base runtime
uv add "routemq[cli]" # scaffolder and rich terminal prompts
uv add "routemq[redis]" # Redis queue + rate limiting backend
uv add "routemq[clickhouse]" # ClickHouse time-series telemetry
uv add "routemq[prometheus]" # multiprocess-safe Prometheus client adapter
uv add "routemq[all]" # everything above plus CLI
# pip works too:
pip install routemq
pip install "routemq[cli]"
pip install "routemq[redis]"
pip install "routemq[all]"

The scaffolder can drop a docker-compose.yml with Redis, MySQL, the app, and queue workers:

uvx --from "routemq[cli]" routemq new my-app --with-docker --with-redis --with-mysql --with-queue
cd my-app
docker compose up -d
docker compose up -d --scale queue-worker-default=5

For a local MQTT broker, add Mosquitto to the same compose file:

services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"

Then set MQTT_BROKER=mosquitto for containers, or MQTT_BROKER=localhost when running RouteMQ on your host.
- Getting Started: installation, first route, environment
- Architecture: message flow diagram and runtime components
- Configuration: every environment variable, with defaults
- Routing · Controllers · Middleware
- Queue System: jobs, workers, drivers
- Rate Limiting: strategies and Redis backend
- Redis · Database · TSDB / ClickHouse
- Monitoring: metrics, health, traces
- Docker Deployment · Testing
- Examples · API Reference · FAQ
- Release Conformance: SLSA, Scorecard, SBOM, SemVer
- Security Policy: private vulnerability reporting and supported versions
- Contributing: issues, PRs, tests, coding standards
- Code of Conduct
- Changelog
- Issue Tracker
git clone https://github.com/ardzz/RouteMQ.git
cd RouteMQ
uv sync --all-extras --dev
uv run python run_tests.py  # 581 tests, ~3 seconds

See TEMPLATE.md if you want to fork the framework rather than depend on the published wheel.
MIT, see LICENSE.
