Cephlo

A TypeScript workflow management tool for composable task execution. Cephlo helps you build and manage complex workflows by breaking them down into smaller, reusable tasks.

Features

  • 🧩 Composable task-based workflows
  • 🔄 Automatic retry handling
  • 🎯 Input and output validation with the schema library of your choice
  • 📊 Pluggable logging and tracing
  • 🎭 Hooks for workflow events
  • ⚡ Parallel task execution
  • 🛡️ Type-safe task definitions

Installation

npm install cephlo
# or
yarn add cephlo
# or
pnpm add cephlo

Quick Start

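import { defineTask, createWorkflow, runWorkflow } from 'cephlo';
import { z } from 'zod';
import Ajv from 'ajv';
import { Type } from '@sinclair/typebox';

// Shared Ajv instance used to compile the typebox schema below
const ajv = new Ajv();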

// Define tasks
const fetchData = defineTask({
  name: 'fetchData',
  run: async (apiUrl: string): Promise<{ foo: string }> => {
    const response = await fetch(apiUrl);
    return response.json();
  },
  inputSchema: z.string(),
  outputSchema: z.object({
    foo: z.string(),
  }),
});

const processData = defineTask({
  name: 'processData',
  // return types from dependent tasks map to run arguments
  run: async ([fetchDataResponse]) => {
    // Process the data
    return { processed: true, foo: fetchDataResponse.foo };
  },
  deps: [fetchData] as const,
  // typebox example
  outputSchema: ajv.compile(
    Type.Object({
      processed: Type.Boolean(),
      foo: Type.String(),
    })
  ),
});

// Create a workflow
const workflow = createWorkflow({
  name: 'dataProcessing',
  tasks: [fetchData, processData],
});

// Run the workflow
const result = await runWorkflow(workflow, 'https://api.example.com/data');

if (result.status === 'completed') {
  console.log('Workflow completed successfully!');
  console.log('Processed data:', result.outputs.get('processData'));
} else {
  console.error('Workflow failed:', result.error);
}

Task Definition

Tasks are the building blocks of workflows. Each task can:

  • Accept inputs
  • Return outputs
  • Depend on other tasks
  • Define input and output schemas
    • The input schema is validated only when the task has no deps
  • Handle errors

const task = defineTask({
  name: 'myTask',
  run: async (input: unknown, context?: WorkflowContext) => {
    // Task implementation; the returned value becomes the task's output
    return input;
  },
  deps: [
    /* dependent tasks */
  ],
  inputSchema: {
    /* Your favorite schema flavor */
  },
  outputSchema: {
    /* Your favorite schema flavor */
  },
});
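
To illustrate how the outputs of dependencies reach `run`, here is a simplified, self-contained model. It does not use Cephlo itself: `MiniTask` and `runAll` are hypothetical names, and the real engine may resolve and schedule tasks differently (for example, in parallel).

```typescript
// Simplified model of dependency resolution; NOT Cephlo's implementation.
// `MiniTask` and `runAll` are hypothetical names used only for illustration.
interface MiniTask {
  name: string;
  deps: MiniTask[];
  // Root tasks receive the workflow input; dependent tasks receive an
  // array of their dependencies' outputs, in the order `deps` lists them.
  run: (input: any) => Promise<unknown>;
}

async function runAll(
  tasks: MiniTask[],
  workflowInput: unknown
): Promise<Map<string, unknown>> {
  const outputs = new Map<string, unknown>();

  const runTask = async (task: MiniTask): Promise<unknown> => {
    // Each task runs at most once; later requests reuse the stored output.
    if (outputs.has(task.name)) return outputs.get(task.name);

    // Resolve dependencies first (sequentially, for simplicity).
    const depOutputs: unknown[] = [];
    for (const dep of task.deps) depOutputs.push(await runTask(dep));

    const input = task.deps.length === 0 ? workflowInput : depOutputs;
    const output = await task.run(input);
    outputs.set(task.name, output);
    return output;
  };

  for (const task of tasks) await runTask(task);
  return outputs;
}

const fetchGreeting: MiniTask = {
  name: 'fetchGreeting',
  deps: [],
  run: async (name: string) => ({ text: `hello ${name}` }),
};

const shout: MiniTask = {
  name: 'shout',
  deps: [fetchGreeting],
  run: async ([greeting]: [{ text: string }]) => greeting.text.toUpperCase(),
};

// Usage: the workflow input goes to the root task only.
runAll([fetchGreeting, shout], 'world').then(outputs => {
  console.log(outputs.get('shout')); // logs "HELLO WORLD"
});
```

In this model only a task without deps ever sees the workflow input, which matches the note above that the input schema applies only to such tasks.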

Workflow Configuration

Configure the workflow engine with custom settings:

import { configureWorkflowEngine } from 'cephlo';

configureWorkflowEngine({
  defaultTaskTimeoutMs: 5000,
  retryPolicy: {
    maxAttempts: 3,
    backoffMs: 1000,
  },
  logger: {
    info: console.info,
    debug: console.debug,
    error: console.error,
    warn: console.warn,
  },
  validate: (data, schema) => {
    // zod: `schema` is a zod schema
    const result = schema.safeParse(data);

    return {
      valid: result.success,
      data: result.data,
      error: result.error,
    };

    // typebox alternative: `schema` is the function returned by ajv.compile,
    // so use this body instead of the zod one above
    // const valid = schema(data);
    // return {
    //   valid,
    //   data,
    //   error: schema.errors,
    // };
  },
});
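
The `validate` callback receives the data plus whatever schema object the task declared, and returns `{ valid, data, error }`. As a minimal sketch of that contract without any schema library, the adapter below treats a schema as a plain type-guard function. `ValidationResult`, `PredicateSchema` and `isUser` are hypothetical names, and the exact result type Cephlo expects is an assumption inferred from the example above.

```typescript
// Result shape inferred from the `validate` example above (assumption).
interface ValidationResult<T> {
  valid: boolean;
  data?: T;
  error?: Error;
}

// Hypothetical "schema": a type-guard function instead of a zod/ajv schema.
type PredicateSchema<T> = (data: unknown) => data is T;

function validate<T>(
  data: unknown,
  schema: PredicateSchema<T>
): ValidationResult<T> {
  if (schema(data)) {
    // `data` is narrowed to T by the type guard.
    return { valid: true, data };
  }
  return { valid: false, error: new Error('Validation failed') };
}

// Example schema: accepts objects with a numeric `id`.
const isUser = (data: unknown): data is { id: number } =>
  typeof data === 'object' &&
  data !== null &&
  typeof (data as { id?: unknown }).id === 'number';

const ok = validate({ id: 1 }, isUser);
const bad = validate({ id: 'x' }, isUser);

console.log(ok.valid, bad.valid); // logs "true false"
```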

Custom tracing

Tracing Integration

The workflow engine supports tracing through a simple adapter interface. This allows you to integrate with any tracing system that supports spans, such as OpenTelemetry or Datadog.
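
Before wiring up a real backend, the adapter shape can be sketched with an in-memory tracer, which is also handy in tests. The `WorkflowSpan` interface below is a local copy mirrored from the adapters in this README, so the types Cephlo actually exports may differ; `RecordingSpan`, `recordingTracer` and `spans` are hypothetical names.

```typescript
// Local copies of the adapter shapes, mirrored from the integrations in this
// README; the types Cephlo actually exports may differ (assumption).
type SpanStatus = { code: 'OK' | 'ERROR' | 'UNSET'; message?: string };

interface WorkflowSpan {
  setAttribute(key: string, value: unknown): void;
  setAttributes(attributes: Record<string, unknown>): void;
  addEvent(name: string, attributes?: Record<string, unknown>): void;
  setStatus(status: SpanStatus): void;
  end(): void;
}

// Hypothetical span that records everything in memory.
class RecordingSpan implements WorkflowSpan {
  readonly attributes: Record<string, unknown> = {};
  readonly events: string[] = [];
  status: SpanStatus = { code: 'UNSET' };
  ended = false;

  constructor(readonly name: string, readonly parent?: RecordingSpan) {}

  setAttribute(key: string, value: unknown) {
    this.attributes[key] = value;
  }
  setAttributes(attributes: Record<string, unknown>) {
    Object.assign(this.attributes, attributes);
  }
  addEvent(name: string) {
    this.events.push(name);
  }
  setStatus(status: SpanStatus) {
    this.status = status;
  }
  end() {
    this.ended = true;
  }
}

const spans: RecordingSpan[] = [];

const recordingTracer = {
  startSpan(
    name: string,
    options?: { attributes?: Record<string, unknown>; parent?: WorkflowSpan }
  ): WorkflowSpan {
    const span = new RecordingSpan(name, options?.parent as RecordingSpan | undefined);
    if (options?.attributes) span.setAttributes(options.attributes);
    spans.push(span);
    return span;
  },

  getCurrentSpan(): WorkflowSpan | undefined {
    // Most recently started span that has not ended yet.
    return [...spans].reverse().find(s => !s.ended);
  },
};

// Usage: a workflow span with one finished child task span.
const root = recordingTracer.startSpan('workflow', { attributes: { name: 'demo' } });
const child = recordingTracer.startSpan('task', { parent: root });
child.setStatus({ code: 'OK' });
child.end();
```

Such an object would be passed as `tracer` to `configureWorkflowEngine`, in the same way as the adapters in the following sections.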

OpenTelemetry Integration

Here's how to integrate with OpenTelemetry:

import { configureWorkflowEngine, WorkflowSpan } from '../src/config';
import { trace, context, SpanStatusCode, Span } from '@opentelemetry/api';

// Create an OpenTelemetry span adapter
class OpenTelemetrySpan implements WorkflowSpan {
  constructor(public readonly span: Span) {}

  setAttribute(key: string, value: unknown) {
    this.span.setAttribute(key, value);
  }

  setAttributes(attributes: Record<string, unknown>) {
    Object.entries(attributes).forEach(([key, value]) => {
      this.span.setAttribute(key, value);
    });
  }

  addEvent(name: string, attributes?: Record<string, unknown>) {
    this.span.addEvent(name, attributes);
  }

  setStatus(status: { code: 'OK' | 'ERROR' | 'UNSET'; message?: string }) {
    this.span.setStatus({
      code:
        status.code === 'OK'
          ? SpanStatusCode.OK
          : status.code === 'ERROR'
          ? SpanStatusCode.ERROR
          : SpanStatusCode.UNSET,
      message: status.message,
    });
  }

  end() {
    this.span.end();
  }
}

// Create an OpenTelemetry tracer adapter
const otelTracer = {
  startSpan(
    name: string,
    options?: {
      attributes?: Record<string, unknown>;
      parent?: WorkflowSpan;
    }
  ): WorkflowSpan {
    const tracer = trace.getTracer('workflow-engine');

    // Create span options with parent context if provided
    const spanOptions: Parameters<ReturnType<typeof trace.getTracer>['startSpan']>[1] = {
      attributes: options?.attributes,
    };

    // If a parent span is provided, derive a context that carries it
    const ctx = options?.parent
      ? trace.setSpan(context.active(), (options.parent as OpenTelemetrySpan).span)
      : context.active();

    // Start the span in the appropriate context
    const span = tracer.startSpan(name, spanOptions, ctx);

    // Note: startSpan does not make the span active; callers that need it
    // active can wrap their work in context.with(trace.setSpan(ctx, span), fn)

    return new OpenTelemetrySpan(span);
  },

  getCurrentSpan(): WorkflowSpan | undefined {
    const span = trace.getActiveSpan();
    return span ? new OpenTelemetrySpan(span) : undefined;
  },
};

// Configure the workflow engine with OpenTelemetry
configureWorkflowEngine({
  tracer: otelTracer,
  // ... other config
});

Datadog Integration

Here's how to integrate with Datadog:

import { configureWorkflowEngine, WorkflowSpan } from '../src/config';
import tracer from 'dd-trace';

// Create a Datadog span adapter
class DatadogSpan implements WorkflowSpan {
  constructor(public readonly span: ReturnType<typeof tracer.startSpan>) {}

  setAttribute(key: string, value: unknown) {
    this.span.setTag(key, value);
  }

  setAttributes(attributes: Record<string, unknown>) {
    Object.entries(attributes).forEach(([key, value]) => {
      this.span.setTag(key, value);
    });
  }

  addEvent(name: string, attributes?: Record<string, unknown>) {
    this.span.addTags({
      [`event.${name}`]: true,
      ...attributes,
    });
  }

  setStatus(status: { code: 'OK' | 'ERROR' | 'UNSET'; message?: string }) {
    if (status.code === 'ERROR') {
      this.span.setTag('error', new Error(status.message));
    }
  }

  end() {
    this.span.finish();
  }
}

// Create a Datadog tracer adapter
const ddTracer = {
  startSpan(
    name: string,
    options?: {
      attributes?: Record<string, unknown>;
      parent?: WorkflowSpan;
    }
  ): WorkflowSpan {
    // If parent span is provided, use it as the parent
    const parentSpan = options?.parent ? (options.parent as DatadogSpan).span : undefined;

    // Start a new span
    const span = tracer.startSpan(name, {
      childOf: parentSpan,
      tags: options?.attributes,
    });

    return new DatadogSpan(span);
  },

  getCurrentSpan(): WorkflowSpan | undefined {
    const span = tracer.scope().active();
    return span ? new DatadogSpan(span) : undefined;
  },
};

// Configure the workflow engine with Datadog
configureWorkflowEngine({
  tracer: ddTracer,
  // ... other config
});

What Gets Traced

The workflow engine automatically traces:

  1. Workflow execution

    • Start and end of workflow
    • Input and output values
    • Success/failure status
  2. Task execution

    • Start and end of each task
    • Task name and dependencies
    • Input and output values
    • Success/failure status
  3. Error handling

    • Error messages and stack traces
    • Task failure context

Error Handling

Cephlo retries failed tasks automatically according to the configured retryPolicy. Hooks let you intercept task-level and workflow-level errors:

const workflow = createWorkflow({
  name: 'errorHandling',
  tasks: [task1, task2],
  hooks: {
    onTaskError: (error, context) => {
      // Custom error handling
      // Return true to skip retry, false to allow retry
      return false;
    },
    onWorkflowError: (error, context) => {
      // Handle workflow-level errors
    },
  },
});
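
How the retry policy and the `onTaskError` return value might interact can be sketched with a small standalone helper. This is not Cephlo's implementation: `runWithRetry` is a hypothetical name, the hook signature is simplified to `(error, attempt)`, a fixed delay of `backoffMs` between attempts is assumed, and `maxAttempts` is assumed to include the first attempt.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoffMs: number;
}

// Returns true to skip further retries, false to allow them
// (mirrors the `onTaskError` convention described above).
type OnTaskError = (error: unknown, attempt: number) => boolean;

const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function runWithRetry<T>(
  run: () => Promise<T>,
  policy: RetryPolicy,
  onTaskError: OnTaskError = () => false
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    try {
      return await run();
    } catch (error) {
      lastError = error;
      const skipRetry = onTaskError(error, attempt);
      // Stop when the hook vetoes the retry or the attempts are used up.
      if (skipRetry || attempt === policy.maxAttempts) break;
      await sleep(policy.backoffMs);
    }
  }
  throw lastError;
}

// Usage: a task that fails twice and then succeeds on the third attempt.
let calls = 0;
const flaky = async () => {
  calls += 1;
  if (calls < 3) throw new Error(`attempt ${calls} failed`);
  return 'done';
};

const flakyResult = runWithRetry(flaky, { maxAttempts: 3, backoffMs: 0 });
```

With a hook that returns `true`, the same helper would rethrow after the first failure instead of retrying.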

Hooks

Customize workflow behavior with hooks:

const workflow = createWorkflow({
  name: 'withHooks',
  tasks: [task1, task2],
  hooks: {
    onWorkflowStart: context => {
      console.log('Workflow started');
    },
    onWorkflowEnd: context => {
      console.log('Workflow ended');
    },
    onTaskStart: context => {
      console.log('Task started:', context.task.name);
    },
    onTaskEnd: (error, result, context) => {
      console.log('Task ended:', context.task.name);
    },
    onTaskSuccess: (result, context) => {
      console.log('Task succeeded:', context.task.name);
    },
    onTaskError: (error, context) => {
      console.error('Task failed:', context.task.name);
      return false; // Allow retry
    },
    onTaskRetry: context => {
      console.log('Retrying task:', context.task.name);
    },
  },
});

Development

# Install dependencies
pnpm install

# Build the project
pnpm run build

# Run tests
pnpm run test

# Run tests in watch mode
pnpm run test:watch

# Run tests with coverage
pnpm run test:coverage

# Lint the code
pnpm run lint

# Format the code
pnpm run format

License

MIT © Tj Simons
