Class: Ruote::Amqp::Receiver

Inherits:
Receiver
  • Object
show all
Defined in:
lib/ruote/amqp/receiver.rb

Overview

A receiver is plugged between a ruote engine/storage and an AMQP queue. It will listen on the queue for messages, try to turn them into workitems and feed those workitem back to the engine/storage (in the usual use case, those workitems were initially emitted by the engine).

#decode_message(headers, payload)

By default, the receiver expects the incoming workitem to be serialized entirely in the payload of the AMQP message. One can change this behaviour by overriding the #decode_workitem method (usually when subclassing)

class MyYamlReceiver < Ruote::Amqp::Receiver
  def decode_message(headers, payload)
    YAML.load(payload)
  end
end

#decode_message is supposed to return a Ruby hash (describing either a workitem or a launchitem), the difference is explained below.

workitems and launchitems

The standard use case is to accept workitems coming back (they probably left the engine via Ruote::Amqp::Participant). But it’s also OK to accept “launchitems”, hashes with at least one ‘process_definition’ (or ‘definition’) entry.

Upon receiving a launchitem, the receiver will launch a new process instances.

Launchitems may have two more optional entries, ‘workitems_fields’ (or ‘fields’) and ‘process_variables’ (or ‘variables’).

‘workitem_fields’ must contain a hash of initial workitem fields (they will populate the initial workitem.

‘process_variables’ are a very advanced option. It’s possible to set the initial variables in a workflow. Read the general ruote documentation to learn about the difference between fields and variables.

The #decode_message is supposed to return a hash representing either a workitem, either a launchitem.

#handle_error(err)

Out of the box, the receiver will print out to $stderr the details of errors it encounters when receiving, decoding and handing back the workitems (messages?) to the engine. This can be changed by overriding the #handle_error method:

class MyReceiver < Ruote::Amqp::Receiver
  def handle_error(err)
    ThatLoggerService.log(err)
  end
end

initialization options

One can set the “launch_only” option to true when initializing the receiver to prevent it from handling anything but launchitems.

receiver = Ruote::Amqp::Receiver.new(
  @dashboard, @amqp_queue, :launch_only => true)

‘launch_only’ (string) is valid too.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine_or_storage, queue, options = {}) ⇒ Receiver

Returns a new instance of Receiver.



99
100
101
102
103
104
105
# File 'lib/ruote/amqp/receiver.rb', line 99

def initialize(engine_or_storage, queue, options={})

  super(engine_or_storage, Ruote.keys_to_s(options))

  @queue = queue
  @queue.subscribe(&method(:handle))
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



97
98
99
# File 'lib/ruote/amqp/receiver.rb', line 97

def queue
  @queue
end

Instance Method Details

#shutdownObject



107
108
109
110
# File 'lib/ruote/amqp/receiver.rb', line 107

def shutdown

  @queue.unsubscribe
end