Fractor fully supports both Ruby 3.x and Ruby 4.0+:

- Ruby 3.0+: Uses `Ractor.yield` for message passing from workers
- Ruby 4.0+: Uses `Ractor::Port` for more efficient communication patterns

Fractor automatically detects your Ruby version and uses the appropriate internal implementation. The user-facing API is identical across versions: write your code once, and Fractor handles the differences internally.
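The detection itself can be pictured as a simple feature probe. The snippet below is an illustrative sketch, not Fractor's actual internals: it picks a messaging strategy based on whether `Ractor::Port` exists in the running interpreter.

```ruby
# Illustrative sketch only - not Fractor's real code. Probe for
# Ractor::Port at load time and pick a messaging strategy accordingly.
strategy =
  if defined?(Ractor) && Ractor.const_defined?(:Port)
    :port   # Ruby 4.0+: port-based worker communication
  else
    :yield  # Ruby 3.x: Ractor.yield-based message passing
  end

puts "Selected messaging strategy: #{strategy}"
```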
See Architecture: Ruby Version Compatibility for details on the internal differences.
```ruby
require 'fractor'

# Define your work
class MyWork < Fractor::Work
  def initialize(value)
    super({ value: value })
  end

  def value
    input[:value]
  end
end

# Define your worker
class MyWorker < Fractor::Worker
  def process(work)
    result = work.value * 2
    Fractor::WorkResult.new(result: result, work: work)
  rescue => e
    Fractor::WorkResult.new(error: e.message, work: work)
  end
end

# Create supervisor and process work
supervisor = Fractor::Supervisor.new(
  worker_pools: [{ worker_class: MyWorker }]
)

supervisor.add_work_items([
  MyWork.new(1),
  MyWork.new(2),
  MyWork.new(3)
])

supervisor.run

puts "Results: #{supervisor.results.results.map(&:result)}"
# => Results: [2, 4, 6]
```

Fractor supports flexible timeout configuration at three levels:
| Level | Syntax | Description |
|---|---|---|
| Global | `config.default_worker_timeout` | Sets the default timeout for all workers |
| Worker | `timeout` class method | Sets the timeout for all work processed by a worker |
| Work item | `timeout:` keyword | Overrides the worker timeout for a specific work item |
```ruby
# 1. Global default (optional)
Fractor.configure do |config|
  config.default_worker_timeout = 60 # 60 seconds
end

# 2. Worker-level timeout
class FastWorker < Fractor::Worker
  timeout 10 # 10 second timeout for this worker

  def process(work)
    Fractor::WorkResult.new(result: work.input * 2, work: work)
  end
end

# 3. Per-work-item timeout (overrides worker timeout)
class MyWork < Fractor::Work
  def initialize(value, timeout: nil)
    super({ value: value }, timeout: timeout)
  end
end

supervisor = Fractor::Supervisor.new(
  worker_pools: [{ worker_class: FastWorker }]
)

# Mix work items with different timeouts
fast_work = MyWork.new(1, timeout: 5)   # 5 seconds
normal_work = MyWork.new(2)             # uses worker's 10s timeout
slow_work = MyWork.new(3, timeout: 30)  # 30 seconds

supervisor.add_work_items([fast_work, normal_work, slow_work])
supervisor.run
```

When a timeout occurs, the work item is marked as failed with `error_category: :timeout` and can be retried if using the workflow system.
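The precedence among the three levels reduces to "most specific wins". A minimal illustration of that resolution order (a hypothetical helper, not part of Fractor's API):

```ruby
# Hypothetical helper illustrating timeout precedence:
# work item > worker > global default.
def effective_timeout(work_timeout, worker_timeout, global_default)
  work_timeout || worker_timeout || global_default
end

puts effective_timeout(5, 10, 60)    # => 5  (work item wins)
puts effective_timeout(nil, 10, 60)  # => 10 (worker timeout)
puts effective_timeout(nil, nil, 60) # => 60 (global default)
```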
For critical applications, work queues can be persisted to disk for crash recovery:

```ruby
# Create a persistent queue with automatic saving
queue = Fractor::PersistentWorkQueue.new("data/queue.json")

# Add work items - automatically saved
queue << MyWork.new(data1)
queue << MyWork.new(data2)

# Use with ContinuousServer for crash recovery
server = Fractor::ContinuousServer.new(
  worker_pools: [{ worker_class: MyWorker }],
  work_queue: queue
)

# Load any previous work items on startup
queue.load
server.run
```

Supported persistence formats:

- JSON (default) - Human-readable, widely compatible
- YAML - More readable than JSON
- Marshal - Binary format, faster but Ruby-specific
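The round-trip behavior of the JSON format can be sketched with the standard library alone. This illustrates the persistence idea, not `PersistentWorkQueue`'s actual serialization code:

```ruby
require 'json'
require 'tempfile'

# Illustrative sketch: serialize each work item's input hash to disk
# after every mutation, then restore it on startup.
items = [{ value: 1 }, { value: 2 }]

file = Tempfile.new(['queue', '.json'])
file.write(JSON.generate(items))
file.flush

# On restart, the queue contents survive the crash.
restored = JSON.parse(File.read(file.path), symbolize_names: true)
puts restored.inspect
```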
For expensive, deterministic operations, Fractor provides a result cache to avoid redundant processing of identical work items:

```ruby
# Create a cache with TTL (time-to-live)
cache = Fractor::ResultCache.new(ttl: 300) # 5 minutes

# Or with size limit
cache = Fractor::ResultCache.new(max_size: 1000)

# Or with memory limit
cache = Fractor::ResultCache.new(max_memory: 1024 * 1024) # 1MB

# Use the cache
result = cache.get(work) do
  # This block only runs if work is not cached
  expensive_operation(work)
end

# Check if work is cached
if cache.has?(work)
  puts "Result is cached!"
end

# Manual cache operations
cache.set(work, result)  # Store a result
cache.invalidate(work)   # Remove a cached result
cache.clear              # Remove all cached results

# Get cache statistics
stats = cache.stats
puts "Cache hit rate: #{stats[:hit_rate]}%"
```

The cache generates consistent keys based on:

- Work class name
- Work input data
- Work timeout (if set)

This means identical work items with the same input and timeout will share cached results.
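A key derived from those components (class name, input data, timeout) can be sketched with `Digest` from the standard library. This is a hypothetical helper, not `ResultCache`'s real key function:

```ruby
require 'digest'

# Hypothetical sketch: hash the work class name, input data, and
# timeout into a single cache key.
def cache_key(work_class, input, timeout = nil)
  # Sort the input hash so key order never changes the digest.
  Digest::SHA256.hexdigest([work_class, input.sort, timeout].inspect)
end

a = cache_key('MyWork', { value: 1 })
b = cache_key('MyWork', { value: 1 })
c = cache_key('MyWork', { value: 2 })

puts a == b  # same class, input, and timeout share a key
puts a == c  # different input yields a different key
```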
Cache eviction policies:

- TTL - Entries expire after a configured time
- LRU - Least-recently-used entries are evicted when `max_size` is reached
- Memory-based - Entries are evicted when `max_memory` is reached
- Function-driven: Define processing logic by subclassing `Fractor::Worker`
- Parallel execution: Work automatically distributed across Ractor workers
- Two operating modes:
  - Pipeline mode for batch processing
  - Continuous mode for long-running servers
- Workflow system: GitHub Actions-style declarative workflows
- Error handling: Retry logic, circuit breakers, dead letter queues, error reporting
- Production-ready: Signal handling, logging, monitoring, graceful shutdown
- Performance tools: Built-in monitoring, benchmarking, and error analytics
- High-level primitives: `WorkQueue` and `ContinuousServer` eliminate boilerplate
- Installation - System requirements and installation methods
- Getting Started - Quick start guides for both modes
- Core Concepts - Understanding Fractor components
- Pipeline Mode - Batch processing with predefined work
- Continuous Mode - Long-running servers and streaming
- Workflows - Declarative workflow system for complex pipelines
- Error Handling - Retry logic, circuit breakers, and dead letter queues
- Monitoring - Performance monitoring and metrics
- Signal Handling - Process monitoring and graceful shutdown
- API Reference - Complete API documentation
- Examples - Complete examples for all patterns
- Troubleshooting - Common issues and solutions
Fractor supports two distinct modes:

| Mode | Best for | Example use cases |
|---|---|---|
| Pipeline Mode | Batch processing, one-time jobs | File processing, ETL pipelines, data transformations |
| Continuous Mode | Long-running servers, streaming | Chat servers, job processors, event streams |
See Choosing Your Mode for detailed guidance.
```ruby
supervisor = Fractor::Supervisor.new(
  worker_pools: [{ worker_class: DataWorker }]
)

supervisor.add_work_items(dataset.map { |item| DataWork.new(item) })
supervisor.run

puts "Processed #{supervisor.results.results.size} items"
```

See Simple Example and more examples.
```ruby
work_queue = Fractor::WorkQueue.new

server = Fractor::ContinuousServer.new(
  worker_pools: [{ worker_class: MessageWorker, num_workers: 4 }],
  work_queue: work_queue
)

server.on_result { |result| puts "Processed: #{result.result}" }
server.on_error { |error| puts "Error: #{error.error}" }

Thread.new { server.run }

# Add work dynamically
work_queue << MessageWork.new(client_id: 1, message: "Hello")
```

See Chat Server Example and more examples.
```ruby
# Define workflow with simplified syntax
workflow = Fractor::Workflow.define("data-pipeline") do
  job :extract, ExtractWorker
  job :transform, TransformWorker, needs: :extract
  job :load, LoadWorker, needs: :transform
end

# Execute workflow
result = workflow.new.execute(input_data)
```

See Simplified Workflows and Workflow Guide.
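The `needs:` declarations form a dependency graph that fixes execution order. A toy topological sort illustrates the idea (not Fractor's actual scheduler):

```ruby
# Toy dependency resolution for a workflow like the one above:
# each job runs only after everything it needs has run.
jobs = { extract: [], transform: [:extract], load: [:transform] }

order = []
visit = lambda do |job|
  next if order.include?(job)
  jobs[job].each { |dep| visit.call(dep) }  # run dependencies first
  order << job
end
jobs.each_key { |j| visit.call(j) }

puts order.inspect  # => [:extract, :transform, :load]
```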
Fractor includes production-ready features:

- Signal handling: SIGTERM, SIGINT, SIGUSR1/SIGBREAK
- Graceful shutdown: Complete in-progress work before exit
- Process monitoring: Runtime status via signals
- Structured logging: JSON logging with correlation IDs
- Workflow visualization: Mermaid, DOT, ASCII diagrams
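The graceful-shutdown item can be pictured in a few lines of plain Ruby. This is a sketch of the general pattern on POSIX systems, not Fractor's handler:

```ruby
# Sketch of the pattern: a trapped SIGTERM only flips a flag; the
# main loop is expected to finish in-progress work before exiting.
shutting_down = false
Signal.trap('TERM') { shutting_down = true }

Process.kill('TERM', Process.pid)  # simulate a deploy sending SIGTERM
sleep 0.1                          # give the handler a chance to run

puts "draining: #{shutting_down}"
```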
See Signal Handling Guide for deployment patterns.
Bug reports and pull requests are welcome on GitHub at https://github.com/metanorma/fractor.