Skip to content

Advanced Patterns

This guide explores advanced patterns and techniques for building complex workflows with FuturableTask.

Complex Workflows

Pipeline Pattern

Build complex data processing pipelines:

typescript
const processPipeline = FuturableTask
  .of(() => fetchRawData())
  .map(data => validateData(data))
  .filter(data => data.isValid, 'Invalid data')
  .map(data => normalizeData(data))
  .map(data => enrichData(data))
  .flatMap(data =>
    FuturableTask.of(() => saveToDatabase(data))
  )
  .tap(result => logSuccess(result))
  .recover(error => {
    logError(error);
    return fallbackData;
  });

Saga Pattern

Coordinate multiple operations with rollback:

typescript
class Saga<T> {
  private compensations: Array<() => Promise<void>> = [];

  step<U>(
    forward: () => Promise<U>,
    backward: () => Promise<void>
  ): FuturableTask<U> {
    return FuturableTask.of(async () => {
      try {
        const result = await forward();
        this.compensations.unshift(backward);
        return result;
      } catch (error) {
        await this.rollback();
        throw error;
      }
    });
  }

  async rollback() {
    for (const compensate of this.compensations) {
      try {
        await compensate();
      } catch (error) {
        console.error('Compensation failed:', error);
      }
    }
    this.compensations = [];
  }
}

// Usage
const saga = new Saga();

const transaction = FuturableTask.sequence([
  saga.step(
    () => createUser(userData),
    () => deleteUser(userId)
  ),
  saga.step(
    () => createAccount(accountData),
    () => deleteAccount(accountId)
  ),
  saga.step(
    () => sendWelcomeEmail(email),
    () => {} // Email can't be unsent
  )
]);

Fan-Out/Fan-In Pattern

Parallelize work then aggregate results:

typescript
async function fanOutFanIn<T, U>(
  items: T[],
  processor: (item: T) => FuturableTask<U>,
  aggregator: (results: U[]) => any
) {
  const limiter = FuturableTask.createLimiter(10);

  const tasks = items.map(item =>
    limiter(processor(item))
  );

  const results = await FuturableTask
    .parallel(tasks)
    .run();

  return aggregator(results);
}

// Usage
const result = await fanOutFanIn(
  urls,
  url => FuturableTask.fetch(url).map(r => r.json()),
  results => results.reduce((acc, r) => ({ ...acc, ...r }), {})
);

State Machines

Finite State Machine

typescript
type State = 'idle' | 'loading' | 'success' | 'error';

class TaskStateMachine<T> {
  private state: State = 'idle';
  private data: T | null = null;
  private error: any = null;

  execute(task: FuturableTask<T>): FuturableTask<void> {
    return FuturableTask.of(async () => {
      this.transition('loading');

      const result = await task.runSafe();

      if (result.success) {
        this.data = result.data;
        this.transition('success');
      } else {
        this.error = result.error;
        this.transition('error');
      }
    });
  }

  private transition(newState: State) {
    console.log(`${this.state} → ${newState}`);
    this.state = newState;
  }

  getState() {
    return {
      state: this.state,
      data: this.data,
      error: this.error
    };
  }
}

Caching Strategies

LRU Cache with Tasks

typescript
class TaskCache<K, V> {
  private cache = new Map<K, { data: V; timestamp: number }>();
  private maxSize = 100;
  private ttl = 60000; // 1 minute

  getOrCompute(
    key: K,
    compute: () => FuturableTask<V>
  ): FuturableTask<V> {
    return FuturableTask.of(async () => {
      const cached = this.cache.get(key);

      if (cached && Date.now() - cached.timestamp < this.ttl) {
        return cached.data;
      }

      const data = await compute().run();

      this.cache.set(key, {
        data,
        timestamp: Date.now()
      });

      // Evict oldest if cache is full
      if (this.cache.size > this.maxSize) {
        const oldest = Array.from(this.cache.keys())[0];
        this.cache.delete(oldest);
      }

      return data;
    });
  }
}

Stale-While-Revalidate

typescript
function staleWhileRevalidate<T>(
  fetch: () => FuturableTask<T>,
  cache: Map<string, T>,
  key: string
): FuturableTask<T> {
  return FuturableTask.of(async () => {
    const cached = cache.get(key);

    // Start revalidation in background
    fetch().run().then(fresh => {
      cache.set(key, fresh);
    });

    // Return stale data immediately if available
    if (cached) {
      return cached;
    }

    // Otherwise wait for fresh data
    return await fetch().run();
  });
}

Coordination Patterns

Barrier Pattern

Wait for multiple tasks before proceeding:

typescript
class Barrier {
  private tasks: FuturableTask<any>[] = [];

