GitHub Repository

You can find the project source code, which uses real APIs, on GitHub.

Upstash Workflow integrates with the Vercel AI SDK so that you can build durable and reliable AI applications. This integration allows you to:

  • Build resilient AI applications with automatic retries
  • Manage AI operations with workflow steps
  • Implement tools and function calling with durability
  • Handle errors gracefully across your AI operations
  • Handle long-running AI operations with extended timeouts

This guide walks you through setting up and implementing AI features that combine Upstash Workflow’s durability guarantees with the Vercel AI SDK’s capabilities.

Prerequisites

Before getting started, make sure you have:

  • An OpenAI API key
  • Basic familiarity with Upstash Workflow and Vercel AI SDK
  • Vercel AI SDK version 4.0.12 or higher (required for ToolExecutionError handling)

Installation

Install the packages that the examples in this guide import: ai, @ai-sdk/openai, @upstash/workflow, @upstash/qstash, and zod.

Implementation

Creating the OpenAI client

AI SDKs (the Vercel AI SDK, the OpenAI SDK, etc.) use the client’s default fetch implementation to make API requests, but allow you to provide a custom fetch implementation.

With Upstash Workflow, HTTP requests need to go through the context.call method, so we create a custom fetch implementation that uses context.call to make requests. With context.call, Upstash Workflow is the one making the HTTP request and waiting for the response, even if the LLM takes a long time to respond.

The following code snippet can also be generalized to work with other LLM SDKs, such as Anthropic or Google.

import { createOpenAI } from '@ai-sdk/openai';
import { HTTPMethods } from '@upstash/qstash';
import { WorkflowAbort, WorkflowContext } from '@upstash/workflow';

export const createWorkflowOpenAI = (context: WorkflowContext) => {
  return createOpenAI({
    compatibility: "strict",
    fetch: async (input, init) => {
      try {
        // Prepare headers from init.headers
        const headers = init?.headers
          ? Object.fromEntries(new Headers(init.headers).entries())
          : {};

        // Prepare body from init.body
        const body = init?.body ? JSON.parse(init.body as string) : undefined;

        // Make network call
        const responseInfo = await context.call("openai-call-step", {
          url: input.toString(),
          method: init?.method as HTTPMethods,
          headers,
          body,
        });

        // Construct headers for the response
        const responseHeaders = new Headers(
          Object.entries(responseInfo.header).reduce((acc, [key, values]) => {
            acc[key] = values.join(", ");
            return acc;
          }, {} as Record<string, string>)
        );

        // Return the constructed response
        return new Response(JSON.stringify(responseInfo.body), {
          status: responseInfo.status,
          headers: responseHeaders,
        });
      } catch (error) {
        if (error instanceof WorkflowAbort) {
          // WorkflowAbort must reach the workflow unchanged, so rethrow it without logging
          throw error;
        } else {
          console.error("Error in fetch implementation:", error);
          throw error; // Rethrow error for further handling
        }
      }
    },
  });
};
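
Nothing in the custom fetch above is specific to OpenAI, so one way to generalize it is to factor the fetch logic into a standalone helper. The sketch below is a hypothetical illustration rather than part of either SDK: createWorkflowFetch, CallContext, and FetchInit are names introduced here, CallContext is a minimal stand-in for the parts of WorkflowContext that the snippet above uses, and the request body is assumed to be a JSON string. It only handles string or URL inputs, so it would need widening to match the full fetch signature.

```typescript
// Minimal stand-in for the parts of WorkflowContext used here.
// This is an assumption for illustration, not the real @upstash/workflow type.
interface CallContext {
  call(
    stepName: string,
    settings: {
      url: string;
      method?: string;
      headers?: Record<string, string>;
      body?: unknown;
    }
  ): Promise<{ status: number; header: Record<string, string[]>; body: unknown }>;
}

// Subset of fetch's init argument handled by this sketch (assumed JSON string body)
type FetchInit = { method?: string; headers?: Record<string, string>; body?: string };

// Hypothetical helper: returns a fetch-like function that sends every request
// through context.call under the given step name.
function createWorkflowFetch(context: CallContext, stepName: string) {
  return async (input: string | URL, init?: FetchInit): Promise<Response> => {
    // Flatten request headers into a plain object (Headers lowercases the names)
    const headers: Record<string, string> = {};
    new Headers(init?.headers).forEach((value, key) => {
      headers[key] = value;
    });

    // Parse the JSON body so it is passed on as structured data
    const body = init?.body ? JSON.parse(init.body) : undefined;

    const responseInfo = await context.call(stepName, {
      url: input.toString(),
      method: init?.method,
      headers,
      body,
    });

    // Join multi-value response headers into single comma-separated values
    const responseHeaders = new Headers();
    for (const key of Object.keys(responseInfo.header)) {
      responseHeaders.set(key, responseInfo.header[key].join(", "));
    }

    return new Response(JSON.stringify(responseInfo.body), {
      status: responseInfo.status,
      headers: responseHeaders,
    });
  };
}
```

Under these assumptions, the returned function plays the same role as the fetch option passed to createOpenAI above, with the step name chosen per provider.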

Using the OpenAI client to generate text

Now that we’ve created the OpenAI client, we can use it to generate text.

To do that, we’re going to create a new workflow endpoint that uses the request payload as the prompt and generates text with the OpenAI client.

import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError } from 'ai';

import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  // Important: Must have a step before generateText
  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      maxTokens: 2048,
      prompt,
    });

    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
    
  } catch (error) {    
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});

We can either run the app locally or deploy it. Once the app is running, we can trigger the workflow using the following code:

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
const { workflowRunId } = await client.trigger({
  url: "<WORKFLOW_ENDPOINT>",
  body: { "prompt": "How is the weather in San Francisco around this time?" },
});

The workflow will execute, and we can view the logs in the Workflow dashboard.

Advanced Implementation with Tools

Tools allow the AI model to perform specific actions during text generation. You can learn more about tools in the Vercel AI SDK documentation.

When using tools with Upstash Workflow, each tool execution must be wrapped in a workflow step.

When using tools, the maxSteps parameter must be greater than 1 so that the model can process tool results and generate a final response. See the tool steps documentation for a detailed explanation.

import { z } from 'zod';
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowAbort } from '@upstash/workflow';
import { generateText, ToolExecutionError, tool } from 'ai';

import { createWorkflowOpenAI } from './model';

export const { POST } = serve<{ prompt: string }>(async (context) => {
  const openai = createWorkflowOpenAI(context);

  const prompt = await context.run("get prompt", async () => {
    return context.requestPayload.prompt;
  });

  try {
    const result = await generateText({
      model: openai('gpt-3.5-turbo'),
      tools: {
        weather: tool({
          description: 'Get the weather in a location',
          parameters: z.object({
            location: z.string().describe('The location to get the weather for'),
          }),
          execute: ({ location }) => context.run("weather tool", () => {
            // Mock data, replace with actual weather API call
            return {
              location,
              temperature: 72 + Math.floor(Math.random() * 21) - 10,
            };
          })
        }),
      },
      maxSteps: 2,
      prompt,
    });
    
    await context.run("text", () => {
      console.log(`TEXT: ${result.text}`);
      return result.text;
    });
  } catch (error) {
    if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
      throw error.cause;
    } else {
      throw error;
    }
  }
});

When called with the same prompt as above, the tool execution appears as its own step in the workflow logs.

Important Considerations

When using Upstash Workflow with the Vercel AI SDK, there are several critical requirements that must be followed:

Step Execution Order

The most critical requirement is that generateText must not be the first operation in the workflow: always run at least one workflow step before calling it. This could be a step that gets the prompt, as the "get prompt" step does in the examples above.

Error Handling Pattern

You must use the following error handling pattern exactly as shown. The conditions and their handling should not be modified:

try {
  // Your generation code
} catch (error) {    
  if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) {
    throw error.cause;
  } else {
    throw error;
  }
}
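
The reason for unwrapping error.cause can be illustrated with stand-in classes. The sketch below assumes, as the pattern above implies, that an error thrown inside a tool's execute function reaches the catch block wrapped in a ToolExecutionError whose cause is the original error. WorkflowAbortStandIn, ToolExecutionErrorStandIn, and selectErrorToRethrow are hypothetical names introduced here for illustration only; in a real workflow, keep the pattern above with the real classes.

```typescript
// Stand-in for WorkflowAbort from @upstash/workflow (illustration only)
class WorkflowAbortStandIn extends Error {
  constructor(message: string) {
    super(message);
    // Keep instanceof working even when compiled to older targets
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Stand-in for ToolExecutionError from the ai package (illustration only)
class ToolExecutionErrorStandIn extends Error {
  cause: unknown;
  constructor(cause: unknown) {
    super("tool execution failed");
    Object.setPrototypeOf(this, new.target.prototype);
    this.cause = cause;
  }
}

// Mirrors the decision made in the catch block: an abort wrapped by the tool
// error is unwrapped, every other error is passed through unchanged.
function selectErrorToRethrow(error: unknown): unknown {
  if (
    error instanceof ToolExecutionErrorStandIn &&
    error.cause instanceof WorkflowAbortStandIn
  ) {
    return error.cause;
  }
  return error;
}
```

In this sketch, only a wrapped abort is replaced by its cause. A ToolExecutionErrorStandIn caused by an ordinary failure, or any unrelated error, is returned as-is, which matches the two branches of the pattern.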

Tool Implementation

When implementing tools:

  • Each tool’s execute function must be wrapped in a context.run() call
  • Tool steps should have descriptive names for tracking
  • Tools must follow the same error handling pattern as above

Example:

execute: ({ location }) => context.run("weather tool", () => {
  // Mock data, replace with actual weather API call
  return {
    location,
    temperature: 72 + Math.floor(Math.random() * 21) - 10,
  };
})
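
When a workflow defines several tools, the wrapping shown above can be factored into a small helper so that no tool is left outside a step. The following is a minimal sketch under stated assumptions: RunContext is a stand-in for the context.run part of WorkflowContext, and asWorkflowStep, demoContext, and recordedStepNames are hypothetical names introduced here, not part of Upstash Workflow or the Vercel AI SDK.

```typescript
// Minimal stand-in for the part of WorkflowContext used here (an assumption
// for illustration, not the real @upstash/workflow type).
interface RunContext {
  run<T>(stepName: string, stepFunction: () => T | Promise<T>): Promise<T>;
}

// Hypothetical helper: turns a plain tool function into an execute function
// whose body runs inside a named workflow step.
function asWorkflowStep<Args, Result>(
  context: RunContext,
  stepName: string,
  toolFunction: (args: Args) => Result | Promise<Result>
): (args: Args) => Promise<Result> {
  return (args) => context.run(stepName, () => toolFunction(args));
}

// In-memory stand-in context that records step names, for demonstration only
const recordedStepNames: string[] = [];
const demoContext: RunContext = {
  run: async <T>(stepName: string, stepFunction: () => T | Promise<T>): Promise<T> => {
    recordedStepNames.push(stepName);
    return stepFunction();
  },
};

// Mock weather tool with a fixed temperature, replace with an actual weather API call
const weatherExecute = asWorkflowStep(
  demoContext,
  "weather tool",
  ({ location }: { location: string }) => ({ location, temperature: 72 })
);
```

In a real workflow, the context received by the serve handler would take the place of demoContext, and the returned function would be passed as the tool's execute option.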