Session lifecycle
Each request opens exactly one AsyncSession. The framework commits only when there are pending writes; read-only requests rollback. Service code should not call session.commit() directly.
The get_db dependency
# simple_module_db.session
async def get_db(request: Request) -> AsyncIterator[AsyncSession]:
engine = request.app.state.sm.db.engine_for(...)
async with AsyncSession(engine) as session:
try:
yield session
if _has_writes(session):
await session.commit()
else:
await session.rollback()
except Exception:
await session.rollback()
raiseUsage in endpoints:
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from simple_module_db.session import get_db
SessionDep = Annotated[AsyncSession, Depends(get_db)]
@router.post("")
async def create_order(data: OrderCreate, session: SessionDep): ...Most modules wrap get_db in their own deps.py so the service is injected rather than the raw session:
# modules/orders/orders/deps.py
from orders.service import OrderService
def _order_service(session: SessionDep) -> OrderService:
return OrderService(session)
OrderServiceDep = Annotated[OrderService, Depends(_order_service)]The has_writes flag
To distinguish "this request wrote something" from "this request only read", an after_flush event listener sets session.info["has_writes"] = True. The dependency checks:
if session.info.get("has_writes") or session.new or session.dirty or session.deleted:
await session.commit()
else:
await session.rollback()Why both? session.new/dirty/deleted show pending writes that haven't flushed yet; has_writes catches the case where writes flushed but were already synced. Together they cover every path to "we touched the DB with intent to change it."
Why not always commit?
- Cheaper. A rollback on a clean session is a single Postgres message; a commit with no writes still goes through two-phase round-trips on some drivers.
- Observability. The write/read distinction is useful in profiling and logs — you can answer "how many requests mutated data?" without guessing.
- Safety. If a GET handler accidentally stages a write (rare, but happens with
session.merge), the rollback prevents silent persistence.
Why service code shouldn't commit
The per-request session is owned by the dependency. If your service calls commit() mid-request, two problems arise:
- The dependency commits again on exit — harmless but confusing.
- If a later step in the request fails, the partial writes are already persisted and can't be rolled back.
Instead, flush when you need a DB-assigned value (e.g. an auto-generated id):
async def create(self, data: OrderCreate) -> OrderOut:
order = Order(**data.model_dump())
self.session.add(order)
await self.session.flush() # populates order.id, stays inside the transaction
# further logic can see order.id but won't persist until the dependency commits
return OrderOut.model_validate(order)Flushing sends the INSERT but keeps the transaction open. Rollback still works until the dependency commits.
Manual transactions
If you need finer control — e.g. a background worker that processes many items in its own transactions — use DatabaseState directly:
from simple_module_db.state import DatabaseState
async def worker(db: DatabaseState):
async with db.session() as session:
async with session.begin():
# explicit transaction block
session.add(...)
# commits on exit, rolls back on exceptionThe async with session.begin() pattern opens a sub-transaction you control. Use it in code that runs outside a request scope.
Sessions and MultiTenantMixin
get_db captures request.state.tenant_id into the session's info dict. The tenant listeners read it to filter SELECTs and stamp INSERTs. This is why sessions are request-scoped — sharing one across tenants would silently leak data.
For cross-tenant admin ops, use no_tenant() context or the skip_tenant_filter=True execution option. See Mixins → MultiTenantMixin.
Sessions in tests
The db_session fixture in conftest.py creates a fresh in-memory SQLite DB, creates every module's tables, stamps alembic_version at head, and yields an AsyncSession. Each test gets a fresh one — no shared state, no transaction rollback hacks.
@pytest.mark.asyncio
async def test_order_insert(db_session):
order = Order(customer_email="a@b.c", total=Decimal("1"))
db_session.add(order)
await db_session.flush()
assert order.id is not NoneFor endpoint tests, use client or authenticated_client — they share the same db_session via dependency override, so writes performed by the handler are visible to test-side queries.
Commit vs. flush checklist
| Situation | Call |
|---|---|
| Inside an endpoint handler / service, need DB-assigned id | await session.flush() |
| Inside an endpoint handler / service, done with work | nothing — dependency commits on exit |
| Outside a request (worker, CLI, lifespan) | explicit async with session.begin(): |
| A handler that reads-only but wants to persist an audit row | stage the row; the dependency detects pending writes and commits |
| A handler that must explicitly discard partial changes | raiseing propagates to the dependency, which rolls back |
If you find yourself writing await session.commit() inside a service, step back — there's a cleaner way using flush or a dedicated transactional helper.