  add<T>(task: FuturableTask<T>) {
    this.tasks.push(task);
    return this;
  }

  wait(): FuturableTask<any[]> {
    return FuturableTask.parallel(this.tasks);
  }
}

const barrier = new Barrier();

barrier.add(task1);
barrier.add(task2);
barrier.add(task3);

const results = await barrier.wait().run();

Rendezvous Pattern

Synchronize two concurrent operations:

typescript
function rendezvous<T, U, R>(
  task1: FuturableTask<T>,
  task2: FuturableTask<U>,
  combiner: (a: T, b: U) => R
): FuturableTask<R> {
  return task1.zip(task2).map(([a, b]) => combiner(a, b));
}

const result = await rendezvous(
  FuturableTask.fetch('/api/users'),
  FuturableTask.fetch('/api/settings'),
  (users, settings) => ({ users, settings })
).run();

Error Recovery Strategies

Graceful Degradation

typescript
function gracefulDegradation<T>(
  primary: FuturableTask<T>,
  fallbacks: FuturableTask<T>[]
): FuturableTask<T> {
  let task = primary;

  for (const fallback of fallbacks) {
    task = task.orElse(() => fallback);
  }

  return task;
}

const data = await gracefulDegradation(
  FuturableTask.fetch('/api/v2/data'),
  [
    FuturableTask.fetch('/api/v1/data'),
    FuturableTask.of(() => loadFromCache()),
    FuturableTask.resolve(DEFAULT_DATA)
  ]
).run();

Bulkhead Pattern

Isolate failures:

typescript
class Bulkhead {
  private limiters = new Map<string, FuturableTaskLimiter>();

  getLimiter(name: string, concurrency: number) {
    if (!this.limiters.has(name)) {
      this.limiters.set(
        name,
        FuturableTask.createLimiter(concurrency)
      );
    }
    return this.limiters.get(name)!;
  }

  wrap<T>(
    name: string,
    task: FuturableTask<T>,
    concurrency: number = 10
  ): FuturableTask<T> {
    const limiter = this.getLimiter(name, concurrency);
    return limiter(task);
  }
}

// Isolate different services
const bulkhead = new Bulkhead();

const userService = bulkhead.wrap(
  'users',
  FuturableTask.fetch('/api/users'),
  5
);

const orderService = bulkhead.wrap(
  'orders',
  FuturableTask.fetch('/api/orders'),
  3
);

Performance Optimization

Request Deduplication

typescript
class RequestDeduplicator<K, V> {
  private pending = new Map<K, FuturableTask<V>>();

  deduplicate(
    key: K,
    factory: () => FuturableTask<V>
  ): FuturableTask<V> {
    return FuturableTask.of(async () => {
      if (this.pending.has(key)) {
        return await this.pending.get(key)!.run();
      }

      const task = factory();
      this.pending.set(key, task);

      try {
        const result = await task.run();
        return result;
      } finally {
        this.pending.delete(key);
      }
    });
  }
}

Batch Processing

typescript
function batchProcess<T, U>(
  items: T[],
  batchSize: number,
  processor: (batch: T[]) => FuturableTask<U[]>
): FuturableTask<U[]> {
  const batches: T[][] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }

  return FuturableTask
    .traverse(batches, batch => processor(batch))
    .map(results => results.flat());
}

// Process 1000 items in batches of 100
const results = await batchProcess(
  items,
  100,
  batch => FuturableTask.of(() => processBatch(batch))
).run();

Testing Patterns

Mock Tasks

typescript
class MockTask {
  static success<T>(value: T, delay: number = 0) {
    return FuturableTask
      .resolve(value)
      .delay(delay);
  }

  static failure(error: any, delay: number = 0) {
    return FuturableTask
      .reject(error)
      .delay(delay);
  }

  static flaky<T>(
    value: T,
    failureRate: number = 0.5
  ): FuturableTask<T> {
    return FuturableTask.of(() => {
      if (Math.random() < failureRate) {
        throw new Error('Random failure');
      }
      return value;
    });
  }
}

// Usage in tests
const mockAPI = MockTask.success({ id: 1 }, 100);
const result = await mockAPI.run();

Test Utilities

typescript
async function expectTaskToSucceed<T>(
  task: FuturableTask<T>,
  expected: T
) {
  const result = await task.runSafe();
  expect(result.success).toBe(true);
  if (result.success) {
    expect(result.data).toEqual(expected);
  }
}

async function expectTaskToFail(
  task: FuturableTask<any>,
  errorMessage?: string
) {
  const result = await task.runSafe();
  expect(result.success).toBe(false);
  if (!result.success && errorMessage) {
    expect(result.error.message).toBe(errorMessage);
  }
}

See Also

Released under the MIT License.