Building Pulse: A Type-Safe Business Logic Framework

TypeScript · Framework · BullMQ · Architecture

Over the past year, I've built several production systems where business logic became scattered across controllers, services, and background jobs. The same patterns kept appearing: validating input, executing logic, handling async operations, and managing failures. This repetition led me to create Pulse - a type-safe framework that centralizes business logic execution.

The Problem Space

Traditional backend architectures often suffer from:

  1. Logic Dispersion: Business rules scattered across HTTP handlers, service classes, and worker processes
  2. Inconsistent Validation: Different validation approaches for synchronous vs asynchronous operations
  3. Poor Observability: Difficult to track and monitor business operations across the system
  4. Type Unsafety: Runtime errors from mismatched inputs and outputs

Here's what typical scattered logic looks like:

// HTTP Controller
app.post('/users', async (req, res) => {
  const { email, name } = req.body;
  if (!email || !validateEmail(email)) {
    return res.status(400).json({ error: 'Invalid email' });
  }
  const user = await db.users.create({ email, name });
  await emailQueue.add('welcome', { userId: user.id });
  res.json(user);
});

// Background Job
emailQueue.process('welcome', async (job) => {
  const { userId } = job.data;
  const user = await db.users.findById(userId);
  await sendEmail(user.email, 'Welcome!');
});

Notice the problems:

  • Validation logic mixed with business logic
  • No type safety between controller and background job
  • Duplicate data fetching
  • Hard to test in isolation

Core Architecture

Pulse introduces the concept of Actions - typed business operations that can run synchronously or asynchronously. Here's the high-level architecture:

┌─────────────────────────────────────────────────────────┐
│                    Action Definition                    │
│  ┌─────────────┐  ┌──────────────┐  ┌───────────────┐   │
│  │   Schema    │  │   Handler    │  │   Metadata    │   │
│  │    (Zod)    │  │  (Function)  │  │   (Config)    │   │
│  └─────────────┘  └──────────────┘  └───────────────┘   │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                      Pulse Runtime                      │
│                                                         │
│    ┌──────────────┐         ┌─────────────────┐         │
│    │   Validate   │────────▶│     Execute     │         │
│    │    Input     │         │     Handler     │         │
│    └──────────────┘         └────────┬────────┘         │
│                            ┌─────────┴─────────┐        │
│                            ▼                   ▼        │
│                       ┌──────────┐        ┌──────────┐  │
│                       │   Sync   │        │  Async   │  │
│                       │  Return  │        │  Queue   │  │
│                       └──────────┘        └──────────┘  │
└─────────────────────────────────────────────────────────┘
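
To make the validate-then-execute path in the diagram concrete, here is a minimal sketch. It is not Pulse's actual source: `Schema`, `Action`, and `runAction` are names I made up for illustration, and `Schema` is reduced to a single `parse` method (a shape that Zod schemas also happen to have).

```typescript
// Hypothetical sketch of the validate -> execute flow; not Pulse's real internals.

// Anything with a `parse` method that throws on bad input (Zod schemas fit this shape).
interface Schema<T> {
  parse(value: unknown): T;
}

interface Action<I, O> {
  name: string;
  input: Schema<I>;
  output: Schema<O>;
  handler: (input: I) => Promise<O>;
}

// Validate the raw input, run the handler, then validate what the handler returned.
async function runAction<I, O>(action: Action<I, O>, raw: unknown): Promise<O> {
  const input = action.input.parse(raw);
  const result = await action.handler(input);
  return action.output.parse(result);
}

// Hand-written schema standing in for z.object({ name: z.string() }).
const nameSchema: Schema<{ name: string }> = {
  parse(value) {
    if (typeof value !== 'object' || value === null) throw new Error('Expected an object');
    const name = (value as { name?: unknown }).name;
    if (typeof name !== 'string') throw new Error('Expected name to be a string');
    return { name };
  },
};

// Hand-written schema standing in for z.string().
const greetingSchema: Schema<string> = {
  parse(value) {
    if (typeof value !== 'string') throw new Error('Expected a string');
    return value;
  },
};

const greet: Action<{ name: string }, string> = {
  name: 'demo.greet',
  input: nameSchema,
  output: greetingSchema,
  handler: async (input) => `Hello, ${input.name}!`,
};
```

Calling `runAction(greet, { name: 'Ada' })` resolves to a greeting string, while `runAction(greet, { name: 42 })` rejects before the handler ever runs.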

Building an Action

Let's build a complete action from scratch. Here's how Pulse works:

1. Define the Schema

First, we define the input/output schema using Zod:

import { z } from 'zod';
import { defineAction } from '@pulse/core';

const CreateUserAction = defineAction({
  name: 'users.create',
  input: z.object({
    email: z.string().email(),
    name: z.string().min(2),
    role: z.enum(['user', 'admin']).default('user'),
  }),
  output: z.object({
    id: z.string(),
    email: z.string(),
    name: z.string(),
    createdAt: z.date(),
  }),
});

This gives us:

  • Compile-time types extracted from Zod schemas
  • Runtime validation ensuring data integrity
  • Documentation through schema inspection
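
If you're wondering how one schema can yield both a static type and a runtime check, the mechanism can be sketched without any library. Everything below (`Schema`, `Infer`, `str`, `obj`) is a toy I wrote for this post, not Zod or Pulse code; `Infer` plays the role that `z.infer` plays in Zod.

```typescript
// Hypothetical mini schema library; Zod does this far more thoroughly.
interface Schema<T> {
  parse(value: unknown): T;
}

// Extract the static type carried by a schema.
type Infer<S> = S extends Schema<infer T> ? T : never;

const str: Schema<string> = {
  parse(value) {
    if (typeof value !== 'string') throw new Error('Expected a string');
    return value;
  },
};

// Build an object schema from a map of field schemas.
function obj<F extends Record<string, Schema<unknown>>>(
  fields: F,
): Schema<{ [K in keyof F]: Infer<F[K]> }> {
  const defs: Record<string, Schema<unknown>> = fields;
  return {
    parse(value) {
      if (typeof value !== 'object' || value === null) throw new Error('Expected an object');
      const source = value as Record<string, unknown>;
      const out: Record<string, unknown> = {};
      for (const key of Object.keys(defs)) {
        // Each field is validated by its own schema; a failure aborts the whole parse.
        out[key] = defs[key].parse(source[key]);
      }
      return out as unknown as { [K in keyof F]: Infer<F[K]> };
    },
  };
}

const userInput = obj({ email: str, name: str });

// The static type is derived from the schema: { email: string; name: string }
type UserInput = Infer<typeof userInput>;

const parsed: UserInput = userInput.parse({
  email: 'user@example.com',
  name: 'John Doe',
});
```

The point is that `UserInput` is never written by hand, so the type and the validation cannot drift apart.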

2. Implement the Handler

Next, we implement the business logic:

import { ActionError } from '@pulse/core';

CreateUserAction.handler(async ({ input, context }) => {
  // Input is fully typed and validated
  const existingUser = await context.db.users.findByEmail(input.email);
  if (existingUser) {
    throw new ActionError('User already exists', {
      code: 'USER_EXISTS',
      field: 'email',
    });
  }

  const user = await context.db.users.create({
    email: input.email,
    name: input.name,
    role: input.role,
  });

  // Trigger async action
  await context.actions.dispatch('emails.welcome', {
    userId: user.id,
  });

  // Return is type-checked against output schema
  return {
    id: user.id,
    email: user.email,
    name: user.name,
    createdAt: user.createdAt,
  };
});

3. Execute the Action

Now we can execute it from anywhere:

// HTTP handler
app.post('/users', async (req, res) => {
const result = await pulse.execute('users.create', req.body);
res.json(result);
});
// Or queue it for async processing
await pulse.dispatch('users.create', {
email: 'user@example.com',
name: 'John Doe',
});
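
Executing by string name raises a fair question: how does the call stay typed? One way this could work is a registry keyed by action name. The sketch below is my own illustration under that assumption; `ActionTypes`, `Registry`, and the `emails.welcome` output shape are hypothetical, not Pulse's API.

```typescript
// Hypothetical typed registry; names and shapes are illustrative only.
interface ActionTypes {
  'users.create': {
    input: { email: string; name: string };
    output: { id: string; email: string; name: string };
  };
  'emails.welcome': {
    input: { userId: string };
    output: { sent: boolean };
  };
}

type ActionName = keyof ActionTypes;

type Handler<N extends ActionName> = (
  input: ActionTypes[N]['input'],
) => Promise<ActionTypes[N]['output']>;

type AnyHandler = (input: unknown) => Promise<unknown>;

class Registry {
  private handlers = new Map<ActionName, AnyHandler>();

  register<N extends ActionName>(name: N, handler: Handler<N>): void {
    // The cast is acceptable here because `execute` only passes the matching input type.
    this.handlers.set(name, handler as unknown as AnyHandler);
  }

  async execute<N extends ActionName>(
    name: N,
    input: ActionTypes[N]['input'],
  ): Promise<ActionTypes[N]['output']> {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`No handler registered for ${name}`);
    return (await handler(input)) as ActionTypes[N]['output'];
  }
}

const registry = new Registry();

registry.register('users.create', async (input) => ({
  id: 'user_1',
  email: input.email,
  name: input.name,
}));

// Both of these would fail to compile:
// registry.execute('users.create', { email: 'user@example.com' }); // `name` is missing
// registry.execute('users.delete', { id: 'user_1' });              // unknown action name
```

The string name acts as a lookup key into the type map, so auto-complete works on both the action name and its input.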

Synchronous vs Asynchronous Execution

Pulse provides two execution modes with identical interfaces:

┌──────────────────────────────────────────────────────┐
│                    Sync Execution                    │
│                                                      │
│    HTTP Request                                      │
│         │                                            │
│         ▼                                            │
│    ┌─────────┐      ┌──────────┐      ┌──────────┐   │
│    │ Validate│─────▶│ Execute  │─────▶│  Return  │   │
│    └─────────┘      └──────────┘      └──────────┘   │
│                                            │         │
│                                            ▼         │
│                                      HTTP Response   │
└──────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────┐
│                   Async Execution                    │
│                                                      │
│    Event/Trigger                                     │
│         │                                            │
│         ▼                                            │
│    ┌─────────┐      ┌──────────┐                     │
│    │ Validate│─────▶│  Queue   │                     │
│    └─────────┘      └──────────┘                     │
│                          │                           │
│                          ▼                           │
│                     ┌──────────┐      ┌──────────┐   │
│              BullMQ │  Worker  │─────▶│ Execute  │   │
│                     └──────────┘      └──────────┘   │
└──────────────────────────────────────────────────────┘

Implementation:

// Synchronous - waits for result
const user = await pulse.execute('users.create', {
  email: 'user@example.com',
  name: 'John Doe',
});

// Asynchronous - returns job ID immediately
const jobId = await pulse.dispatch('users.create', {
  email: 'user@example.com',
  name: 'John Doe',
});

// Check job status later
const job = await pulse.getJob(jobId);
console.log(job.status); // 'completed' | 'failed' | 'pending'
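
To show how the two modes can share one handler, here is a toy model with an in-memory array standing in for BullMQ. `MiniRuntime` and `runWorker` are hypothetical names, input validation is left out for brevity, and a real queue would also persist jobs and run workers in separate processes.

```typescript
// Hypothetical in-memory stand-in for the queue; Pulse itself uses BullMQ here.
type JobStatus = 'pending' | 'completed' | 'failed';

interface Job {
  id: string;
  status: JobStatus;
  result?: unknown;
  error?: string;
}

type Handler = (input: unknown) => Promise<unknown>;

class MiniRuntime {
  private handlers = new Map<string, Handler>();
  private jobs = new Map<string, Job>();
  private pending: Array<{ job: Job; name: string; input: unknown }> = [];
  private nextId = 1;

  register(name: string, handler: Handler): void {
    this.handlers.set(name, handler);
  }

  // Sync mode: run the handler now and hand back its result.
  async execute(name: string, input: unknown): Promise<unknown> {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`Unknown action: ${name}`);
    return handler(input);
  }

  // Async mode: record a pending job and return its ID without running the handler.
  async dispatch(name: string, input: unknown): Promise<string> {
    if (!this.handlers.has(name)) throw new Error(`Unknown action: ${name}`);
    const job: Job = { id: `job_${this.nextId++}`, status: 'pending' };
    this.jobs.set(job.id, job);
    this.pending.push({ job, name, input });
    return job.id;
  }

  getJob(id: string): Job | undefined {
    return this.jobs.get(id);
  }

  // Stand-in for a worker: drain the queue through the same execute path.
  async runWorker(): Promise<void> {
    for (const { job, name, input } of this.pending.splice(0)) {
      try {
        job.result = await this.execute(name, input);
        job.status = 'completed';
      } catch (error) {
        job.status = 'failed';
        job.error = error instanceof Error ? error.message : String(error);
      }
    }
  }
}
```

Because the worker goes through `execute`, a handler behaves the same whether it was called directly or picked up from the queue.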

Error Handling

Pulse provides structured error handling with type safety:

import { ActionError } from '@pulse/core';

CreateUserAction.handler(async ({ input }) => {
  if (await isEmailBlacklisted(input.email)) {
    throw new ActionError('Email is blacklisted', {
      code: 'BLACKLISTED_EMAIL',
      field: 'email',
      retry: false, // Don't retry this error
    });
  }

  try {
    return await createUser(input);
  } catch (error) {
    throw new ActionError('Database error', {
      code: 'DB_ERROR',
      retry: true, // Can retry
      cause: error,
    });
  }
});

Errors propagate with full context:

try {
  await pulse.execute('users.create', invalidData);
} catch (error) {
  if (error instanceof ActionError) {
    console.log(error.code); // 'BLACKLISTED_EMAIL'
    console.log(error.field); // 'email'
    console.log(error.canRetry); // false
    console.log(error.action); // 'users.create'
  }
}
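
For a feel of how the `retry` flag and the `action` field could fit together, here is a rough reconstruction. It is an assumption-heavy sketch, not the real `@pulse/core` class: the `underlying` property name, the `retry` default of `false`, and the `runWithRetry` helper are all mine, and a real worker would wait between attempts instead of retrying immediately.

```typescript
// Hypothetical reconstruction of an ActionError-like class, based on how the post uses it.
interface ActionErrorOptions {
  code: string;
  field?: string;
  retry?: boolean;
  cause?: unknown;
}

class ActionError extends Error {
  readonly code: string;
  readonly field: string | undefined;
  readonly canRetry: boolean;
  readonly underlying: unknown; // the original error passed as `cause`, if any
  action?: string; // filled in by the runtime, not by the handler

  constructor(message: string, options: ActionErrorOptions) {
    super(message);
    this.name = 'ActionError';
    this.code = options.code;
    this.field = options.field;
    this.canRetry = options.retry ?? false; // assumed default
    this.underlying = options.cause;
    // Keeps `instanceof` working when compiled to older targets.
    Object.setPrototypeOf(this, ActionError.prototype);
  }
}

// Run a handler, tag any ActionError with the action name, and retry only retryable ones.
async function runWithRetry<T>(
  action: string,
  handler: () => Promise<T>,
  maxAttempts = 3,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await handler();
    } catch (error) {
      if (error instanceof ActionError) error.action = action;
      const retryable = error instanceof ActionError && error.canRetry;
      // Give up on non-retryable errors, unknown errors, or once attempts run out.
      if (!retryable || attempt >= maxAttempts) throw error;
    }
  }
}
```

With this shape, a `BLACKLISTED_EMAIL` error surfaces after a single attempt, while a `DB_ERROR` gets up to `maxAttempts` tries before it reaches the caller.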

Middleware & Observability

Actions support middleware for cross-cutting concerns:

pulse.use(async (context, next) => {
  const start = Date.now();
  console.log(`Executing: ${context.action}`);
  try {
    const result = await next();
    metrics.timing('action.duration', Date.now() - start, {
      action: context.action,
      status: 'success',
    });
    return result;
  } catch (error) {
    metrics.timing('action.duration', Date.now() - start, {
      action: context.action,
      status: 'error',
    });
    logger.error('Action failed', {
      action: context.action,
      error,
      input: context.input,
    });
    throw error;
  }
});

This enables:

  • Logging: Centralized logging for all actions
  • Metrics: Performance monitoring
  • Tracing: Distributed tracing integration
  • Authentication: User context injection
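
The `next()` pattern is easier to reason about once you see how a chain gets composed. This is a generic onion-style composer, sketched under the assumption that middleware registered first runs outermost; `compose`, `ActionContext`, and the two sample middlewares are illustrative names, not Pulse internals.

```typescript
// Hypothetical sketch of an onion-style middleware chain.
interface ActionContext {
  action: string;
  input: unknown;
}

type Next = () => Promise<unknown>;
type Middleware = (context: ActionContext, next: Next) => Promise<unknown>;
type FinalHandler = (context: ActionContext) => Promise<unknown>;

// Wrap the handler in each middleware so the first one registered runs outermost.
function compose(middlewares: Middleware[], handler: FinalHandler): FinalHandler {
  return (context) => {
    const run = (index: number): Promise<unknown> => {
      if (index === middlewares.length) return handler(context);
      return middlewares[index](context, () => run(index + 1));
    };
    return run(0);
  };
}

// Records the order in which each layer runs.
const trace: string[] = [];

const timing: Middleware = async (context, next) => {
  trace.push(`timing:start:${context.action}`);
  try {
    return await next();
  } finally {
    // Runs on success and on failure, which is what a metrics layer needs.
    trace.push('timing:end');
  }
};

const auth: Middleware = async (context, next) => {
  trace.push(`auth:${context.action}`);
  return next();
};

const pipeline = compose([timing, auth], async (context) => {
  trace.push('handler');
  return { ok: true, input: context.input };
});
```

Running `pipeline({ action: 'users.create', input: {} })` fills `trace` in the order timing start, auth, handler, timing end, which is why a single outer middleware can time everything inside it.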

Real-World Example: Payment Processing

Here's a complete payment processing flow:

// Define payment action
const ProcessPaymentAction = defineAction({
name: 'payments.process',
input: z.object({
userId: z.string(),
amount: z.number().positive(),
currency: z.enum(['USD', 'EUR', 'GBP']),
paymentMethod: z.string(),
}),
output: z.object({
transactionId: z.string(),
status: z.enum(['succeeded', 'pending', 'failed']),
amount: z.number(),
}),
});
ProcessPaymentAction.handler(async ({ input, context }) => {
// 1. Validate user
const user = await context.db.users.findById(input.userId);
if (!user) {
throw new ActionError('User not found', { code: 'USER_NOT_FOUND' });
}
// 2. Check balance
if (input.amount > user.balance) {
throw new ActionError('Insufficient funds', {
code: 'INSUFFICIENT_FUNDS',
retry: false,
});
}
// 3. Create pending transaction
const transaction = await context.db.transactions.create({
userId: input.userId,
amount: input.amount,
status: 'pending',
});
try {
// 4. Process with payment provider
const result = await context.stripe.charges.create({
amount: input.amount * 100,
currency: input.currency,
customer: user.stripeCustomerId,
payment_method: input.paymentMethod,
});
// 5. Update transaction
await context.db.transactions.update(transaction.id, {
status: 'succeeded',
providerTransactionId: result.id,
});
// 6. Trigger async actions
await context.actions.dispatch('emails.receipt', {
userId: user.id,
transactionId: transaction.id,
});
await context.actions.dispatch('analytics.track', {
event: 'payment_succeeded',
userId: user.id,
properties: { amount: input.amount },
});
return {
transactionId: transaction.id,
status: 'succeeded',
amount: input.amount,
};
} catch (error) {
// 7. Handle failure
await context.db.transactions.update(transaction.id, {
status: 'failed',
errorMessage: error.message,
});
throw new ActionError('Payment failed', {
code: 'PAYMENT_FAILED',
retry: true,
cause: error,
});
}
});

Benefits Realized

After deploying Pulse in production:

  1. Type Safety: Zero runtime type errors in action execution
  2. Testability: Handlers receive their dependencies through context, so they are easy to test in isolation with stubs
  3. Observability: Centralized metrics and logging for all business logic
  4. Maintainability: Business logic in one place, not scattered
  5. Developer Experience: Auto-complete and type checking throughout
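
As a rough illustration of the testability point, here is what testing a handler against a fake context might look like. The `Context` shape and the `createUser` function are simplified stand-ins modeled on the users.create example, not Pulse's real types, and `makeFakeContext` is a hypothetical helper.

```typescript
// Hypothetical handler and context shapes, loosely modeled on the users.create example.
interface User {
  id: string;
  email: string;
  name: string;
}

interface Context {
  db: {
    users: {
      findByEmail(email: string): Promise<User | null>;
      create(data: { email: string; name: string }): Promise<User>;
    };
  };
  actions: { dispatch(name: string, input: unknown): Promise<void> };
}

async function createUser(
  input: { email: string; name: string },
  context: Context,
): Promise<User> {
  const existing = await context.db.users.findByEmail(input.email);
  if (existing) throw new Error('User already exists');
  const user = await context.db.users.create(input);
  await context.actions.dispatch('emails.welcome', { userId: user.id });
  return user;
}

// In-memory fake: no database, no queue, and every dispatch is recorded for assertions.
function makeFakeContext() {
  const users: User[] = [];
  const dispatched: Array<{ name: string; input: unknown }> = [];
  const context: Context = {
    db: {
      users: {
        findByEmail: async (email) => users.find((u) => u.email === email) ?? null,
        create: async (data) => {
          const user = { id: `user_${users.length + 1}`, ...data };
          users.push(user);
          return user;
        },
      },
    },
    actions: {
      dispatch: async (name, input) => {
        dispatched.push({ name, input });
      },
    },
  };
  return { context, dispatched };
}
```

A test then calls `createUser` with the fake context and asserts on the returned user and on `dispatched`, with no HTTP server, database, or Redis instance involved.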

Performance Characteristics

Action Execution Overhead
┌─────────────────────────────────────┐
│                                     │
│  Validation:         ~2ms           │
│  Queue Management:   ~5ms           │
│  Serialization:      ~1ms           │
│  ─────────────────────────          │
│  Total Overhead:     ~8ms           │
│                                     │
│  Typical Action:     50-200ms       │
│  Overhead Impact:    ~4-16%         │
│                                     │
└─────────────────────────────────────┘

For actions at the slow end of that range the framework adds about 4%; for a 50ms action the same ~8ms is closer to 16%. In both cases the cost is small next to the database and network calls inside a typical handler.
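
As a quick sanity check on those figures, the relative overhead depends on how long the action itself takes. The snippet only redoes the arithmetic from the table; `overheadShare` is a throwaway helper, not part of Pulse.

```typescript
// Figures taken from the table above; this only redoes the arithmetic.
const overheadMs = 2 + 5 + 1; // validation + queue management + serialization

// Overhead as a percentage of the action's own duration.
function overheadShare(actionMs: number): number {
  return (overheadMs * 100) / actionMs;
}

console.log(overheadShare(200)); // 4
console.log(overheadShare(50)); // 16
```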

Lessons Learned

  1. Schema-First Design: Zod schemas as source of truth enabled powerful type inference
  2. Separation of Concerns: Action definitions (schema, handler, metadata) stay cleanly separated from the runtime that validates, executes, and queues them
  3. Queue Integration: BullMQ was the right choice for reliability
  4. Middleware Pattern: Enabled extensibility without framework bloat

What's Next

Future enhancements planned:

  • Saga Pattern: Multi-step transactions with rollback
  • Caching Layer: Automatic result caching for idempotent actions
  • Streaming Support: Real-time action progress updates
  • GraphQL Integration: Auto-generate GraphQL mutations from actions

Conclusion

Pulse transformed how we write business logic. By centralizing operations behind typed actions, we achieved:

  • Consistency: All business operations follow the same pattern
  • Reliability: Built-in retry logic and error handling
  • Velocity: New features ship faster with less boilerplate
  • Confidence: Type safety catches bugs at compile time

The framework is open-source and battle-tested in production. Check it out at github.com/femitubosun/pulse.

Building frameworks is hard, but when done right, they compound value across every feature you build on top of them.