Back to Documentation

Workflows

Build complex multi-step AI pipelines with branching, parallel execution, retry logic, and human-in-the-loop capabilities.

npm install @rana/core

Workflow Definition

Define multi-step AI workflows with dependencies

import { Workflow, Step } from '@rana/core';

const contentWorkflow = new Workflow({
  name: 'content-pipeline',
  description: 'Generate and review content'
});

// Add steps
contentWorkflow.addStep(new Step({
  name: 'research',
  handler: async (input) => {
    return await agent.run(`Research: ${input.topic}`);
  }
}));

contentWorkflow.addStep(new Step({
  name: 'draft',
  dependsOn: ['research'],
  handler: async (input, context) => {
    const research = context.getOutput('research');
    return await agent.run(`Write article using: ${research}`);
  }
}));

contentWorkflow.addStep(new Step({
  name: 'review',
  dependsOn: ['draft'],
  handler: async (input, context) => {
    const draft = context.getOutput('draft');
    return await agent.run(`Review and improve: ${draft}`);
  }
}));

// Run workflow
const result = await contentWorkflow.run({ topic: 'AI trends' });
console.log(result.outputs.review);  // Final reviewed content

Conditional Branching

Branch workflows based on conditions

import { Workflow, Step, Branch } from '@rana/core';

const supportWorkflow = new Workflow({ name: 'support-ticket' });

// Classify the ticket
supportWorkflow.addStep(new Step({
  name: 'classify',
  handler: async (input) => {
    return await classify(input.message, [
      'billing', 'technical', 'sales', 'other'
    ]);
  }
}));

// Branch based on classification
supportWorkflow.addBranch(new Branch({
  name: 'route',
  dependsOn: ['classify'],
  conditions: [
    {
      when: (ctx) => ctx.getOutput('classify') === 'billing',
      then: 'billing-handler'
    },
    {
      when: (ctx) => ctx.getOutput('classify') === 'technical',
      then: 'technical-handler'
    },
    {
      when: (ctx) => ctx.getOutput('classify') === 'sales',
      then: 'sales-handler'
    }
  ],
  default: 'general-handler'
}));

// Define handlers for each branch
supportWorkflow.addStep(new Step({
  name: 'billing-handler',
  handler: async (input, ctx) => {
    return await billingAgent.run(input.message);
  }
}));

// ... other handlers

Parallel Execution

Run steps in parallel for performance

import { Workflow, Step, Parallel } from '@rana/core';

const analysisWorkflow = new Workflow({ name: 'document-analysis' });

// Single input step
analysisWorkflow.addStep(new Step({
  name: 'extract-text',
  handler: async (input) => {
    return await extractText(input.document);
  }
}));

// Parallel analysis steps
analysisWorkflow.addParallel(new Parallel({
  name: 'analyze',
  dependsOn: ['extract-text'],
  steps: [
    new Step({
      name: 'sentiment',
      handler: async (_, ctx) => {
        return await sentiment(ctx.getOutput('extract-text'));
      }
    }),
    new Step({
      name: 'entities',
      handler: async (_, ctx) => {
        return await extractEntities(ctx.getOutput('extract-text'));
      }
    }),
    new Step({
      name: 'summary',
      handler: async (_, ctx) => {
        return await summarize(ctx.getOutput('extract-text'));
      }
    }),
    new Step({
      name: 'keywords',
      handler: async (_, ctx) => {
        return await extractKeywords(ctx.getOutput('extract-text'));
      }
    })
  ]
}));

// Merge results
analysisWorkflow.addStep(new Step({
  name: 'merge',
  dependsOn: ['analyze'],
  handler: async (_, ctx) => ({
    sentiment: ctx.getOutput('sentiment'),
    entities: ctx.getOutput('entities'),
    summary: ctx.getOutput('summary'),
    keywords: ctx.getOutput('keywords')
  })
}));

Retry & Timeout

Built-in retry logic and timeouts

import { Workflow, Step, RetryPolicy } from '@rana/core';

const workflow = new Workflow({ name: 'reliable-workflow' });

