Events
EventBus is an in-process pub/sub mechanism. It's not a message broker — no persistence, no retries, no cross-process delivery. For durable async work, use the background_tasks module (Celery) or publish a domain event and have a handler enqueue a Celery task.
Events are the preferred way for modules to react to other modules' actions without creating a direct code dependency.
Defining events
The base class is Event from simple_module_core.events. Define one subclass per domain event and decorate it with @dataclass:

    # modules/orders/orders/contracts/events.py
    from dataclasses import dataclass
    from decimal import Decimal

    from simple_module_core.events import Event


    @dataclass
    class OrderPlaced(Event):
        order_id: int
        customer_email: str
        total: Decimal


    @dataclass
    class OrderCancelled(Event):
        order_id: int
        reason: str

Events live in contracts/events.py, the public surface of the module. Other modules import them with from orders.contracts.events import OrderPlaced.
Subscribing
Inside register_event_handlers:

    # modules/invoices/invoices/module.py
    from orders.contracts.events import OrderPlaced


    class InvoicesModule(ModuleBase):
        meta = ModuleMeta(
            name="Invoices",
            depends_on=["Orders"],  # subscribe to its events
            ...
        )

        def register_event_handlers(self, bus: EventBus) -> None:
            bus.subscribe(OrderPlaced, self._on_order_placed)

        async def _on_order_placed(self, event: OrderPlaced) -> None:
            await self._invoice_service.create_for(event.order_id)

depends_on=["Orders"] is needed so Invoices.register_event_handlers runs after Orders has been discovered. Cross-module subscriptions should always have the corresponding depends_on.
Handlers can be sync or async — the bus detects coroutines and awaits them.
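One way a bus can support both kinds of handler is to call the handler and await the result only when it is awaitable. The sketch below assumes that approach; ToyBus and the stand-in Event class are hypothetical and are not the simple_module_core implementation:

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Event:
    """Stand-in for simple_module_core.events.Event (assumption)."""


@dataclass
class OrderPlaced(Event):
    order_id: int


class ToyBus:
    """Hypothetical minimal bus; illustrates sync/async dispatch only."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], Any]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[Any], Any]) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Event) -> None:
        for handler in self._handlers[type(event)]:
            result = handler(event)
            # A sync handler returns a plain value; an async handler
            # returns a coroutine, which must be awaited to actually run.
            if inspect.isawaitable(result):
                await result


async def demo_dispatch() -> list[str]:
    calls: list[str] = []

    def sync_handler(event: OrderPlaced) -> None:
        calls.append(f"sync:{event.order_id}")

    async def async_handler(event: OrderPlaced) -> None:
        await asyncio.sleep(0)
        calls.append(f"async:{event.order_id}")

    bus = ToyBus()
    bus.subscribe(OrderPlaced, sync_handler)
    bus.subscribe(OrderPlaced, async_handler)
    await bus.publish(OrderPlaced(order_id=7))
    return calls
```

Running asyncio.run(demo_dispatch()) invokes both handlers in subscription order. Checking the return value, rather than the function object, also covers callables that are not declared with async def but still return an awaitable.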
Publishing
Grab the bus from the framework services and await the publish:

    class OrderService:
        def __init__(self, session: AsyncSession, bus: EventBus) -> None:
            self.session = session
            self.bus = bus

        async def place(self, data: OrderCreate) -> Order:
            order = Order(**data.model_dump())
            self.session.add(order)
            await self.session.flush()  # need the id
            await self.bus.publish(OrderPlaced(
                order_id=order.id,
                customer_email=order.customer_email,
                total=order.total,
            ))
            return order

Get the bus as a FastAPI dependency:

    # modules/orders/orders/deps.py
    from typing import Annotated

    from fastapi import Depends, Request
    from simple_module_core.events import EventBus


    def _event_bus(request: Request) -> EventBus:
        # The bus is stored on the application state by the framework.
        return request.app.state.sm.event_bus


    EventBusDep = Annotated[EventBus, Depends(_event_bus)]

MRO dispatch
The bus walks the event's MRO, so subscribing to a base class delivers every subclass event:

    @dataclass
    class OrderEvent(Event): ...


    @dataclass
    class OrderPlaced(OrderEvent): ...


    @dataclass
    class OrderCancelled(OrderEvent): ...


    bus.subscribe(OrderEvent, audit_handler)       # receives both
    bus.subscribe(OrderPlaced, specific_handler)   # receives only placed

Use base-class subscriptions sparingly: they're magnets for unintended coupling when new subclasses appear.
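To make the dispatch rule concrete, here is a minimal sketch of MRO-based handler lookup. The collect_handlers function and the registry dict are hypothetical names used for illustration, and the real bus may order or deduplicate handlers differently:

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Event:
    """Stand-in for simple_module_core.events.Event (assumption)."""


@dataclass
class OrderEvent(Event):
    order_id: int


@dataclass
class OrderPlaced(OrderEvent):
    pass


@dataclass
class OrderCancelled(OrderEvent):
    pass


def collect_handlers(registry: dict[type, list[Callable]], event: Event) -> list[Callable]:
    """Return handlers registered for the event's class or any of its bases."""
    matched: list[Callable] = []
    # type(event).__mro__ lists the class itself first, then each base
    # class in resolution order, ending with object.
    for cls in type(event).__mro__:
        matched.extend(registry.get(cls, []))
    return matched


def audit_handler(event: OrderEvent) -> None: ...
def specific_handler(event: OrderPlaced) -> None: ...


registry = {
    OrderEvent: [audit_handler],
    OrderPlaced: [specific_handler],
}

placed = collect_handlers(registry, OrderPlaced(order_id=1))
cancelled = collect_handlers(registry, OrderCancelled(order_id=2))
```

In this sketch, placed contains both handlers (the most specific first) and cancelled contains only audit_handler, which mirrors the "receives both" and "receives only placed" comments above.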
Delivery semantics
- Synchronous from the publisher's perspective: await bus.publish(...) resolves after all handlers have run (or raised).
- In-process only. Not delivered to other uvicorn workers, other processes, or other hosts.
- No persistence. A crash mid-publish loses undelivered events.
- Handler failures don't stop the publisher. The bus logs handler exceptions and continues with the next handler. The publish itself does not raise.
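The failure-isolation behaviour in the last point can be pictured as a try/except around each handler call. The following is a sketch under that assumption; publish_isolated and the toy_bus logger name are hypothetical, not the framework's actual code:

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger("toy_bus")


async def publish_isolated(handlers: list[Callable[[Any], Any]], event: Any) -> int:
    """Run every handler; log failures instead of propagating them.

    Returns the number of handlers that failed (for illustration only).
    """
    failures = 0
    for handler in handlers:
        try:
            result = handler(event)
            if inspect.isawaitable(result):
                await result
        except Exception:
            # logger.exception records the traceback; the loop then
            # continues with the next handler.
            logger.exception("handler %r failed for %r", handler, event)
            failures += 1
    return failures


async def demo_isolation() -> tuple[int, list[str]]:
    seen: list[str] = []

    def broken(event: str) -> None:
        raise RuntimeError("boom")

    async def healthy(event: str) -> None:
        seen.append(event)

    failed = await publish_isolated([broken, healthy], "order-placed")
    return failed, seen
```

Here the healthy handler still runs even though the broken one raised first. Because the publisher never sees the exception, work that must succeed for the operation to be valid is probably better expressed as a direct service call than as a handler.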
If you need durable delivery across processes, handlers should enqueue a Celery task:

    def register_event_handlers(self, bus: EventBus) -> None:
        bus.subscribe(OrderPlaced, self._enqueue_invoice)

    async def _enqueue_invoice(self, event: OrderPlaced) -> None:
        from invoices.tasks import create_invoice_task
        create_invoice_task.delay(event.order_id)

Testing
Subscribe a spy in a test fixture:

    @pytest.mark.asyncio
    async def test_place_order_publishes_event(db_session, app):
        received: list[OrderPlaced] = []
        app.state.sm.event_bus.subscribe(
            OrderPlaced, lambda e: received.append(e)
        )
        service = OrderService(db_session, app.state.sm.event_bus)
        await service.place(OrderCreate(customer_email="a@b.c", total=Decimal("1")))
        assert len(received) == 1
        assert received[0].customer_email == "a@b.c"

The app fixture from conftest.py provides a fresh app with a fresh EventBus per test.
Design guidelines
- Events describe facts about the past (OrderPlaced), not commands (PlaceOrder). If you're tempted to name an event imperatively, you actually want a service call.
- Keep events small and stable. They're a public API. Breaking the shape of OrderPlaced breaks every subscriber. Add new fields with defaults; don't remove or rename.
- Don't use events for request/response. If the publisher needs a result, call a service directly. Events are fire-and-observe.
- Avoid circular publishing. Handler A publishes event X; handler B (subscribed to X) publishes event Y; handler C (subscribed to Y) publishes X. The bus doesn't detect this — it'll run forever. Guard with a flag or redesign the flow.