FailureSource
FailureSource
Base class and implementations for failure case sources.
FailureSource (Base Class)
Abstract base class for implementing custom failure sources.
Import
import { FailureSource } from '@korvol/fixwright';
Abstract Methods
start()
Start watching for failures.
abstract start(): Promise<void>
stop()
Stop watching for failures.
abstract stop(): Promise<void>
Events
interface FailureSourceEvents {
  failure: (failureCase: FailureCase) => void;
  error: (error: Error) => void;
}
Implementing a Custom Source
import { FailureSource } from '@korvol/fixwright';import type { FailureCase } from '@korvol/shared';
/**
 * Example failure source that polls for new failure cases.
 *
 * Emits `failure` for each case returned by `fetchNextFailure()` and `error`
 * for anything thrown while fetching.
 */
class MyCustomSource extends FailureSource {
  private polling = false;

  /** Starts the poll loop; does nothing if the source is already polling. */
  async start(): Promise<void> {
    if (this.polling) {
      return; // avoid running two poll loops side by side
    }
    this.polling = true;
    // Deliberately not awaited: the loop runs until stop() clears the flag.
    void this.poll();
  }

  /** Asks the poll loop to exit; it stops at its next `polling` check. */
  async stop(): Promise<void> {
    this.polling = false;
  }

  /** Fetches one failure per iteration, then waits 5 seconds before the next. */
  private async poll(): Promise<void> {
    while (this.polling) {
      try {
        const failure = await this.fetchNextFailure();
        if (failure) {
          this.emit('failure', failure);
        }
      } catch (error) {
        // Anything can be thrown; always hand listeners a real Error.
        this.emit('error', error instanceof Error ? error : new Error(String(error)));
      }
      await this.sleep(5000);
    }
  }

