background_tasks

Celery + Redis task queue with persistent history, an admin UI for monitoring and retrying tasks, and a live worker dashboard. Other modules drop @shared_task-decorated functions into their tasks.py, and they're auto-discovered at boot.

ModuleMeta

| Field | Value |
| --- | --- |
| name | BackgroundTasks |
| route_prefix | /api/background_tasks |
| view_prefix | /admin/background-tasks |
| depends_on | ["Users"] |

Writing a task

python
# modules/orders/orders/tasks.py
from celery import shared_task

@shared_task(name="orders.send_receipt")
def send_receipt(order_id: int) -> None:
    ...

That's it. Celery's autodiscover_tasks(packages, related_name="tasks") finds it at worker startup — no extra registration. From request code:

python
from orders.tasks import send_receipt

send_receipt.delay(order_id=42)

Every dispatch / start / completion / failure / retry / revoke is recorded in the background_tasks_task_execution table by Celery signal handlers, so you can see history even after the Celery result backend has expired the result.

Scheduling periodic work (beat)

A module schedules recurring work the same way it registers tasks — by shipping a tasks.py — and additionally exporting a module-level BEAT_SCHEDULE dict. build_celery merges every installed module's BEAT_SCHEDULE into the beat schedule at boot (identically in the web and worker processes), so make beat runs them alongside the built-ins:

python
# modules/invoices/invoices/tasks.py
from celery import shared_task
from celery.schedules import crontab

@shared_task(name="invoices.generate_recurring")
def generate_recurring() -> int:
    ...

# Discovered by build_celery and merged into the beat schedule.
BEAT_SCHEDULE = {
    "invoices-generate-recurring-daily": {
        "task": "invoices.generate_recurring",
        "schedule": crontab(hour=6, minute=0),  # 0 6 * * *
    },
}

Entry values are plain Celery beat entries: schedule accepts a number of seconds, a crontab(...), or a solar(...). The two built-in entry names (background-tasks-sweep-stuck, background-tasks-purge-old) are authoritative — a module reusing one is ignored with a warning.
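The merge rule can be pictured roughly as follows. This is a minimal sketch, not the actual build_celery code: merge_beat_schedules is a hypothetical name, schedules are plain numbers of seconds so the snippet runs without Celery installed, and the handling of a name clash between two modules (later one wins) is an assumption the text above does not cover.

```python
import logging

logger = logging.getLogger("background_tasks.beat")

# The two built-in entry names come from the docs; the rest is illustrative.
BUILTIN_NAMES = {"background-tasks-sweep-stuck", "background-tasks-purge-old"}


def merge_beat_schedules(builtins: dict, module_schedules: dict) -> dict:
    """Merge each module's BEAT_SCHEDULE on top of the built-in entries."""
    merged = dict(builtins)
    for module_name, schedule in module_schedules.items():
        for entry_name, entry in schedule.items():
            if entry_name in BUILTIN_NAMES:
                # Built-ins are authoritative: warn and skip the module entry.
                logger.warning("%s: entry %r reuses a built-in name; ignored",
                               module_name, entry_name)
                continue
            # Assumption: on a clash between two modules, the later one wins.
            merged[entry_name] = entry
    return merged


builtins = {
    "background-tasks-sweep-stuck": {"task": "background_tasks.sweep_stuck_tasks", "schedule": 60},
    "background-tasks-purge-old": {"task": "background_tasks.purge_old_executions", "schedule": 86_400},
}
modules = {
    "invoices": {
        "invoices-generate-recurring-daily": {"task": "invoices.generate_recurring", "schedule": 3600},
        # Reuses a built-in name, so it is dropped with a warning.
        "background-tasks-purge-old": {"task": "invoices.rogue", "schedule": 1},
    },
}
merged = merge_beat_schedules(builtins, modules)
```

Here merged keeps both built-ins unchanged and gains only the invoices-generate-recurring-daily entry.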

Don't reach for from celery.signals import on_after_configure. It's an app-instance signal (not a member of celery.signals, so the import raises), and build_celery runs conf.update(...) — which fires it — before autodiscover_tasks imports your tasks.py, so a handler would miss the window. The declarative BEAT_SCHEDULE dict above is the supported mechanism. If you need entries computed at runtime, Celery's @app.on_after_finalize.connect + sender.add_periodic_task(...) also works.

Running workers

| Command | Use case |
| --- | --- |
| make worker | Local Celery worker against the configured broker. |
| make beat | Local beat scheduler (sweep_stuck_tasks, purge_old_executions, plus any beat schedules other modules add). |
| make worker-docker | Worker + beat services via docker compose; matches the prod image. |

Beat persists its schedule (celery.beat:PersistentScheduler) so it survives restarts.

Routes

Admin API

All require background_tasks.view; retry additionally needs background_tasks.manage.

| Method + path | Body / response | Notes |
| --- | --- | --- |
| GET /api/background_tasks/admin/executions?status=&task_name=&page=&per_page= | TaskExecutionListResponse | per_page max 200 |
| GET /api/background_tasks/admin/executions/{execution_id} | TaskExecutionDetail | 404 if missing |
| POST /api/background_tasks/admin/executions/{execution_id}/retry | TaskExecutionDetail | 409 unless status is failed or stuck |
| GET /api/background_tasks/admin/workers | WorkerSnapshot | live ping; consumed by the Workers page |
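A client of the admin API might wrap the listing and retry rules like this. It is a sketch under assumptions: executions_url and can_retry are hypothetical helper names, the page and per_page defaults are invented for illustration, and the per_page check is a client-side guard (the docs only state the maximum of 200, not how the server reacts to larger values).

```python
from typing import Optional
from urllib.parse import urlencode

RETRYABLE_STATUSES = {"failed", "stuck"}  # the retry route answers 409 otherwise
MAX_PER_PAGE = 200                        # documented upper bound for per_page


def executions_url(status: Optional[str] = None, task_name: Optional[str] = None,
                   page: int = 1, per_page: int = 50) -> str:
    """Build the listing URL, leaving out filters that are not set."""
    if not 1 <= per_page <= MAX_PER_PAGE:
        raise ValueError(f"per_page must be between 1 and {MAX_PER_PAGE}")
    params = {"status": status, "task_name": task_name, "page": page, "per_page": per_page}
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"/api/background_tasks/admin/executions?{query}"


def can_retry(status: str) -> bool:
    """True when a retry request for a row in this status should not get a 409."""
    return status in RETRYABLE_STATUSES


url = executions_url(status="failed", page=2)
```

Checking can_retry before showing a retry button avoids a round trip that would end in a 409.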

View

| Method + path | Inertia component | Permission |
| --- | --- | --- |
| GET /admin/background-tasks/ | BackgroundTasks/Index | background_tasks.view |
| GET /admin/background-tasks/workers | BackgroundTasks/Workers | background_tasks.view |
| GET /admin/background-tasks/{execution_id} | BackgroundTasks/Detail | background_tasks.view |

Public contracts

python
from background_tasks.contracts.events import TaskFailed, TaskRetried
from background_tasks.contracts.schemas import (
    TaskExecutionListItem, TaskExecutionDetail, TaskExecutionListResponse,
    WorkerInfo, WorkerSnapshot,
)

| Class | Purpose |
| --- | --- |
| TaskFailed (event) | Published when a task transitions to failed. Fields: task_execution_id, task_name, exception_type. |
| TaskRetried (event) | Published when an admin retries via the UI. Fields: original_id, new_id, task_name. |
| TaskExecutionListItem | Compact row for the listing table. |
| TaskExecutionDetail | Full record incl. args, kwargs, result, traceback. |
| WorkerInfo / WorkerSnapshot | Live worker probe response shape. |

Models

TaskExecution (table background_tasks_task_execution)

| Column | Type | Notes |
| --- | --- | --- |
| id | UUID | PK; row stays stable across the task's lifecycle |
| celery_task_id | str(64) | indexed; updated by signal handlers |
| task_name | str(255) | indexed |
| status | TaskStatus | indexed; one of pending, running, success, failed, stuck, revoked, retrying |
| queue | str(64) | default "default" |
| args / kwargs | JSON | |
| result | JSON \| None | |
| traceback | str \| None | |
| exception_type | str(255) \| None | |
| worker | str(255) \| None | hostname that ran it |
| retries | int | |
| retried_from_id | UUID \| None | self-FK; tracks retry chains |
| queued_at / started_at / finished_at / heartbeat_at | datetime | |
| audit | | from AuditMixin |

Composite index on (status, queued_at) keeps the listing snappy under load.

Settings

DB-backed; defaults are in BackgroundTasksSettings. Several are marked requires_restart=True because Celery reads them at process start.

| Field | Default | Notes |
| --- | --- | --- |
| broker_url | redis://localhost:6379/0 | restart required; localhost values are rejected in production |
| result_backend | redis://localhost:6379/1 | restart required |
| task_default_queue | "default" | restart required |
| task_always_eager | False | runs tasks in-process (test mode) |
| task_eager_propagates | True | re-raises eager-mode errors |
| stuck_after_seconds | 300 | heartbeat-staleness threshold |
| stuck_sweep_interval_seconds | 60 | beat cadence for the stuck sweep |
| purge_interval_seconds | 86_400 | beat cadence for old-row purge |
| retention_days | 14 | how long to keep terminal rows |
| max_retries | 3 | informational; tasks define their own policies |

Bootstrap env-var equivalents (SM_BG_TASKS_*) only seed pydantic defaults at first boot — once a value lives in the DB, it's authoritative.
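The precedence described above (DB value, then env-var seed, then coded default) can be sketched as below. The resolve_setting helper is hypothetical, and the mapping from field name to variable name (SM_BG_TASKS_ plus the upper-cased field) is an assumption; only the SM_BG_TASKS_ prefix is documented.

```python
from typing import Mapping

# Two integer defaults copied from the settings table.
DEFAULTS = {"stuck_after_seconds": 300, "retention_days": 14}
ENV_PREFIX = "SM_BG_TASKS_"


def resolve_setting(name: str, db_values: Mapping[str, int], env: Mapping[str, str]) -> int:
    """Return the effective value: DB first, then env seed, then the default."""
    if name in db_values:
        # Once a value lives in the DB it is authoritative.
        return db_values[name]
    env_key = ENV_PREFIX + name.upper()  # assumed naming scheme
    if env_key in env:
        # Env vars only matter while the DB has no value yet.
        return int(env[env_key])
    return DEFAULTS[name]


env = {"SM_BG_TASKS_RETENTION_DAYS": "30"}
first_boot = resolve_setting("retention_days", {}, env)
after_admin_edit = resolve_setting("retention_days", {"retention_days": 7}, env)
```

With these inputs the env var wins on first boot, and the DB value wins once an admin has saved one.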

Permissions

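| Code | Purpose |
| --- | --- |
| background_tasks.view | list + detail + worker snapshot |
| background_tasks.manage | retry failed / stuck tasks |

| Label | URL | Icon | Section | Group | Order | Roles |
| --- | --- | --- | --- | --- | --- | --- |
| Background Tasks | /admin/background-tasks | activity | SIDEBAR | Administration | 120 | ["admin"] |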

Beat schedule (built-in)

| Task | Schedule | What |
| --- | --- | --- |
| background_tasks.sweep_stuck_tasks | every stuck_sweep_interval_seconds | flips running rows with stale heartbeats to stuck |
| background_tasks.purge_old_executions | every purge_interval_seconds | deletes terminal rows older than retention_days |
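The two built-in jobs boil down to a pair of time comparisons. The sketch below works on in-memory rows rather than the real table. Row, sweep_stuck, and purge_old are hypothetical names, and two details are assumptions the docs do not spell out: which statuses count as terminal, and that the purge compares against finished_at.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

# Assumption: the docs do not list the terminal statuses explicitly.
TERMINAL_STATUSES = {"success", "failed", "revoked"}


@dataclass
class Row:
    status: str
    heartbeat_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None


def sweep_stuck(rows: List[Row], now: datetime, stuck_after_seconds: int = 300) -> int:
    """Flip running rows whose heartbeat is older than the threshold to stuck."""
    cutoff = now - timedelta(seconds=stuck_after_seconds)
    flipped = 0
    for row in rows:
        if row.status == "running" and row.heartbeat_at is not None and row.heartbeat_at < cutoff:
            row.status = "stuck"
            flipped += 1
    return flipped


def purge_old(rows: List[Row], now: datetime, retention_days: int = 14) -> List[Row]:
    """Return the rows that survive: everything except old terminal rows."""
    cutoff = now - timedelta(days=retention_days)
    return [
        row for row in rows
        if not (row.status in TERMINAL_STATUSES
                and row.finished_at is not None
                and row.finished_at < cutoff)
    ]


now = datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc)
rows = [
    Row("running", heartbeat_at=now - timedelta(seconds=600)),  # stale heartbeat
    Row("running", heartbeat_at=now - timedelta(seconds=10)),   # healthy
    Row("success", finished_at=now - timedelta(days=30)),       # past retention
    Row("failed", finished_at=now - timedelta(days=1)),         # still retained
]
flipped = sweep_stuck(rows, now)
kept = purge_old(rows, now)
```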

A background_tasks.demo_echo task is also registered for smoke tests; not scheduled.

Other modules contribute their own periodic entries via tasks.BEAT_SCHEDULE — see Scheduling periodic work (beat).

Events

Published (and consumable from any other module):

  • TaskFailed — fired from task_failure signal when an event bus is bound (web process). Workers don't fire it directly.
  • TaskRetried — fired from the admin retry path.
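A consumer in another module could handle TaskFailed along these lines. The dataclass is a stand-in that carries the documented fields so the snippet runs on its own (the real class is imported from background_tasks.contracts.events, and its field types may differ). describe_failure is a hypothetical handler, and how a handler is registered with the event bus is not covered here, so it is left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskFailed:
    """Stand-in with the documented fields; str types are an assumption."""
    task_execution_id: str
    task_name: str
    exception_type: str


def describe_failure(event: TaskFailed) -> str:
    """Hypothetical handler body: turn the event into a one-line alert."""
    return (f"{event.task_name} failed with {event.exception_type} "
            f"(execution {event.task_execution_id})")


message = describe_failure(TaskFailed("3f2c", "orders.send_receipt", "TimeoutError"))
```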

Worker introspection

WorkerInspector (background_tasks/worker_inspector.py) wraps celery.control.inspect():

| Method | Returns |
| --- | --- |
| snapshot() | WorkerSnapshot: one round-trip ping + stats + active queues + active tasks. Sets broker_reachable=False and an error string if it can't reach the broker. |

It's synchronous; the view route calls it via asyncio.to_thread.
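The pattern looks roughly like this. blocking_snapshot is a stand-in for WorkerInspector.snapshot() that sleeps instead of contacting a broker, workers_route is a hypothetical route function, and the "workers" key in the returned dict is invented for illustration.

```python
import asyncio
import time


def blocking_snapshot() -> dict:
    """Stand-in for the synchronous snapshot call (a blocking round-trip)."""
    time.sleep(0.05)
    return {"broker_reachable": True, "workers": []}


async def workers_route() -> dict:
    # Run the blocking call on a worker thread so the event loop stays free
    # to serve other requests while the probe is in flight.
    return await asyncio.to_thread(blocking_snapshot)


result = asyncio.run(workers_route())
```

asyncio.to_thread needs Python 3.9 or newer.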

Celery signal hooks

Synchronous handlers in signals.py, all bound through Celery's signals.<name>.connect, persisting through a separate sync SQLAlchemy engine (sync_db.py) so they don't have to spin up an event loop:

| Signal | What it writes |
| --- | --- |
| before_task_publish | inserts a pending row |
| task_prerun | flips to running, stamps started_at + heartbeat_at |
| task_postrun | refreshes heartbeat_at |
| task_success | flips to success, stores result, sets finished_at |
| task_failure | flips to failed, stores traceback + exception_type, publishes TaskFailed |
| task_retry | flips to retrying, increments retries |
| task_revoked | flips to revoked |

Production deployments running multiple workers can write concurrently because every update keys off celery_task_id.
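To make the keying concrete, here is a simplified sketch of the first two handlers. It swaps the module's sync SQLAlchemy engine for the stdlib sqlite3 driver and a cut-down table so it runs anywhere. The function names and plain arguments are illustrative: real Celery handlers receive signal keyword arguments, with the task id arriving as task_id for task_prerun and inside the message headers for before_task_publish.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE task_execution (
           id TEXT PRIMARY KEY,
           celery_task_id TEXT,
           task_name TEXT,
           status TEXT,
           started_at TEXT,
           heartbeat_at TEXT
       )"""
)
conn.execute("CREATE INDEX ix_celery_task_id ON task_execution (celery_task_id)")


def on_before_task_publish(celery_task_id: str, task_name: str) -> None:
    """Insert the pending row; its own id stays stable for the whole lifecycle."""
    conn.execute(
        "INSERT INTO task_execution (id, celery_task_id, task_name, status) "
        "VALUES (?, ?, ?, 'pending')",
        (str(uuid.uuid4()), celery_task_id, task_name),
    )
    conn.commit()


def on_task_prerun(celery_task_id: str) -> None:
    """Flip the row to running, locating it by celery_task_id only."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "UPDATE task_execution SET status = 'running', started_at = ?, heartbeat_at = ? "
        "WHERE celery_task_id = ?",
        (now, now, celery_task_id),
    )
    conn.commit()


on_before_task_publish("abc-123", "orders.send_receipt")
on_task_prerun("abc-123")
row = conn.execute(
    "SELECT status, started_at FROM task_execution WHERE celery_task_id = ?",
    ("abc-123",),
).fetchone()
```

Because each update targets a single celery_task_id, handlers for different tasks never touch the same row.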

Inertia pages

  • BackgroundTasks/Index.tsx — paginated execution list with status / task-name filters.
  • BackgroundTasks/Detail.tsx — full execution view with retry button (background_tasks.manage).
  • BackgroundTasks/Workers.tsx — live worker dashboard.
  • Components: ExecutionRow.tsx, RetryConfirmDialog.tsx.

Locales

Top-level keys in background_tasks/locales/en.json: index, filters, status (per-state labels), table, detail, retry_dialog, toasts.

Released under the MIT License.