Skip to content

FailureSource

FailureSource

Base class and implementations for failure case sources.

FailureSource (Base Class)

Abstract base class for implementing custom failure sources.

Import

import { FailureSource } from '@korvol/fixwright';

Abstract Methods

start()

Start watching for failures.

abstract start(): Promise<void>

stop()

Stop watching for failures.

abstract stop(): Promise<void>

Events

interface FailureSourceEvents {
failure: (failureCase: FailureCase) => void;
error: (error: Error) => void;
}

Implementing a Custom Source

import { FailureSource } from '@korvol/fixwright';
import type { FailureCase } from '@korvol/shared';
class MyCustomSource extends FailureSource {
private polling = false;
async start(): Promise<void> {
this.polling = true;
this.poll();
}
async stop(): Promise<void> {
this.polling = false;
}
private async poll(): Promise<void> {
while (this.polling) {
try {
const failure = await this.fetchNextFailure();
if (failure) {
this.emit('failure', failure);
}
} catch (error) {
this.emit('error', error as Error);
}
await this.sleep(5000);
}
}
private async fetchNextFailure(): Promise<FailureCase | null> {
// Your implementation
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

FileFailureSource

Watches a directory for failure case JSON files.

Import

import { FileFailureSource } from '@korvol/fixwright';

Constructor

constructor(directory: string)

Parameters:

NameTypeDescription
directorystringDirectory to watch

Usage

const source = new FileFailureSource('./failure-cases');
source.on('failure', (failureCase) => {
console.log('New failure:', failureCase.id);
});
source.on('error', (error) => {
console.error('Source error:', error);
});
await source.start();

Directory Structure

failure-cases/
├── fc-2024-01-15-abc123.json
├── fc-2024-01-15-def456.json
└── fc-2024-01-16-ghi789.json

Each file should contain a valid FailureCase JSON object.

With FixWright

const fixwright = new FixWright(config);
fixwright.useFileSource('./failure-cases');
await fixwright.start();

DatabaseFailureSource

Fetches failure cases from a database.

Import

import { DatabaseFailureSource } from '@korvol/fixwright';

Constructor

constructor(config: DatabaseConfig)
interface DatabaseConfig {
connectionString: string;
tableName?: string;
pollInterval?: number;
}

Parameters:

NameTypeDefaultDescription
connectionStringstring-Database connection string
tableNamestring'failure_cases'Table name
pollIntervalnumber5000Poll interval in ms

Usage

const source = new DatabaseFailureSource({
connectionString: process.env.DATABASE_URL!,
tableName: 'stepwright_failures',
pollInterval: 10000,
});
fixwright.useSource(source);

Expected Table Schema

CREATE TABLE failure_cases (
id VARCHAR(255) PRIMARY KEY,
status VARCHAR(50) DEFAULT 'pending',
timestamp TIMESTAMP DEFAULT NOW(),
data JSONB NOT NULL
);

Queue-Based Source Example

import { FailureSource } from '@korvol/fixwright';
import type { FailureCase } from '@korvol/shared';
import Redis from 'ioredis';
class RedisQueueSource extends FailureSource {
private redis: Redis;
private running = false;
constructor(private queueName: string) {
super();
this.redis = new Redis(process.env.REDIS_URL);
}
async start(): Promise<void> {
this.running = true;
this.processQueue();
}
async stop(): Promise<void> {
this.running = false;
await this.redis.quit();
}
private async processQueue(): Promise<void> {
while (this.running) {
try {
// Blocking pop with 5 second timeout
const result = await this.redis.brpop(this.queueName, 5);
if (result) {
const [, data] = result;
const failureCase: FailureCase = JSON.parse(data);
this.emit('failure', failureCase);
}
} catch (error) {
this.emit('error', error as Error);
await this.sleep(1000);
}
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Usage
const source = new RedisQueueSource('stepwright:failures');
fixwright.useSource(source);

Webhook Source Example

import { FailureSource } from '@korvol/fixwright';
import type { FailureCase } from '@korvol/shared';
import express from 'express';
class WebhookSource extends FailureSource {
private app: express.Application;
private server: any;
constructor(private port: number) {
super();
this.app = express();
this.app.use(express.json());
this.app.post('/failure', (req, res) => {
const failureCase: FailureCase = req.body;
this.emit('failure', failureCase);
res.status(200).json({ received: true });
});
}
async start(): Promise<void> {
return new Promise((resolve) => {
this.server = this.app.listen(this.port, () => {
console.log(`Webhook listening on port ${this.port}`);
resolve();
});
});
}
async stop(): Promise<void> {
if (this.server) {
return new Promise((resolve) => {
this.server.close(resolve);
});
}
}
}
// Usage
const source = new WebhookSource(3001);
fixwright.useSource(source);

Multiple Sources

Process failures from multiple sources:

class MultiSource extends FailureSource {
private sources: FailureSource[];
constructor(sources: FailureSource[]) {
super();
this.sources = sources;
// Forward events from all sources
for (const source of sources) {
source.on('failure', (fc) => this.emit('failure', fc));
source.on('error', (err) => this.emit('error', err));
}
}
async start(): Promise<void> {
await Promise.all(this.sources.map(s => s.start()));
}
async stop(): Promise<void> {
await Promise.all(this.sources.map(s => s.stop()));
}
}
// Usage
const source = new MultiSource([
new FileFailureSource('./failures'),
new DatabaseFailureSource({ connectionString: '...' }),
]);
fixwright.useSource(source);

See Also