EventHub::Processor2
Next generation gem to build ruby based eventhub processors. Implementation is based on Celluloid, an Actor-based concurrent object framework for Ruby https://celluloid.io. The main idea is to have sub-components in your application and have them supervised and automatically re-booted when they crash.
Processor2 has currently the following sub-components implemented
- Heartbeater - send hearbeats to EventHub dispatcher every x minutes
- Publisher - responsible for message publishing
- Watchdog - Checks regularly broker connection and defined listener queue(s)
- Listener AMQP - Listens to defined AMQP queues, parses recevied message into a EventHub::Message instance and calls handle_message method as defined in derived class.
- Listener HTTP - Provides HTTP endpoints for health checks and monitoring (e.g. /svc/class_name/heartbeat, /svc/class_name/version)
Processor2 is using Bunny http://rubybunny.info a feature complete RabbitMQ Client to interact with message broker. Processor2 can deal with long running message processing.
Supported Ruby Versions
Currently supported and tested ruby versions are:
- 3.4 (EOL 2028-03-31)
- 3.3 (EOL 2027-03-31)
- 3.2 (EOL 2026-03-31)
Ruby versions not tested anymore:
- 3.1 (EOL 2025-03-31)
- 3.0 (EOL 2024-04-23)
Installation
Add this line to your application's Gemfile:
gem "eventhub-processor2"
And then execute:
$ bundle
Or install it yourself as:
$ gem install eventhub-processor2
Usage
Create example.rb
module EventHub
class Example < Processor2
def version
"1.0.0" # define your version
end
def (, args = {})
# deal with your parsed EventHub message
# message.class => EventHub::Message
puts .process_name # or whatever you need to do
# args is a hash with currently following keys
# => :queue_name (used when listening to multiple queues)
# => :content_type
# => :priority
# => :delivery_tag
# => :correlation_id (if present in incoming AMQP message)
# if an exception is raised in your code
# it will be automatically catched by
# the processor2 gem and returned
# to the event_hub.inbound queue
# it is possible to publish a message during message processing but it's a
# good style to return one or multiple messages at end of handle_message
publish(message: "your message as a string") # default exchange_name is 'event_hub.inbound'
publish(message: "your message as string", exchange_name: "your_specfic_exchange")
# at the end return one of
= .copy # return message if sucessfull processing
# message.copy sets status.code automatically
# to EventHub::STATUS_SUCCESS which signals
# dispatcher successful processing
# or if you have multiple messages to return to event_hub.inbound queue
[ , , ]
# or if there is no message to return to event_hub.inbound queue
nil # [] works as well
end
end
end
# start your processor instance
EventHub::Example.new.start
Start your processor and pass optional arguments
bundle exec ruby example.rb --help
Usage: example [options]
-e, --environment ENVIRONMENT Define environment (default development)
-d, --detached Run processor detached as a daemon
-c, --config CONFIG Define configuration file
--console-log-only Logs to console only (E.g. containers)
bundle exec ruby example.rb
I, [2018-02-09T15:22:35.649646 #37966] INFO -- : example (1.1.0): has been started
I, [2018-02-09T15:22:35.652592 #37966] INFO -- : Heartbeat is starting...
I, [2018-02-09T15:22:35.657200 #37966] INFO -- : Publisher is starting...
I, [2018-02-09T15:22:35.657903 #37966] INFO -- : Watchdog is starting...
I, [2018-02-09T15:22:35.658336 #37966] INFO -- : Running watchdog...
I, [2018-02-09T15:22:35.658522 #37966] INFO -- : Listener is starting...
I, [2018-02-09T15:22:35.699161 #37966] INFO -- : Listening to queue [example]
Logging
By default, Processor2 logs to both stdout (standard format) and a logstash file. For containerized environments (Docker, Kubernetes), use the --console-log-only option to output structured JSON logs to stdout only:
bundle exec ruby example.rb --console-log-only
This outputs logs in JSON format suitable for log aggregation systems:
{"@timestamp":"2026-01-18T10:00:00.000Z","@version":"1","severity":"INFO","host":"server1","application":"example","environment":"production","message":"example (1.0.0): has been started"}
Correlation ID
Processor2 supports automatic propagation of correlation IDs for distributed tracing. While EventHub messages already contain an execution_id in the message body for tracing, the AMQP correlation_id provides an additional benefit: it's part of the message metadata (envelope), not the payload. This means it's available even when the JSON body is invalid and cannot be parsed - useful for error tracking and debugging malformed messages.
When an incoming AMQP message includes a correlation_id in its metadata:
- Automatic logging: All log messages during message processing will include
correlation_idas a separate field in structured JSON output - Automatic publishing: Any messages published during processing will automatically include the same correlation_id in their AMQP headers
- Available in args: The correlation_id is passed to
handle_messageviaargs[:correlation_id] - Consistent execution_id: When creating new messages,
execution_idis automatically set to matchcorrelation_id, ensuring consistent tracing across both AMQP metadata and message body
This happens transparently without any changes to your handle_message implementation:
def (, args = {})
# correlation_id is available in args if needed
correlation_id = args[:correlation_id]
# Logging automatically includes correlation_id in JSON output
EventHub.logger.info("Processing order")
# Publishing automatically includes correlation_id
publish(message: response.to_json)
.copy
end
You can also explicitly set or override the correlation_id when publishing:
publish(message: msg.to_json, correlation_id: "550e8400-e29b-41d4-a716-446655440000")
If no correlation_id is present in the AMQP metadata, the message body's execution_id is used as fallback to ensure tracing continuity.
How it works
- Received: When an AMQP message arrives,
correlation_idis extracted from the message metadata - Fallback: If no
correlation_idin AMQP metadata, the message body'sexecution_idis used instead (ensuring tracing continuity) - Stored: The value is stored in thread-local storage (
Thread.current) for the duration of message processing - Passed: The
correlation_idis passed tohandle_messageviaargs[:correlation_id] - Logging: The logger automatically reads from thread-local storage and includes it in JSON output
- Publishing: The publisher automatically reads from thread-local storage and adds it to outgoing AMQP message headers (can be overwritten by passing
correlation_id:explicitly) - New messages: When creating a new
EventHub::Message:- With
correlation_idpresent →execution_idis set to matchcorrelation_id - Without
correlation_id→execution_idis set to a new UUID (default behavior)
- With
- Cleared: After message processing completes, the stored value is cleared
This design ensures correlation_id flows transparently through the entire message processing lifecycle. No code changes are required for existing implementations - just update the processor2 gem dependency.
Note: We use correlation_id (snake_case) to follow Ruby naming conventions, AMQP standard message properties, and stay consistent with other args keys like :queue_name, :content_type, etc. The correlation_id value is typically a UUID (e.g., 550e8400-e29b-41d4-a716-446655440000).
Configuration
If --config option is not provided processor tries to load config/class_name.json. If file does not exist it loads default values as specified below.
{
"development": {
"server": {
"user": "guest",
"password": "guest",
"host": "localhost",
"vhost": "event_hub",
"port": 5672,
"tls": false,
"tls_cert": null,
"tls_key": null,
"tls_ca_certificates": [],
"verify_peer": false,
"show_bunny_logs": false,
"http": {
"bind_address": "localhost",
"port": 8080,
"base_path": "/svc/{class_name}"
}
},
"processor": {
"listener_queues": [
"{class_name}"
],
"heartbeat_cycle_in_s": 300,
"watchdog_cycle_in_s": 15,
"restart_in_s": 15
}
}
}
Default configuration will dynamically resolve class_name. Exp. if your class is called MyClass and is derived from Processor2, value of class_name would be "my_class". You can overwrite config settings as needed.
More details about TLS configuration for underlying Bunny gem can be found here: http://rubybunny.info/articles/tls.html.
Feel free to define additional hash key/values (outside of server and processor key) as required by your application.
{
"development": {
"server": {
},
"processor": {
},
"database": {
"user": "guest",
"password": "secret",
"name": {
"subname": "value"
}
}
}
}
Processor2 symbolizes keys and sub-keys from configuration files automatically.
# access configuration values in your application as follows
EventHub::Configuration.database[:user] # => "guest"
EventHub::Configuration.database[:password] # => "secret"
EventHub::Configuration.database[:name][:subname] # => "value"
# If you need strings instead of symbols you can do
database = stringify_keys(EventHub::Configuration.database)
database["user"] # => "guest"
database["password"] # => "secret"
database["name"]["subname"] # => "value"
Version 1.17 and newer allows you to load and merge more configuration files programmatically. It is expected that load! is called once (implicit during class initialization) and then load_more! zero, one, or multiple times. All additional files loaded with load_more! are hash deep merged into one configuration structure. Exceptions while loading of files will be catched and shown as warnings.
# specify a file
EventHub::Configuration.load_more!(pattern: "config/another_config.json")
# specify glob patterns to load multiple files
EventHub::Configuration.load_more!(pattern: "config/processes/**/*.json")
EventHub::Configuration.load_more!(pattern: "config/templates/**/*.json")
If you have conflicting hashes, the previous settings will be overwritten.
1st file loaded
{
"test": {
"a": "a_value",
"b": "b_value"
}
}
2nd file loaded
{
"test": {
"b": "another_value"
}
}
Final configuration result
{
"test": {
"a": "a_value",
"b": "another_value"
}
}
HTTP Resources
Processor2 exposes HTTP resources for health checks and monitoring. All resources share the same HTTP server configuration.
Configuration:
{
"server": {
"http": {
"bind_address": "localhost",
"port": 8080,
"base_path": "/svc/{class_name}"
}
}
}
Resources are mounted under the base_path:
{base_path}/heartbeat- Health check{base_path}/version- Version info as JSON{base_path}/docs- README documentation as HTML{base_path}/docs/configuration- Configuration as HTML table{base_path}/docs/changelog- CHANGELOG as HTML{base_path}/assets/*- Static assets (CSS, images)
Accessing {base_path} or {base_path}/ redirects to {base_path}/docs.
Backward Compatibility: If you have existing configuration using the old heartbeat block with bind_address, port, and path, it will continue to work. The new http configuration takes precedence when both are present.
Heartbeat
Returns detailed processor status information as JSON.
GET {base_path}/heartbeat
Response: 200 OK with JSON body
{
"version": "1.0.0",
"pid": 12345,
"environment": "production",
"heartbeat": {
"started": "2026-01-18T10:00:00.000Z",
"stamp_last_request": "2026-01-18T12:30:00.000Z",
"uptime_in_ms": 9000000,
"heartbeat_cycle_in_ms": 300000,
"queues_consuming_from": ["my_processor"],
"queues_publishing_to": ["event_hub.inbound"],
"host": "server1.example.com",
"addresses": [
{"interface": "en0", "host_name": "server1", "ip_address": "192.168.1.100"}
],
"messages": {
"total": 1000,
"successful": 995,
"unsuccessful": 5,
"average_size": 2048,
"average_process_time_in_ms": 50,
"total_process_time_in_ms": 50000
}
}
}
Version
Returns the processor version as JSON.
GET {base_path}/version
Response: 200 OK with JSON body
{
"version": "1.0.0"
}
The version is taken from the version method in your derived processor class. If not defined, it returns "?.?.?".
class MyProcessor < EventHub::Processor2
def version
"1.0.0" # your version
end
end
Docs
Serves README.md as HTML with a built-in layout.
GET {base_path}/docs
Response: 200 OK with HTML page
By default, looks for README.md in the current directory, then doc/README.md. You can customize the path via configuration:
{
"server": {
"http": {
"docs": {
"readme_path": "/custom/path/to/README.md"
}
}
}
}
Or override completely by defining a readme_as_html method in your processor:
class MyProcessor < EventHub::Processor2
def readme_as_html
"<h1>Custom Documentation</h1><p>Your content here.</p>"
end
end
Changelog
Serves CHANGELOG.md as HTML with a built-in layout.
GET {base_path}/docs/changelog
Response: 200 OK with HTML page
By default, looks for CHANGELOG.md in the current directory, then doc/CHANGELOG.md. You can customize the path via configuration:
{
"server": {
"http": {
"docs": {
"changelog_path": "/custom/path/to/CHANGELOG.md"
}
}
}
}
Or override completely by defining a changelog_as_html method in your processor:
class MyProcessor < EventHub::Processor2
def changelog_as_html
"<h1>Custom Changelog</h1><p>Your changes here.</p>"
end
end
Configuration
Displays the active configuration as an HTML table. Sensitive values (passwords, tokens, keys) are automatically redacted.
GET {base_path}/docs/configuration
Response: 200 OK with HTML page
By default, the following keys are redacted: password, secret, token, api_key, credential. You can customize the list by defining a sensitive_keys method in your processor:
# Override the entire list
class MyProcessor < EventHub::Processor2
def sensitive_keys
%w[password secret token api_key credential connection_string]
end
end
# Or extend the default list
class MyProcessor < EventHub::Processor2
def sensitive_keys
EventHub::DocsRenderer::DEFAULT_SENSITIVE_KEYS + %w[connection_string]
end
end
Or override the entire page by defining a configuration_as_html method:
class MyProcessor < EventHub::Processor2
def configuration_as_html
"<h1>Custom Configuration</h1><p>Your content here.</p>"
end
end
Disabling Resources
By default, all HTTP resources are enabled. You can control which resources are available by defining an http_resources method in your processor. The navbar adapts automatically.
class MyProcessor < EventHub::Processor2
def http_resources
[:heartbeat, :version, :docs, :changelog, :configuration] # default: all enabled
end
end
To disable the configuration page for example:
class MyProcessor < EventHub::Processor2
def http_resources
[:heartbeat, :version, :docs, :changelog]
end
end
Customizing Footer
The documentation pages display company name, version, and environment in the footer. Company name defaults to "Novartis" but can be customized by defining a company_name method in your processor:
class MyProcessor < EventHub::Processor2
def company_name
"My Company"
end
end
Future Extension: A future version could allow overriding the default layout template and CSS assets using convention over configuration (e.g., placing custom files in doc/layout.erb or doc/app.css).
Development
# Get the source code
git clone https://github.com/thomis/eventhub-processor2.git
# Install dependencies
bundle
# Setup rabbitmq docker container with initial definitions. This can be run multiple times to get your container back into an initial state
bundle exec rake init
# Run all rspec tests
bundle exec rake
To install this gem onto your local machine, run bundle exec rake install.
Publishing
This project uses Trusted Publishing to securely publish gems to RubyGems.org via GitHub Actions. To release a new version:
- Update the version number in
lib/eventhub/version.rb - Merge changes to
main - Create a version tag either via command line or GitHub:
git tag v1.x.x git push origin v1.x.xOr on GitHub: go to Releases → Create a new release → enter the tag name (e.g.v1.26.0), selectmainas target, and publish. - The
02 - Test, Build and Releaseworkflow triggers automatically, runs tests, builds the gem, and publishes it to rubygems.org
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/thomis/eventhub-processor2.