Building Pulse: A Type-Safe Business Logic Framework
Building Pulse: A Type-Safe Business Logic Framework
Over the past year, I've built several production systems where business logic became scattered across controllers, services, and background jobs. The same patterns kept appearing: validating input, executing logic, handling async operations, and managing failures. This repetition led me to create Pulse - a type-safe framework that centralizes business logic execution.
The Problem Space
Traditional backend architectures often suffer from:
- Logic Dispersion: Business rules scattered across HTTP handlers, service classes, and worker processes
- Inconsistent Validation: Different validation approaches for synchronous vs asynchronous operations
- Poor Observability: Difficult to track and monitor business operations across the system
- Type Unsafety: Runtime errors from mismatched inputs and outputs
Here's what typical scattered logic looks like:
// HTTP Controllerapp.post('/users', async (req, res) => {const { email, name } = req.body;if (!email || !validateEmail(email)) {return res.status(400).json({ error: 'Invalid email' });}const user = await db.users.create({ email, name });await emailQueue.add('welcome', { userId: user.id });res.json(user);});// Background JobemailQueue.process('welcome', async (job) => {const { userId } = job.data;const user = await db.users.findById(userId);await sendEmail(user.email, 'Welcome!');});
Notice the problems:
- Validation logic mixed with business logic
- No type safety between controller and background job
- Duplicate data fetching
- Hard to test in isolation
Core Architecture
Pulse introduces the concept of Actions - typed business operations that can run synchronously or asynchronously. Here's the high-level architecture:
┌─────────────────────────────────────────────────────────┐│ Action Definition ││ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ ││ │ Schema │ │ Handler │ │ Metadata │ ││ │ (Zod) │ │ (Function) │ │ (Config) │ ││ └─────────────┘ └──────────────┘ └───────────────┘ │└─────────────────────────────────────────────────────────┘│▼┌─────────────────────────────────────────────────────────┐│ Pulse Runtime ││ ││ ┌──────────────┐ ┌─────────────────┐ ││ │ Validate │────────▶│ Execute │ ││ │ Input │ │ Handler │ ││ └──────────────┘ └─────────────────┘ ││ │ ││ ┌────────┴─────────┐ ││ ▼ ▼ ││ ┌──────────┐ ┌──────────┐ ││ │ Sync │ │ Async │ ││ │ Return │ │ Queue │ ││ └──────────┘ └──────────┘ │└─────────────────────────────────────────────────────────┘
Building an Action
Let's build a complete action from scratch. Here's how Pulse works:
1. Define the Schema
First, we define the input/output schema using Zod:
import { z } from 'zod';import { defineAction } from '@pulse/core';const CreateUserAction = defineAction({name: 'users.create',input: z.object({email: z.string().email(),name: z.string().min(2),role: z.enum(['user', 'admin']).default('user'),}),output: z.object({id: z.string(),email: z.string(),name: z.string(),createdAt: z.date(),}),});
This gives us:
- Compile-time types extracted from Zod schemas
- Runtime validation ensuring data integrity
- Documentation through schema inspection
2. Implement the Handler
Next, we implement the business logic:
CreateUserAction.handler(async ({ input, context }) => {// Input is fully typed and validatedconst existingUser = await context.db.users.findByEmail(input.email);if (existingUser) {throw new ActionError('User already exists', {code: 'USER_EXISTS',field: 'email',});}const user = await context.db.users.create({email: input.email,name: input.name,role: input.role,});// Trigger async actionawait context.actions.dispatch('emails.welcome', {userId: user.id,});// Return is type-checked against output schemareturn {id: user.id,email: user.email,name: user.name,createdAt: user.createdAt,};});
3. Execute the Action
Now we can execute it from anywhere:
// HTTP handlerapp.post('/users', async (req, res) => {const result = await pulse.execute('users.create', req.body);res.json(result);});// Or queue it for async processingawait pulse.dispatch('users.create', {email: 'user@example.com',name: 'John Doe',});
Synchronous vs Asynchronous Execution
Pulse provides two execution modes with identical interfaces:
┌──────────────────────────────────────────────────────┐│ Sync Execution ││ ││ HTTP Request ││ │ ││ ▼ ││ ┌─────────┐ ┌──────────┐ ┌──────────┐ ││ │ Validate│─────▶│ Execute │─────▶│ Return │ ││ └─────────┘ └──────────┘ └──────────┘ ││ │ ││ ▼ ││ HTTP Response │└──────────────────────────────────────────────────────┘┌──────────────────────────────────────────────────────┐│ Async Execution ││ ││ Event/Trigger ││ │ ││ ▼ ││ ┌─────────┐ ┌──────────┐ ││ │ Validate│─────▶│ Queue │ ││ └─────────┘ └──────────┘ ││ │ ││ ▼ ││ ┌──────────┐ ┌──────────┐ ││ BullMQ│ Worker │─────▶│ Execute │ ││ └──────────┘ └──────────┘ │└──────────────────────────────────────────────────────┘
Implementation:
// Synchronous - waits for resultconst user = await pulse.execute('users.create', {email: 'user@example.com',name: 'John Doe',});// Asynchronous - returns job ID immediatelyconst jobId = await pulse.dispatch('users.create', {email: 'user@example.com',name: 'John Doe',});// Check job status laterconst job = await pulse.getJob(jobId);console.log(job.status); // 'completed' | 'failed' | 'pending'
Error Handling
Pulse provides structured error handling with type safety:
import { ActionError } from '@pulse/core';CreateUserAction.handler(async ({ input }) => {if (await isEmailBlacklisted(input.email)) {throw new ActionError('Email is blacklisted', {code: 'BLACKLISTED_EMAIL',field: 'email',retry: false, // Don't retry this error});}try {return await createUser(input);} catch (error) {throw new ActionError('Database error', {code: 'DB_ERROR',retry: true, // Can retrycause: error,});}});
Errors propagate with full context:
try {await pulse.execute('users.create', invalidData);} catch (error) {if (error instanceof ActionError) {console.log(error.code); // 'BLACKLISTED_EMAIL'console.log(error.field); // 'email'console.log(error.canRetry); // falseconsole.log(error.action); // 'users.create'}}
Middleware & Observability
Actions support middleware for cross-cutting concerns:
pulse.use(async (context, next) => {const start = Date.now();console.log(`Executing: ${context.action}`);try {const result = await next();metrics.timing('action.duration', Date.now() - start, {action: context.action,status: 'success',});return result;} catch (error) {metrics.timing('action.duration', Date.now() - start, {action: context.action,status: 'error',});logger.error('Action failed', {action: context.action,error,input: context.input,});throw error;}});
This enables:
- Logging: Centralized logging for all actions
- Metrics: Performance monitoring
- Tracing: Distributed tracing integration
- Authentication: User context injection
Real-World Example: Payment Processing
Here's a complete payment processing flow:
// Define payment actionconst ProcessPaymentAction = defineAction({name: 'payments.process',input: z.object({userId: z.string(),amount: z.number().positive(),currency: z.enum(['USD', 'EUR', 'GBP']),paymentMethod: z.string(),}),output: z.object({transactionId: z.string(),status: z.enum(['succeeded', 'pending', 'failed']),amount: z.number(),}),});ProcessPaymentAction.handler(async ({ input, context }) => {// 1. Validate userconst user = await context.db.users.findById(input.userId);if (!user) {throw new ActionError('User not found', { code: 'USER_NOT_FOUND' });}// 2. Check balanceif (input.amount > user.balance) {throw new ActionError('Insufficient funds', {code: 'INSUFFICIENT_FUNDS',retry: false,});}// 3. Create pending transactionconst transaction = await context.db.transactions.create({userId: input.userId,amount: input.amount,status: 'pending',});try {// 4. Process with payment providerconst result = await context.stripe.charges.create({amount: input.amount * 100,currency: input.currency,customer: user.stripeCustomerId,payment_method: input.paymentMethod,});// 5. Update transactionawait context.db.transactions.update(transaction.id, {status: 'succeeded',providerTransactionId: result.id,});// 6. Trigger async actionsawait context.actions.dispatch('emails.receipt', {userId: user.id,transactionId: transaction.id,});await context.actions.dispatch('analytics.track', {event: 'payment_succeeded',userId: user.id,properties: { amount: input.amount },});return {transactionId: transaction.id,status: 'succeeded',amount: input.amount,};} catch (error) {// 7. Handle failureawait context.db.transactions.update(transaction.id, {status: 'failed',errorMessage: error.message,});throw new ActionError('Payment failed', {code: 'PAYMENT_FAILED',retry: true,cause: error,});}});
Benefits Realized
After deploying Pulse in production:
- Type Safety: Zero runtime type errors in action execution
- Testability: Actions are pure functions, easy to test in isolation
- Observability: Centralized metrics and logging for all business logic
- Maintainability: Business logic in one place, not scattered
- Developer Experience: Auto-complete and type checking throughout
Performance Characteristics
Action Execution Overhead┌─────────────────────────────────────┐│ ││ Validation: ~2ms ││ Queue Management: ~5ms ││ Serialization: ~1ms ││ ───────────────────────── ││ Total Overhead: ~8ms ││ ││ Typical Action: 50-200ms ││ Overhead Impact: ~4% ││ │└─────────────────────────────────────┘
The framework adds minimal overhead while providing significant value.
Lessons Learned
- Schema-First Design: Zod schemas as source of truth enabled powerful type inference
- Separation of Concerns: Runtime and execution logic cleanly separated
- Queue Integration: BullMQ was the right choice for reliability
- Middleware Pattern: Enabled extensibility without framework bloat
What's Next
Future enhancements planned:
- Saga Pattern: Multi-step transactions with rollback
- Caching Layer: Automatic result caching for idempotent actions
- Streaming Support: Real-time action progress updates
- GraphQL Integration: Auto-generate GraphQL mutations from actions
Conclusion
Pulse transformed how we write business logic. By centralizing operations behind typed actions, we achieved:
- Consistency: All business operations follow the same pattern
- Reliability: Built-in retry logic and error handling
- Velocity: New features ship faster with less boilerplate
- Confidence: Type safety catches bugs at compile time
The framework is open-source and battle-tested in production. Check it out at github.com/femitubosun/pulse.
Building frameworks is hard, but when done right, they compound value across every feature you build on top of them.