The ingest worker is the operational backend for Beacon Pulse. It handles uploads, queue-driven processing, replay and regeneration workflows, media analysis, quota controls, and scheduled repair jobs.
Access Model
All ingest admin routes are gated in code.
- If `ADMIN_TOKEN` or `ADMIN_SECRET` is set, requests must send a matching `x-admin-token` or `authorization` header.
- Otherwise the worker accepts `cf-access-jwt-assertion`.
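The gate described above can be sketched roughly as follows. This is an illustration only: the `AdminEnv` and `HeaderBag` shapes, the `isAdminRequest` name, and the handling of a `Bearer ` prefix are assumptions, and the sketch only checks for the presence of the Access header rather than verifying the JWT.

```typescript
// Hypothetical shapes; the real worker's bindings and header parsing may differ.
interface AdminEnv {
  ADMIN_TOKEN?: string;
  ADMIN_SECRET?: string;
}

type HeaderBag = Record<string, string | undefined>;

// Returns true when the request may reach an admin route.
function isAdminRequest(headers: HeaderBag, env: AdminEnv): boolean {
  const secret = env.ADMIN_TOKEN || env.ADMIN_SECRET;
  if (secret) {
    // Assumption: the authorization header may carry a "Bearer " prefix.
    const authorization = headers["authorization"];
    const bearer = authorization ? authorization.replace(/^Bearer\s+/i, "") : undefined;
    return headers["x-admin-token"] === secret || bearer === secret;
  }
  // No shared secret configured: fall back to the Cloudflare Access header.
  // Assumption: presence check only; production code should verify the JWT.
  return Boolean(headers["cf-access-jwt-assertion"]);
}
```

Once a shared secret is configured in this sketch, an Access assertion alone is not sufficient, which matches the "otherwise" wording of the rule.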
Upload Paths
| Path | Method | Current role |
|---|---|---|
| `/presign` | POST | Placeholder helper that returns an object key and a sample upload URL; not the live admin upload path |
| `/upload` | POST | Direct upload for smaller exports |
| `/upload/multipart/init` | POST | Start multipart upload |
| `/upload/multipart/part` | POST | Upload one multipart chunk |
| `/upload/multipart/complete` | POST | Finalize multipart upload, create export row, queue processing |
| `/upload/multipart/abort` | POST | Cancel multipart upload |
Current upload behavior:
- the admin UI accepts `.txt`, `.chat`, and `.zip`
- non-zip text exports are stored as `.txt`
- direct Worker uploads reject payloads over roughly 75 MiB
- multipart uploads currently use 16 MiB parts
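As a rough illustration of how a client might pick between the direct and multipart paths, the sketch below uses the two sizes listed above. The `planUpload` helper and the exact 75 MiB boundary are assumptions (the limit is only described as "roughly 75 MiB").

```typescript
const MIB = 1024 * 1024;
const DIRECT_UPLOAD_LIMIT_BYTES = 75 * MIB; // assumed exact value for "roughly 75 MiB"
const MULTIPART_PART_BYTES = 16 * MIB;      // current multipart part size

type UploadPlan =
  | { mode: "direct" }
  | { mode: "multipart"; partCount: number };

// Decide which upload path a file of the given size should take.
function planUpload(sizeBytes: number): UploadPlan {
  if (sizeBytes <= DIRECT_UPLOAD_LIMIT_BYTES) {
    return { mode: "direct" };
  }
  // The last part may be smaller than 16 MiB, hence the ceiling.
  return { mode: "multipart", partCount: Math.ceil(sizeBytes / MULTIPART_PART_BYTES) };
}
```

Under these assumptions a 100 MiB export would be split into 7 parts: six full 16 MiB parts plus a 4 MiB remainder.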
Admin API Surface
Export management
| Path | Method | Purpose |
|---|---|---|
| `/files` | GET, DELETE | List exports or delete one raw export and its record |
| `/exports/progress` | GET | Poll processing status |
| `/exports/details` | GET | Return the richer detail payload used by the admin modal |
| `/clear` | POST | Clear daily digests, weekly summaries, and optionally message hashes for a date range |
| `/communities/{community_id}` | DELETE | Delete a whole community from R2 and D1 |
Regeneration and replay
| Path | Method | Purpose |
|---|---|---|
| `/regenerate/daily` | POST | Rebuild one daily digest and then update the containing week |
| `/regenerate/weekly` | POST | Queue weekly regeneration for one week or week range |
| `/regenerate/weekly/all` | POST | Queue all-weeks weekly regeneration for a community |
| `/replay/export` | POST | Re-enqueue one export from raw storage |
| `/replay/all` | POST | Re-enqueue all matching exports for a community, with optional clear-first behavior |
| `/replay/stuck` | POST | Re-enqueue stalled exports |
Media and quota
| Path | Method | Purpose |
|---|---|---|
| `/media/list` | GET | List media rows with filters |
| `/media/replay` | POST | Reset selected media back to pending analysis and requeue it |
| `/quota/status` | GET | Return text and media quota status, usage breakdown, model info, and bypass state |
| `/quota/set` | POST | Manually set quota usage |
| `/quota/bypass` | GET, POST | Read or change text/media bypass flags |
| `/quota/config` | POST | Update quota configuration |
| `/pipeline/daily-config` | GET, POST | Read or change daily digest pipeline mode and throttling |
Backfill tools
| Path | Method | Purpose |
|---|---|---|
| `/admin/backfill-concept-pillars` | POST | Backfill concept-pillar tags |
| `/admin/backfill-strain-daily` | POST | Recompute daily strain |
| `/admin/backfill-strain-weekly` | POST | Recompute weekly strain |
| `/admin/backfill-media-summaries` | POST | Rebuild media-summary enrichment |
Processing Flow
- Create or finalize an upload.
- Store the raw export in R2 and create an `exports` row in D1.
- Enqueue work on `beacon-pulse-uploads`.
- Parse the export, deduplicate messages against `message_hashes`, and write daily digests.
- Generate weekly summaries only when a week has at least 3 days of data or 20 total messages.
- For ZIP uploads, queue media-analysis work for images, audio, and sampled video frames.
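The weekly-summary threshold in the flow above can be expressed as a small predicate. This is a sketch under assumptions: the `DayCount` shape and the `weekQualifiesForSummary` name are hypothetical, and a "day of data" is taken to mean a day with at least one message.

```typescript
// Hypothetical per-day tally for one week of one community.
interface DayCount {
  dayDate: string;      // e.g. "2026-04-20"
  messageCount: number;
}

// A week qualifies with at least 3 days of data OR at least 20 total messages.
function weekQualifiesForSummary(days: readonly DayCount[]): boolean {
  const daysWithData = days.filter((d) => d.messageCount > 0).length;
  const totalMessages = days.reduce((sum, d) => sum + d.messageCount, 0);
  return daysWithData >= 3 || totalMessages >= 20;
}
```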
Deduplication And All-Duplicate Exports
Message deduplication runs per-day per-source. For each day in the export, the worker computes SHA-256 content hashes, checks against existing rows in message_hashes, and classifies messages as new or duplicate.
Media content-hash deduplication is batched: before the media-artifact loop, the worker collects all content hashes and issues a single SELECT ... WHERE content_hash IN (...) query (chunked at 24 entries to stay within D1’s 50-variable limit), then uses a Map lookup per artifact instead of one query per artifact.
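A minimal sketch of the batched lookup, assuming a synchronous `runQuery` callback in place of the real (asynchronous) D1 call. The `media_items` table name, the `media_id` column, and the helper names are placeholders, not the worker's actual schema.

```typescript
const HASH_CHUNK_SIZE = 24; // chunk size quoted in the paragraph above

// Split a list into consecutive groups of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

interface ExistingMedia {
  content_hash: string;
  media_id: string; // hypothetical column
}

// Stand-in for the database call; a real D1 query would return a Promise.
type RunQuery = (sql: string, params: string[]) => ExistingMedia[];

// One IN (...) query per chunk, collected into a Map for O(1) lookups per artifact.
function lookupExistingMedia(hashes: readonly string[], runQuery: RunQuery): Map<string, ExistingMedia> {
  const found = new Map<string, ExistingMedia>();
  const unique = Array.from(new Set(hashes));
  for (const group of chunk(unique, HASH_CHUNK_SIZE)) {
    const placeholders = group.map(() => "?").join(", ");
    const sql = `SELECT content_hash, media_id FROM media_items WHERE content_hash IN (${placeholders})`;
    for (const row of runQuery(sql, group)) {
      found.set(row.content_hash, row);
    }
  }
  return found;
}
```

Inside the artifact loop, `found.has(hash)` then replaces what would otherwise be one query per artifact.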
When new_message_count = 0 (all messages in the export are already present in message_hashes), the pipeline uses the duplicate-only path (daysWithOnlyDuplicates) to still generate AI daily digests from the existing messages. This handles the common case where the same export is re-uploaded after the source was added, or after a pipeline failure that wrote hashes but not digests.
The pipeline processes up to 15 days per queue invocation. When more days remain, the worker re-enqueues itself for the next batch. Progress is tracked in exports.days_processed and exports.days_total.
Important: when days_processed has already reached days_total from a previous run, the pipeline still generates digests for any days that have hashes but no corresponding rows in daily_digests. The days_processed counter is not the source of truth for whether digests actually exist — the worker always queries daily_digests directly to determine which days still need processing.
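The batching and "query `daily_digests` directly" rules can be illustrated together. In this sketch the inputs are plain arrays of day strings; how the worker actually loads them from `message_hashes` and `daily_digests` is not shown, and `planNextBatch` is a hypothetical name.

```typescript
const MAX_DAYS_PER_INVOCATION = 15;

interface BatchPlan {
  batch: string[];  // days to process in this queue invocation
  requeue: boolean; // true when more days remain afterwards
}

// Pending days are those with hashes but no digest row, regardless of any counter.
function planNextBatch(daysWithHashes: readonly string[], daysWithDigests: readonly string[]): BatchPlan {
  const done = new Set(daysWithDigests);
  // ISO dates (YYYY-MM-DD) sort correctly as plain strings.
  const pending = daysWithHashes.filter((day) => !done.has(day)).sort();
  const batch = pending.slice(0, MAX_DAYS_PER_INVOCATION);
  return { batch, requeue: pending.length > batch.length };
}
```

Because the plan is derived from the digest rows themselves, a stale `days_processed` value cannot cause a day to be skipped in this model.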
If AI quota is exhausted during this process, the queue message is retried with a delay until the daily quota resets at midnight UTC. This applies to both the new-messages path and the duplicate-only path.
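One way to compute the retry delay is shown below. It is a sketch, not the worker's code: both function names are hypothetical, and the `maxDelaySeconds` cap is an assumption added because queue platforms typically limit how long a single message can be delayed, in which case the message would simply be retried again after the cap elapses.

```typescript
// Seconds from `now` until the next 00:00:00 UTC.
function secondsUntilNextUtcMidnight(now: Date): number {
  // Date.UTC normalizes day overflow, so "date + 1" is safe at month ends.
  const nextMidnightMs = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1);
  return Math.ceil((nextMidnightMs - now.getTime()) / 1000);
}

// Delay to request for a quota-exhausted message, clamped to a caller-supplied cap.
function quotaRetryDelaySeconds(now: Date, maxDelaySeconds: number): number {
  return Math.min(secondsUntilNextUtcMidnight(now), maxDelaySeconds);
}
```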
Source Assignment
Uploads without an assigned source_id are recorded with status = 'pending_source' and not processed until a source is assigned.
If a community has exactly one source configured in chat_sources, the worker auto-assigns it. Otherwise the admin must assign a source through the UI or the /admin/exports/assign-source endpoint before the export will be processed.
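The assignment rule reduces to a small decision function. The sketch below assumes the list of configured source ids has already been read from `chat_sources`; the `SourceDecision` type and `resolveSource` name are hypothetical.

```typescript
type SourceDecision =
  | { kind: "assigned"; sourceId: string; auto: boolean }
  | { kind: "pending_source" };

// Decide whether an upload can proceed or must wait for a source.
function resolveSource(requestedSourceId: string | null, communitySourceIds: readonly string[]): SourceDecision {
  if (requestedSourceId) {
    return { kind: "assigned", sourceId: requestedSourceId, auto: false };
  }
  const only = communitySourceIds[0];
  if (communitySourceIds.length === 1 && only !== undefined) {
    // Exactly one configured source: auto-assign it.
    return { kind: "assigned", sourceId: only, auto: true };
  }
  // Zero or several sources: an admin has to choose.
  return { kind: "pending_source" };
}
```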
Scheduled Self-Healing
The ingest worker runs two cron schedules:
- `*/5 * * * *` (high-priority): stuck-export detection + all active-state retry sub-tasks, run in parallel.
- `*/15 * * * *` (low-priority): deferred media batch drain and raw export cleanup.
Splitting the schedules prevents slow cleanup work from delaying the core retry loop.
High-priority sub-tasks (every 5 min):
| Task | What it does |
|---|---|
| `finalizeCompletedWeeklyExports` | Marks `weekly_processing` exports completed when weekly work is already done |
| `retryRecoverableFailedExports` | Resets and re-enqueues recoverable failed exports (up to 10 per tick) |
| `retryRecoverableFailedMedia` | Same for failed media items |
| `resetStuckDeferredProcessingMedia` | Resets media stuck in `deferred_processing` for > 5 minutes back to `pending_analysis` |
| `requeueStalePendingMediaJobs` | Re-enqueues stale `pending_analysis` media jobs older than 15 minutes |
Low-priority sub-tasks (every 15 min):
| Task | What it does |
|---|---|
| `enqueueDeferredMediaBatches` | Resumes up to 12 deferred exports per tick |
| `cleanupRetainedRawExports` | Deletes R2 raw exports past retention window (batch of 75 per tick) |
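A scheduled handler could route on the cron expression that triggered it. The sketch below only maps each schedule to the task names from the two tables; the map and the `tasksForCron` helper are illustrative, and stuck-export detection is omitted because its function name is not given here.

```typescript
const HIGH_PRIORITY_CRON = "*/5 * * * *";
const LOW_PRIORITY_CRON = "*/15 * * * *";

// Task names taken from the tables above; the grouping structure is an assumption.
const TASKS_BY_CRON = new Map<string, readonly string[]>([
  [HIGH_PRIORITY_CRON, [
    "finalizeCompletedWeeklyExports",
    "retryRecoverableFailedExports",
    "retryRecoverableFailedMedia",
    "resetStuckDeferredProcessingMedia",
    "requeueStalePendingMediaJobs",
  ]],
  [LOW_PRIORITY_CRON, [
    "enqueueDeferredMediaBatches",
    "cleanupRetainedRawExports",
  ]],
]);

// Returns the sub-tasks to run for the cron expression that fired.
function tasksForCron(cron: string): readonly string[] {
  return TASKS_BY_CRON.get(cron) ?? [];
}
```

Keeping the two lists separate means a slow cleanup run on the 15-minute schedule never sits in the same invocation as the 5-minute retry loop.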
The scheduled retry only looks at processing, queued, and weekly_processing exports. Exports with status = 'completed' are not automatically retried, even if they have zero daily digests. If an export completed with no digests due to quota exhaustion, reset its status to processing (e.g., via D1 or the replay endpoint) to re-enter the retry cycle.
See queue-architecture for the stuck-export detection query and full sub-task reference.
Multi-Source Write Safety
daily_digests and weekly_summaries_public had a legacy primary key that did not include source_id; the column was added later via ALTER TABLE with only a secondary unique index. Writing with INSERT OR REPLACE would resolve the conflict on the PK and silently delete a row for a different source sharing the same day or week. This was fixed by migration fix_daily_digests_weekly_summaries_pk.sql (2026-04-25), which recreated both tables with source_id in the primary key.
All digest and summary writes in the ingest worker use explicit INSERT INTO ... ON CONFLICT(community_id, source_id, day_date/week_start) DO UPDATE SET upserts. Any new code writing to these tables should follow the same pattern.
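As an illustration of the upsert pattern, the helper below builds the SQL text for either table. The key columns come from the text above; the helper itself and the payload column used in the usage note (`summary`) are hypothetical.

```typescript
type DigestTable =
  | { name: "daily_digests"; periodColumn: "day_date" }
  | { name: "weekly_summaries_public"; periodColumn: "week_start" };

// Build an INSERT ... ON CONFLICT ... DO UPDATE statement keyed on the full primary key.
function buildUpsertSql(table: DigestTable, payloadColumns: readonly string[]): string {
  if (payloadColumns.length === 0) {
    throw new Error("at least one payload column is required");
  }
  const keyColumns = ["community_id", "source_id", table.periodColumn];
  const allColumns = keyColumns.concat(payloadColumns);
  const placeholders = allColumns.map(() => "?").join(", ");
  // `excluded` refers to the row that the INSERT attempted to write.
  const updates = payloadColumns.map((c) => `${c} = excluded.${c}`).join(", ");
  return (
    `INSERT INTO ${table.name} (${allColumns.join(", ")}) VALUES (${placeholders}) ` +
    `ON CONFLICT(${keyColumns.join(", ")}) DO UPDATE SET ${updates}`
  );
}
```

For example, `buildUpsertSql({ name: "daily_digests", periodColumn: "day_date" }, ["summary"])` yields a statement that updates only the row matching the same community, source, and day, so rows for other sources are left untouched.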
Media Analysis
ZIP exports trigger media-analysis jobs for images, audio, and sampled video frames. Each artifact becomes a media_analysis queue message processed independently.
Image analysis: Stage 1 runs the image classifier and object detector in parallel (Promise.all). Stage 2 (vision caption) is conditional on Stage 1 results.
Video analysis (two-phase):
- Phase 1 (parallel): all frame timestamps are extracted and their Stage 1 classifier + object detector calls run in parallel across all frames.
- Phase 2 (sequential): Stage 2 caption decisions are applied one frame at a time to enforce the `maxVideoFrameStage2PerExport` cap without race conditions.
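The two-phase shape can be sketched as below. The callbacks stand in for the real classifier, detector, and caption calls, and the `needsCaption` flag and mutable `budget` object are assumptions about how the Stage 2 decision and the per-export cap might be represented.

```typescript
interface Stage1Result {
  timestampSec: number;
  needsCaption: boolean; // hypothetical outcome of classifier + detector
}

async function analyzeVideoFrames(
  timestamps: readonly number[],
  runStage1: (timestampSec: number) => Promise<Stage1Result>,
  runStage2: (timestampSec: number) => Promise<string>,
  budget: { remaining: number }, // Stage 2 calls still allowed for this export
): Promise<Map<number, string>> {
  // Phase 1: Stage 1 for every frame runs concurrently.
  const stage1 = await Promise.all(timestamps.map((t) => runStage1(t)));

  // Phase 2: one frame at a time, so the budget check and decrement never interleave.
  const captions = new Map<number, string>();
  for (const frame of stage1) {
    if (!frame.needsCaption || budget.remaining <= 0) continue;
    budget.remaining -= 1;
    captions.set(frame.timestampSec, await runStage2(frame.timestampSec));
  }
  return captions;
}
```

If Phase 2 also ran under `Promise.all`, several frames could read the same remaining budget before any of them decremented it, which is the race the sequential loop avoids.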
Quota for both text and media AI is tracked in D1. The media quota bypass flag is read from KV (60-second TTL) when the optional beacon_pulse_cache KV namespace is provisioned, falling back to D1 on a cache miss. See quota-model for quota limits, exhaustion behavior, and the bypass flag.
Concept graph pillar assignment is asynchronous: the weekly summary pipeline saves the concept graph immediately with keyword-based pillar assignments (synchronous, no AI), then enqueues a backfill_concept_pillars queue message. The consumer runs the AI pillar classifier over each node and updates the stored graph. This removes N sequential AI calls from the weekly processing hot path. During the brief window before AI refinement completes, concept graph nodes may have keyword-derived or null pillar labels; the public API handles null pillar fields gracefully.
Current Rules And Caveats
- uploads without a source are recorded as `pending_source`
- `/replay/export` fails if the raw object is already gone from R2
- `/replay/all` skips exports whose raw object is missing and reports how many were skipped
- `/replay/stuck` defaults to 30 minutes at the API layer if no `max_age_minutes` is provided, even though the admin UI starts at 10 minutes
- raw export cleanup defaults are 72 hours after successful processing and 168 hours after failure when delete-after-processing is enabled and no pending media remains
- `new_message_count = 0` does not mean the export produced no digests; it means all messages were already in `message_hashes`, and digests are still generated via the duplicate-only path
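The raw-export cleanup defaults in the list above can be read as the following predicate. This is a sketch only: the `RawExportState` shape, field names, and `isEligibleForRawCleanup` are hypothetical, and the retention clock is assumed to start when processing finished.

```typescript
const HOUR_MS = 60 * 60 * 1000;
const RETAIN_AFTER_SUCCESS_MS = 72 * HOUR_MS;
const RETAIN_AFTER_FAILURE_MS = 168 * HOUR_MS;

interface RawExportState {
  outcome: "completed" | "failed";
  finishedAtMs: number;      // assumed start of the retention window
  pendingMediaCount: number; // media items still awaiting analysis
}

// True when the raw R2 object may be deleted under the default retention windows.
function isEligibleForRawCleanup(e: RawExportState, nowMs: number, deleteAfterProcessing: boolean): boolean {
  if (!deleteAfterProcessing || e.pendingMediaCount > 0) return false;
  const retentionMs = e.outcome === "completed" ? RETAIN_AFTER_SUCCESS_MS : RETAIN_AFTER_FAILURE_MS;
  return nowMs - e.finishedAtMs >= retentionMs;
}
```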
Verified against beacon-platform/apps/pulse-ingest/src/index.ts and the admin UI behavior on April 25, 2026.