Taming the Restarting Pipeline: An SSE & tRPC Saga
Ever had a background process mysteriously restart just because a user navigated away and then back? We wrestled with an elusive pipeline bug that taught us critical lessons about SSE, idempotency, and the subtle art of state management.
The Invisible Chaos of Restarting Pipelines
Picture this: You're building an intelligent agent that can refactor or auto-fix code. It's a multi-phase process – scanning, analyzing, generating, applying. Users kick off a job, see a beautiful real-time progress stream, and feel productive. But then, a subtle, infuriating bug emerges: if a user navigates away from the detail page of an active job and then navigates back, the entire pipeline restarts from Phase 1. Poof! All progress gone, a new job silently spawned.
This was the challenge that landed on my plate last month. Our goal was clear: fix the pipeline restart bug where navigating back to a running AutoFix/Refactor detail page restarted the entire pipeline. The fix, now committed as bd96b5a on main, brought a wave of relief and some valuable lessons.
The Problem: Unconditional Execution Meets SSE Reconnects
Our system uses Server-Sent Events (SSE) to stream real-time updates from our backend pipeline to the frontend. This provides that satisfying, interactive progress bar. Each AutoFix or Refactor job has a unique ID, and our SSE routes (src/app/api/v1/events/auto-fix/[id]/route.ts and src/app/api/v1/events/refactor/[id]/route.ts) were designed to listen for these updates.
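For context, the wire format these routes emit is plain text. A minimal, illustrative encoder (not the project's actual helper) looks like this — per the SSE spec, each message is one or more `field: value` lines terminated by a blank line:

```typescript
// Minimal SSE frame encoder (illustrative sketch, not the project's code).
// "data:" carries the payload; an optional "event:" line names the event type.
function sseFrame(data: unknown, eventName?: string): string {
  const lines: string[] = [];
  if (eventName) lines.push(`event: ${eventName}`);
  lines.push(`data: ${JSON.stringify(data)}`);
  return lines.join("\n") + "\n\n";
}
```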
Here's where the insidious bug hid: these SSE routes unconditionally called runRefactor() or runAutoFix() on every connection.
Think about the user journey:
- User starts a Refactor.
- SSE connection opens, `runRefactor()` is called, and the pipeline begins.
- User navigates away (e.g., to the dashboard). The SSE connection silently closes.
- Crucially, the `for await` loop within `runRefactor()` (which yields events from the pipeline generator) continues to consume events. Writes to the now-closed SSE stream fail via our `safeEnqueue` helper, but the pipeline itself runs to completion in the background, updating the database. So the job does finish on the server.
- User navigates back to the Refactor detail page.
- A new SSE connection opens.
- Because `runRefactor()` is called again unconditionally, a brand-new pipeline starts from scratch, overwriting the run's status in the database back to "scanning," even if the original job had completed.
This was a classic case of an operation that should be idempotent but wasn't, coupled with a misunderstanding of SSE lifecycle behavior. The frontend was expecting to reconnect and pick up the status, but the backend was effectively saying, "New connection? Must be a new job!"
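To make the failure mode concrete, here is a small, self-contained sketch (all names are illustrative stand-ins, not the real pipeline API) showing how a `for await` loop consumes a generator to completion even after client-facing writes start silently failing:

```typescript
type PipelineEvent = { phase: string };

// Stand-in for the real pipeline generator.
async function* runPipeline(): AsyncGenerator<PipelineEvent> {
  for (const phase of ["scanning", "analyzing", "generating", "applying"]) {
    yield { phase }; // in the real system, each phase also updates the DB
  }
}

const delivered: string[] = []; // what the client actually received
const consumed: string[] = [];  // what the server loop processed
let streamClosed = false;

// Stand-in for the safeEnqueue helper: swallows writes to a closed stream.
function trySend(event: PipelineEvent): void {
  if (streamClosed) return; // write silently dropped
  delivered.push(event.phase);
}

async function consume(): Promise<void> {
  for await (const event of runPipeline()) {
    consumed.push(event.phase);
    trySend(event);
    if (event.phase === "analyzing") streamClosed = true; // user navigates away
  }
}

// The loop processes all four phases even though the client saw only two.
const done = consume();
```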
The Solution: A Guard, and an Elegant Dance Between SSE and Polling
We considered an alternative: adding complex resume logic inside the pipeline generators themselves. This would involve checking the current phase in the database and skipping already completed steps. However, this felt overly complex, fragile, and introduced ambiguities about what "mid-phase" state truly means. We opted for a simpler, more robust approach.
The fix involved a two-pronged strategy:
1. The SSE Route Guard
The core of the server-side fix was to introduce a guard in our SSE routes. The runAutoFix() and runRefactor() functions should only be called if the run's status is "pending". For any other status (active, completed, failed), the SSE stream should simply send a single status event and then close immediately.
Here's a simplified look at the change in src/app/api/v1/events/auto-fix/[id]/route.ts:
```ts
// Before (simplified):
// export async function GET(request: Request, { params }: { params: { id: string } }) {
//   // ... set up response headers and the SSE ReadableStream (yielding `controller`) ...
//   const run = await getAutoFixRun(params.id); // fetches current run status
//   const eventStream = runAutoFix(params.id);  // ALWAYS starts a new pipeline
//   for await (const event of eventStream) {
//     await safeEnqueue(controller, event);
//   }
//   // ... cleanup ...
// }

// After (simplified):
export async function GET(request: Request, { params }: { params: { id: string } }) {
  // ... set up response headers and the SSE ReadableStream (yielding `controller`) ...
  const run = await getAutoFixRun(params.id); // fetch current run status

  if (run.status === "pending") {
    // Only start the pipeline if it's truly pending
    const eventStream = runAutoFix(params.id);
    for await (const event of eventStream) {
      await safeEnqueue(controller, event);
    }
  } else {
    // For active/completed/failed runs, just send the current status
    // and close the stream. The client will poll for ongoing updates.
    await safeEnqueue(controller, { type: "status", data: run });
  }
  // ... cleanup ...
}
```
This simple `if` statement was the game-changer. It ensures that reconnecting clients for non-pending jobs don't trigger a new pipeline.
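For completeness, here is one plausible shape for the `safeEnqueue` helper referenced above (an assumption — the project's actual implementation may differ): it wraps `controller.enqueue` so a write to a closed stream reports failure instead of throwing out of the `for await` loop.

```typescript
const encoder = new TextEncoder();

// One plausible shape for safeEnqueue (assumption, not the actual code):
// returns whether the write succeeded instead of letting the closed-stream
// TypeError escape into the pipeline loop.
async function safeEnqueue(
  controller: ReadableStreamDefaultController<Uint8Array>,
  event: unknown
): Promise<boolean> {
  try {
    controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
    return true;
  } catch {
    return false; // client disconnected; the stream is closed
  }
}
```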
2. Client-Side Polling for Active Runs
With the SSE stream now closing immediately for non-pending runs, the frontend needed a way to continue receiving updates for jobs that were already active or completed in the background. This is where client-side polling came in.
For the src/app/(dashboard)/dashboard/auto-fix/[id]/page.tsx (and its refactor counterpart), we added a refetchInterval to our tRPC query specifically when the run is in one of its ACTIVE_STATUSES.
```tsx
// src/app/(dashboard)/dashboard/auto-fix/[id]/page.tsx (simplified)
import { api } from '@/trpc/react'; // assuming a standard tRPC React client setup

const ACTIVE_STATUSES = ['scanning', 'analyzing', 'generating', 'applying'];

export default function AutoFixDetailPage({ params }: { params: { id: string } }) {
  const { data: run, isLoading, error } = api.autoFix.getRun.useQuery(
    { id: params.id },
    {
      // Use the function form: `run` isn't in scope inside its own initializer,
      // and the function re-evaluates against the latest data on every tick.
      // Only poll while the run is active.
      // (TanStack Query v4 signature; v5 passes the query object instead.)
      refetchInterval: (data) =>
        data && ACTIVE_STATUSES.includes(data.status) ? 3000 : false,
      staleTime: Infinity, // prevent unnecessary refetches when not polling
    }
  );

  // ... render UI based on run status ...
}
```
Now, the dance is elegant:
- When a run is `pending`, the SSE connection is active, streaming real-time events.
- Once the run transitions to an `ACTIVE_STATUS`, any reconnecting SSE stream will immediately close after sending the current status.
- The frontend then switches to polling every 3 seconds to fetch the latest status from our tRPC endpoint, ensuring the UI stays updated without restarting the pipeline.
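The polling half of that dance boils down to a tiny decision function. Extracted as a pure helper (illustrative, mirroring the query options described above), it is easy to unit-test:

```typescript
const ACTIVE_STATUSES = ["scanning", "analyzing", "generating", "applying"];

// Poll every 3s while the run is active; stop polling otherwise.
// Suitable for TanStack Query's function-form refetchInterval option.
function pollingInterval(status: string | undefined): number | false {
  return status !== undefined && ACTIVE_STATUSES.includes(status) ? 3000 : false;
}
```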
Lessons Learned: Beyond the Fix
This debugging session wasn't just about fixing a bug; it was a masterclass in several key areas:
- Idempotency is King: Operations that modify state should ideally be idempotent. Calling `runAutoFix()` multiple times with the same ID should either do nothing (if already complete) or continue from where it left off, not restart. Our fix pushed the idempotency check to the API gateway, which was simpler than embedding it deep in the pipeline logic.
- Understand Your Connection Lifecycles: Especially with real-time technologies like SSE or WebSockets, a client reconnecting isn't always a signal for a fresh start. It could be a brief network blip, a tab switch, or a navigation event. Design your backend to handle these gracefully, often by re-sending the current state rather than re-initiating a process.
- SSE vs. Polling: Choose the Right Tool: SSE is fantastic for true, low-latency, event-driven updates. But for periodically checking the status of a long-running background job where a few seconds of delay are acceptable, polling can be simpler and more robust, especially when dealing with client reconnects and state management. We now have a hybrid approach that leverages the strengths of both.
- The Danger of Implicit State Changes: The "silent consumption" of generator events was a subtle but critical detail. Even though the stream was closed, the pipeline kept running, updating the DB. This meant the server knew the job was complete, but the client (on reconnect) would see it restart, creating a frustrating desync.
What's Next? Robustness and User Experience
With the core bug squashed, a few immediate next steps and considerations emerged:
- Manual Testing: Thoroughly test both AutoFix and Refactor runs: start, navigate away mid-process, navigate back, and verify no restart.
- "Re-run" Button: For completed or failed runs, a "Re-run" button that explicitly resets the status to "pending" and reconnects SSE would be a great UX improvement, offering a clear way to retry or re-initiate a job.
- Stuck Runs Cleanup: A future consideration is handling cases where a background pipeline might crash (e.g., on a server restart), leaving runs stuck in an `ACTIVE_STATUS`. A cleanup job or a manual "retry" mechanism would be essential for long-term robustness.
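A sweeper for such stuck runs could be as simple as the following sketch (the record shape, field names, and the 10-minute threshold are all assumptions):

```typescript
type Run = { id: string; status: string; updatedAt: number };

const ACTIVE_STATUSES = ["scanning", "analyzing", "generating", "applying"];
const STALE_AFTER_MS = 10 * 60 * 1000; // assumed staleness threshold

// Find runs that claim to be active but haven't been touched recently;
// a periodic job could mark these "failed" so the UI can offer a retry.
function findStuckRuns(runs: Run[], now: number): Run[] {
  return runs.filter(
    (r) => ACTIVE_STATUSES.includes(r.status) && now - r.updatedAt > STALE_AFTER_MS
  );
}
```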
This fix was a satisfying win, not just for the immediate bug, but for the deeper understanding it brought to our system's architecture. It's a reminder that even in complex systems, sometimes the most elegant solutions are born from a clear understanding of fundamental principles like idempotency and connection lifecycles.