// Step with retry policy
workflow.addStep(new Step({
  name: 'api-call',
  timeout: 30000,  // 30 second timeout
  retry: new RetryPolicy({
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelay: 1000,
    maxDelay: 10000,
    retryOn: ['timeout', 'rate_limit', '5xx']
  }),
  handler: async (input) => {
    return await externalAPI.call(input);
  },
  onRetry: (attempt, error) => {
    console.log(`Retry attempt ${attempt}: ${error.message}`);
  }
}));

// Step with fallback
workflow.addStep(new Step({
  name: 'with-fallback',
  handler: async (input) => {
    return await primaryService.call(input);
  },
  fallback: async (input, error) => {
    console.log(`Primary failed: ${error.message}`);
    return await backupService.call(input);
  }
}));

// Global error handler
workflow.onError(async (step, error, context) => {
  await logError({ step: step.name, error, context });
  await notifyOps(`Workflow error in ${step.name}`);
});

Human-in-the-Loop

Pause workflows for human review

import { Workflow, Step, HumanReview } from '@rana/core';

const approvalWorkflow = new Workflow({ name: 'content-approval' });

// Generate content
approvalWorkflow.addStep(new Step({
  name: 'generate',
  handler: async (input) => {
    return await agent.run(`Write: ${input.brief}`);
  }
}));

// Human review gate
approvalWorkflow.addStep(new HumanReview({
  name: 'review',
  dependsOn: ['generate'],
  reviewers: ['editor@company.com'],
  timeout: '24h',
  ui: {
    title: 'Content Review Required',
    showOutput: 'generate',
    actions: ['approve', 'reject', 'request-changes']
  },
  onApprove: async (ctx) => {
    return { approved: true, content: ctx.getOutput('generate') };
  },
  onReject: async (ctx, feedback) => {
    // Re-generate with feedback
    return await agent.run(`Revise based on: ${feedback}`);
  },
  onTimeout: async (ctx) => {
    await notify('Review timed out');
    return { approved: false, reason: 'timeout' };
  }
}));

// Continue after approval
approvalWorkflow.addStep(new Step({
  name: 'publish',
  dependsOn: ['review'],
  condition: (ctx) => ctx.getOutput('review').approved,
  handler: async (_, ctx) => {
    return await publishContent(ctx.getOutput('review').content);
  }
}));

Workflow Composition

Compose workflows from reusable sub-workflows

import { Workflow, SubWorkflow } from '@rana/core';

// Define reusable sub-workflows
const validationWorkflow = new Workflow({ name: 'validation' });
validationWorkflow.addStep(/* validation steps */);

const enrichmentWorkflow = new Workflow({ name: 'enrichment' });
enrichmentWorkflow.addStep(/* enrichment steps */);

const notificationWorkflow = new Workflow({ name: 'notification' });
notificationWorkflow.addStep(/* notification steps */);

// Compose into main workflow
const mainWorkflow = new Workflow({ name: 'main-pipeline' });

mainWorkflow.addStep(new Step({
  name: 'input',
  handler: async (input) => input.data
}));

mainWorkflow.addSubWorkflow(new SubWorkflow({
  name: 'validate',
  workflow: validationWorkflow,
  dependsOn: ['input']
}));

mainWorkflow.addSubWorkflow(new SubWorkflow({
  name: 'enrich',
  workflow: enrichmentWorkflow,
  dependsOn: ['validate']
}));

mainWorkflow.addSubWorkflow(new SubWorkflow({
  name: 'notify',
  workflow: notificationWorkflow,
  dependsOn: ['enrich'],
  parallel: true  // Run in background
}));

// Run composed workflow
const result = await mainWorkflow.run({ data: inputData });

Workflow Monitoring

Built-in monitoring and visualization for your workflows:

import { WorkflowMonitor } from '@rana/core';

const monitor = new WorkflowMonitor({
  storage: 'postgresql',
  dashboard: true
});

// Track workflow execution
workflow.use(monitor.middleware());

// Get execution history
const runs = await monitor.getRuns({
  workflow: 'content-pipeline',
  status: 'completed',
  since: '7d ago'
});

// Get performance metrics
const metrics = await monitor.getMetrics('content-pipeline');
console.log(metrics.avgDuration);    // 45s
console.log(metrics.successRate);    // 98.5%
console.log(metrics.avgCost);        // $0.15

// Start dashboard server
await monitor.startDashboard({ port: 3001 });