  /** Returns the next pending failure case, or `null` when there is none. */
  private async fetchNextFailure(): Promise<FailureCase | null> {
    // Your implementation
    return null;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
FileFailureSource
Watches a directory for failure case JSON files.
Import
import { FileFailureSource } from '@korvol/fixwright';
Constructor
constructor(directory: string)
Parameters:
| Name | Type | Description |
|---|---|---|
| directory | string | Directory to watch |
Usage
const source = new FileFailureSource('./failure-cases');
source.on('failure', (failureCase) => { console.log('New failure:', failureCase.id);});
source.on('error', (error) => { console.error('Source error:', error);});
await source.start();
Directory Structure
failure-cases/
├── fc-2024-01-15-abc123.json
├── fc-2024-01-15-def456.json
└── fc-2024-01-16-ghi789.json
Each file should contain a valid FailureCase JSON object.
With FixWright
const fixwright = new FixWright(config);
fixwright.useFileSource('./failure-cases');
await fixwright.start();
DatabaseFailureSource
Fetches failure cases from a database.
Import
import { DatabaseFailureSource } from '@korvol/fixwright';
Constructor
constructor(config: DatabaseConfig)
interface DatabaseConfig {
  connectionString: string;
  tableName?: string;
  pollInterval?: number;
}
Parameters:
| Name | Type | Default | Description |
|---|---|---|---|
| connectionString | string | - | Database connection string |
| tableName | string | 'failure_cases' | Table name |
| pollInterval | number | 5000 | Poll interval in ms |
Usage
const source = new DatabaseFailureSource({ connectionString: process.env.DATABASE_URL!, tableName: 'stepwright_failures', pollInterval: 10000,});
fixwright.useSource(source);
Expected Table Schema
CREATE TABLE failure_cases (
  id VARCHAR(255) PRIMARY KEY,
  status VARCHAR(50) DEFAULT 'pending',
  timestamp TIMESTAMP DEFAULT NOW(),
  data JSONB NOT NULL
);
Queue-Based Source Example
import { FailureSource } from '@korvol/fixwright';import type { FailureCase } from '@korvol/shared';import Redis from 'ioredis';
/**
 * Example failure source that consumes JSON-encoded failure cases from a
 * Redis list using a blocking pop.
 */
class RedisQueueSource extends FailureSource {
  private redis: Redis;
  private running = false;

  /**
   * @param queueName Redis list key to pop failure cases from.
   */
  constructor(private queueName: string) {
    super();
    // REDIS_URL may be unset; only pass it to the client when it is defined
    // so the argument is always a string.
    const url = process.env.REDIS_URL;
    this.redis = url !== undefined ? new Redis(url) : new Redis();
  }

  /** Starts consuming the queue; does nothing if already running. */
  async start(): Promise<void> {
    if (this.running) {
      return; // avoid two consumers competing on the same connection
    }
    this.running = true;
    // Deliberately not awaited: the loop runs until stop() clears the flag.
    void this.processQueue();
  }

  /** Signals the consumer loop to exit and closes the Redis connection. */
  async stop(): Promise<void> {
    this.running = false;
    await this.redis.quit();
  }

  private async processQueue(): Promise<void> {
    while (this.running) {
      try {
        // Blocking pop with 5 second timeout
        const result = await this.redis.brpop(this.queueName, 5);

        if (result) {
          const [, data] = result;
          this.emit('failure', this.parseFailureCase(data));
        }
      } catch (error) {
        // Anything can be thrown; always hand listeners a real Error.
        this.emit('error', error instanceof Error ? error : new Error(String(error)));
        // Back off briefly so a persistent failure does not spin the loop.
        await this.sleep(1000);
      }
    }
  }

  /**
   * Parses a queue entry and rejects payloads that are not JSON objects.
   *
   * @throws SyntaxError if `data` is not valid JSON.
   * @throws Error if the parsed value is not a plain object.
   */
  private parseFailureCase(data: string): FailureCase {
    const parsed: unknown = JSON.parse(data);
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      throw new Error(`Invalid failure case payload on queue "${this.queueName}"`);
    }
    // NOTE(review): only the top-level shape is checked here; validate the
    // individual FailureCase fields against the real schema if needed.
    return parsed as FailureCase;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// Usage
const source = new RedisQueueSource('stepwright:failures');
fixwright.useSource(source);
Webhook Source Example
import { FailureSource } from '@korvol/fixwright';import type { FailureCase } from '@korvol/shared';import express from 'express';
/**
 * Example failure source that receives failure cases over HTTP via
 * `POST /failure`.
 */
class WebhookSource extends FailureSource {
  private app: express.Application;
  private server: ReturnType<express.Application['listen']> | undefined;

  /**
   * @param port TCP port the webhook server listens on.
   */
  constructor(private port: number) {
    super();
    this.app = express();
    this.app.use(express.json());

    this.app.post('/failure', (req, res) => {
      // NOTE(review): the body is forwarded without validation — check it
      // against the FailureCase schema before trusting external callers.
      const failureCase: FailureCase = req.body;
      this.emit('failure', failureCase);
      res.status(200).json({ received: true });
    });
  }

  /**
   * Starts the HTTP server.
   *
   * Resolves once the server is listening and rejects if it fails to bind
   * (for example when the port is already in use).
   */
  async start(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const server = this.app.listen(this.port);
      this.server = server;

      const onError = (err: Error): void => {
        server.off('listening', onListening);
        reject(err);
      };
      const onListening = (): void => {
        // Drop the startup handler so later errors are not swallowed by an
        // already-settled promise.
        server.off('error', onError);
        console.log(`Webhook listening on port ${this.port}`);
        resolve();
      };

      server.once('error', onError);
      server.once('listening', onListening);
    });
  }

  /**
   * Stops the HTTP server if it was started.
   *
   * Rejects if closing the server reports an error.
   */
  async stop(): Promise<void> {
    const server = this.server;
    if (!server) {
      return;
    }
    this.server = undefined;
    await new Promise<void>((resolve, reject) => {
      server.close((err?: Error) => (err ? reject(err) : resolve()));
    });
  }
}
// Usage
const source = new WebhookSource(3001);
fixwright.useSource(source);
Multiple Sources
Process failures from multiple sources:
/**
 * Composite source: re-emits the `failure` and `error` events of every child
 * source and starts/stops all children together.
 */
class MultiSource extends FailureSource {
  private sources: FailureSource[];

  /**
   * @param sources Child sources whose events are forwarded through this one.
   */
  constructor(sources: FailureSource[]) {
    super();
    this.sources = sources;

    // Forward events from all sources
    sources.forEach((child) => {
      child.on('failure', (failureCase) => this.emit('failure', failureCase));
      child.on('error', (error) => this.emit('error', error));
    });
  }

  /** Starts every child source concurrently. */
  async start(): Promise<void> {
    const pending = this.sources.map((child) => child.start());
    await Promise.all(pending);
  }

  /** Stops every child source concurrently. */
  async stop(): Promise<void> {
    const pending = this.sources.map((child) => child.stop());
    await Promise.all(pending);
  }
}
// Usage
const source = new MultiSource([
  new FileFailureSource('./failures'),
  new DatabaseFailureSource({ connectionString: '...' }),
]);
fixwright.useSource(source);
See Also
- FixWright Class - Main class
- Failure Cases - Failure case format
- Types - All type definitions