NatsWork::Server

Job processing server for the NatsWork distributed job processing system.

Installation

Add to your application's Gemfile:

gem 'natswork-server'

And execute:

$ bundle install

Configuration

NatsWork::Server.configure do |config|
  config.nats_url = ENV['NATS_URL'] || 'nats://localhost:4222'
  config.concurrency = 25
  config.queues = ['critical', 'default', 'low']
  config.logger = Logger.new(STDOUT)
end

Starting a Worker

Command Line

# Start with default settings
natswork

# Specify queues and concurrency
natswork -q critical,default -c 10

# With config file
natswork -C config/natswork.yml

Programmatically

require 'logger'
require 'natswork/server'

worker = NatsWork::Worker.new(
  queues: ['critical', 'default'],
  concurrency: 10,
  logger: Logger.new(STDOUT)
)

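# Handle signals gracefully. Ruby raises ThreadError if a Mutex is locked
# inside a trap handler, so this assumes worker.stop is safe to call there.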
trap('TERM') { worker.stop }
trap('INT') { worker.stop }

worker.start
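
Because Ruby restricts what a trap handler may do, a common alternative is the self-pipe pattern: the handler only writes the signal name to a pipe, and the main thread performs the actual shutdown. The sketch below illustrates that pattern in plain Ruby. SignalWaiter is a hypothetical helper, not part of NatsWork, and the usage comment assumes that worker.start blocks and that worker.stop can be called from another thread.

```ruby
# Hypothetical helper (not part of NatsWork): self-pipe signal handling.
class SignalWaiter
  def initialize(signals = %w[TERM INT])
    @reader, @writer = IO.pipe
    signals.each do |sig|
      # The handler does the bare minimum: record which signal arrived.
      Signal.trap(sig) { @writer.puts(sig) }
    end
  end

  # Blocks the calling thread until a trapped signal arrives,
  # then returns the signal name, e.g. "TERM".
  def wait
    @reader.gets.chomp
  end
end

# Intended usage (assumes worker.start blocks and worker.stop is thread-safe):
#   waiter = SignalWaiter.new
#   runner = Thread.new { worker.start }
#   waiter.wait   # returns once TERM or INT is received
#   worker.stop
#   runner.join
```

With this arrangement the shutdown logic runs on an ordinary thread, where locks and logging are allowed.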

Worker Features

Concurrency

Workers process jobs concurrently using a thread pool:

worker = NatsWork::Worker.new(
  concurrency: 25  # Number of concurrent jobs
)
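
To illustrate what the concurrency setting controls, here is a minimal fixed-size thread pool in plain Ruby. It sketches the general technique only and is not NatsWork's internal implementation; TinyPool is a hypothetical name.

```ruby
# Hypothetical illustration of a fixed-size thread pool.
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job arrives; once the queue is closed
        # and drained it returns nil, which ends the loop.
        while (job = @jobs.pop)
          begin
            job.call
          rescue StandardError => e
            # A failing job must not take the pool thread down with it.
            warn "job failed: #{e.message}"
          end
        end
      end
    end
  end

  # Enqueue a block to run on one of the pool threads.
  def post(&block)
    @jobs << block
  end

  # Stop accepting jobs, let queued jobs finish, then join the threads.
  def shutdown
    @jobs.close
    @threads.each(&:join)
  end
end
```

On MRI, threads execute Ruby code one at a time, so a larger pool mostly benefits jobs that spend their time waiting on I/O.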

Queue Priority

List queues from highest to lowest priority:

worker = NatsWork::Worker.new(
  queues: ['critical', 'default', 'low'],
  strict: true  # Strict priority ordering
)
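
Under strict ordering, a job is taken from a lower-priority queue only when every higher-priority queue is empty, which also means a busy critical queue can starve low. The sketch below shows that selection rule on in-memory arrays, assuming this is what strict: true means. The next_job helper and the hash-of-arrays layout are hypothetical, not NatsWork's API.

```ruby
# Hypothetical illustration of strict priority selection.
# queues: Hash of queue name => Array of pending jobs (oldest first)
# order:  queue names from highest to lowest priority
def next_job(queues, order)
  order.each do |name|
    pending = queues.fetch(name, [])
    return [name, pending.shift] unless pending.empty?
  end
  nil # every queue is empty
end
```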

Middleware

Add server-side middleware:

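class MetricsMiddleware
  def call(job, message)
    # Monotonic clock: unaffected by wall-clock adjustments
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    # StatsD timers are conventionally reported in milliseconds
    duration_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000
    StatsD.timing("job.duration", duration_ms, tags: ["job:#{job.class.name}"])
  end
end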

NatsWork::Server.configure do |config|
  config.server_middleware do |chain|
    chain.add MetricsMiddleware
  end
end
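
Each middleware wraps the next one: code before yield runs on the way in, code after it runs on the way out. The sketch below shows how such a chain can be driven. It is an illustration of the pattern, not NatsWork's implementation; TinyChain is a hypothetical class.

```ruby
# Hypothetical sketch of a middleware chain (not NatsWork's implementation).
class TinyChain
  def initialize
    @classes = []
  end

  # Register a middleware class; it is instantiated once per invocation.
  def add(klass)
    @classes << klass
  end

  # Runs each middleware in the order added, then the final block.
  # Each middleware's yield hands control to the next link.
  def invoke(job, message, &final)
    remaining = @classes.map(&:new)
    step = lambda do
      if remaining.empty?
        final.call
      else
        remaining.shift.call(job, message, &step)
      end
    end
    step.call
  end
end
```

A middleware that never yields stops the chain, so the job itself is not performed.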

Error Handling

Configure error handling and retries:

NatsWork::Server.configure do |config|
  # Retry configuration
  config.max_retries = 5
  config.retry_backoff = :exponential

  # Dead letter queue
  config.enable_dead_letter = true
  config.dead_letter_queue = 'failed_jobs'

  # Error callbacks
  config.error_handler = ->(error, job, message) {
    Bugsnag.notify(error, {
      job_class: job.class.name,
      job_id: message['job_id']
    })
  }
end
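
The exact delay formula behind :exponential is not specified here. A common scheme doubles the delay on every attempt up to a cap, and routes the job to the dead letter queue once retries are exhausted. The sketch below assumes that scheme; backoff_delay, next_action, and the base and cap values are hypothetical.

```ruby
# Hypothetical helpers illustrating exponential backoff with a cap.
# attempt is zero-based: 0 for the first retry.
def backoff_delay(attempt, base: 1.0, cap: 300.0)
  [base * (2**attempt), cap].min
end

# Decide what to do with a failed job after `attempt` earlier retries.
def next_action(attempt, max_retries: 5)
  if attempt < max_retries
    [:retry, backoff_delay(attempt)]
  else
    [:dead_letter, nil]
  end
end
```

Production systems often add random jitter to the delay so that many failed jobs do not retry at the same instant.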

Monitoring

Health Checks

Register custom health checks and query the aggregated report:

# Add health checks
NatsWork::HealthChecker.global.add_check('database') do
  ActiveRecord::Base.connection.active?
end

# Check health
health = NatsWork::HealthChecker.global.report
# => { status: :healthy, checks: { ... } }
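
To expose the report over HTTP, one option is a small Rack-compatible app that maps the report status to a response code. The sketch below assumes only the report shape shown above (a hash with a :status key) and treats any status other than :healthy as failing. The health_app helper is hypothetical, not part of NatsWork.

```ruby
require 'json'

# Hypothetical helper: builds a Rack-compatible app from any callable
# that returns a report hash such as { status: :healthy, checks: {...} }.
def health_app(reporter)
  lambda do |_env|
    report = reporter.call
    code = report[:status] == :healthy ? 200 : 503
    [code, { 'content-type' => 'application/json' }, [JSON.generate(report)]]
  end
end

# Intended usage in a config.ru (assumption):
#   run health_app(-> { NatsWork::HealthChecker.global.report })
```

Returning 503 for an unhealthy report lets load balancers and Kubernetes probes react without parsing the body.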

Metrics

Track worker metrics:

metrics = worker.stats
# => {
#   jobs_processed: 1000,
#   jobs_failed: 5,
#   active_jobs: 3,
#   queue_latency: { default: 0.5 }
# }
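
A derived value such as the failure rate is often more useful for alerting than the raw counters. The sketch below computes it from a stats hash shaped like the one above, assuming jobs_processed counts every finished job, including failed ones. The failure_rate helper is hypothetical.

```ruby
# Hypothetical helper: fraction of processed jobs that failed.
# Assumes :jobs_processed includes failed jobs.
def failure_rate(stats)
  processed = stats.fetch(:jobs_processed, 0)
  return 0.0 if processed.zero?

  stats.fetch(:jobs_failed, 0).to_f / processed
end
```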

Logging

Structured logging with job context:

NatsWork::Server.configure do |config|
  config.logger = Logger.new(STDOUT)
  config.log_level = :info
  config.log_format = :json  # JSON structured logs
end
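
For reference, JSON structured logging can be reproduced with the standard library alone by swapping the Logger formatter. The sketch below shows the idea; the field names (level, time, message) and the json_logger helper are assumptions, and the output of NatsWork's own :json format may differ.

```ruby
require 'json'
require 'logger'
require 'time'

# Hypothetical helper: a stdlib Logger that writes one JSON object per line.
def json_logger(io)
  logger = Logger.new(io)
  logger.formatter = proc do |severity, time, _progname, msg|
    # Hash messages carry job context; anything else becomes the message field.
    fields = msg.is_a?(Hash) ? msg : { message: msg.to_s }
    entry = { level: severity, time: time.getutc.iso8601(3) }.merge(fields)
    JSON.generate(entry) + "\n"
  end
  logger
end

# Example: json_logger($stdout).info({ message: 'job done', job_id: 'abc123' })
```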

Deployment

Systemd

[Unit]
Description=NatsWork Worker
After=network.target

[Service]
Type=simple
User=app
WorkingDirectory=/app
ExecStart=/usr/local/bin/bundle exec natswork
Restart=always
RestartSec=3

[Install]
WantedBy=multi-user.target

Docker

FROM ruby:3.1

WORKDIR /app
COPY Gemfile* ./
RUN bundle install

COPY . .

CMD ["bundle", "exec", "natswork"]

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: natswork-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: natswork-worker
  template:
    metadata:
      labels:
        app: natswork-worker
    spec:
      containers:
      - name: worker
        image: myapp/worker:latest
        env:
        - name: NATS_URL
          value: nats://nats:4222
        - name: CONCURRENCY
          value: "25"
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "512Mi"
            cpu: "1000m"

Scaling

Horizontal Scaling

Run multiple worker processes against the same queues to spread the load:

# Start multiple workers on different machines
natswork -q default -c 25  # Machine 1
natswork -q default -c 25  # Machine 2
natswork -q default -c 25  # Machine 3

Auto-scaling

Scale the number of workers based on queue depth:

# Monitor queue depth
monitor = NatsWork::QueueMonitor.new
if monitor.queue_depth('default') > 1000
  # Scale up workers
end
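
One way to turn a queue depth into a worker count is to size for a target backlog per worker and clamp the result to sensible bounds. The sketch below shows that calculation; desired_workers and its default values are hypothetical, and applying the result (for example by changing the Deployment's replica count) is left to your orchestrator.

```ruby
# Hypothetical helper: how many workers a given backlog calls for.
# jobs_per_worker is the backlog one worker is expected to absorb.
def desired_workers(depth, jobs_per_worker: 500, min: 1, max: 10)
  needed = (depth.to_f / jobs_per_worker).ceil
  needed.clamp(min, max)
end

# Intended usage with the monitor shown above (assumption):
#   target = desired_workers(monitor.queue_depth('default'))
```

Clamping keeps at least one worker running on an empty queue and bounds the cost of a sudden spike.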

Testing

require 'natswork/testing'

RSpec.describe MyJob do
  it 'processes job' do
    NatsWork::Testing.inline! do
      MyJob.perform_async('arg1', 'arg2')
      # Job runs synchronously in test
    end
  end
end

API Reference

See the API documentation for details on each class and method.

Contributing

Bug reports and pull requests are welcome at https://github.com/yourusername/natswork.

License

MIT License