Module: Chasqui

Extended by:
Forwardable
Defined in:
lib/chasqui.rb,
lib/chasqui/config.rb,
lib/chasqui/version.rb,
lib/chasqui/subscriber.rb,
lib/chasqui/queue_adapter.rb,
lib/chasqui/subscriptions.rb,
lib/chasqui/subscription_builder.rb,
lib/chasqui/queue_adapter/redis_queue_adapter.rb,
lib/chasqui/subscription_builder/resque_subscription_builder.rb,
lib/chasqui/subscription_builder/sidekiq_subscription_builder.rb

Overview

A persistent implementation of the publish-subscribe messaging pattern for Resque and Sidekiq workers.

Defined Under Namespace

Modules: QueueAdapter, QueueAdapters, Workers Classes: Broker, CLI, Config, ConfigurationError, RedisBroker, ResqueSubscriptionBuilder, SidekiqSubscriptionBuilder, Subscriber, SubscriptionBuilder, Subscriptions

Constant Summary collapse

VERSION =
'1.0.0'

Class Method Summary collapse

Class Method Details

.configure {|config| ... } ⇒ Object

Yields an object for configuring Chasqui.

Examples:

Chasqui.configure do |c|
  c.redis = 'redis://my-redis.example.com:6379'
  ...
end

Yield Parameters:

See Also:



38
39
40
# File 'lib/chasqui.rb', line 38

def configure(&block)
  yield config
end

.publish(channel, *args) ⇒ Object

Publish an event to a channel.

Parameters:

  • channel (String)

    the channel name

  • args (Array<#to_json>)

    an array of JSON serializable objects that comprise the event’s payload.



58
59
60
# File 'lib/chasqui.rb', line 58

def publish(channel, *args)
  redis.lpush inbox_queue, build_event(channel, *args).to_json
end

.subscribe(options = {}) ⇒ Object

Subscribe workers to channels.

Chasqui.subscribe(queue: 'high-priority') do
  on 'channel1', Worker1
  on 'channel2', Worker2
  on 'channel3', ->(event) { ... }, queue: 'low-priority'
  ...
end

The .subscribe method creates a context for registering workers to receive events for specified channels. Within a subscribe block you make calls to the #on method to create subscriptions.

#on expects a channel name as the first argument and either a Resque/Sidekiq worker as the second argument or a callable object, such as a proc, lambda, or any object that responds to #call.

Parameters:

  • options (Hash) (defaults to: {})

    default options for calls to #on. The defaults will be overriden by options supplied to the #on method directly. See Chasqui::SubscriptionBuilder#on for available options.

See Also:



84
85
86
87
# File 'lib/chasqui.rb', line 84

def subscribe(options={})
  builder = SubscriptionBuilder.builder(subscriptions, options)
  builder.instance_eval &Proc.new
end

.unsubscribe(channel, queue, worker = nil) ⇒ Object

Unsubscribe workers from a channel.

When you unsubscribe from a channel, the broker will stop placing jobs on the worker queue. When only given channel and queue arguments, #unsubscribe will unsubscribe all workers using that channel and queue. When the additional worker argument is given, Chasqui will only unsubscribe the given worker.

Parameters:

  • channel (String)

    the channel name

  • queue (String)

    the queue name

  • worker (.perform, #perform, #call) (defaults to: nil)

    the worker class or proc for a a currently subscribed worker



110
111
112
113
114
115
116
117
118
# File 'lib/chasqui.rb', line 110

def unsubscribe(channel, queue, worker=nil)
  subscribers = if worker
                  [Subscriber.new(channel, queue, worker)]
                else
                  subscriptions.find channel, queue
                end

  subscribers.each { |sub| subscriptions.unregister sub }
end