Class: RuoteStomp::Receiver

Inherits:
Ruote::Receiver
  • Object
show all
Defined in:
lib/ruote-stomp/receiver.rb

Overview

Stomp Receiver

Used in conjunction with the RuoteStomp::Participant, the WorkitemListener subscribes to a specific direct exchange and monitors for incoming workitems. It expects workitems to arrive serialized as JSON.

Configuration

Stomp configuration is handled by directly manipulating the values of the Stomp.settings hash, as provided by the Stomp gem. No defaults are set by the listener. The only option parsed by the initializer of the workitem listener is the queue key (Hash expected). If no queue key is set, the listener will subscribe to the ruote_workitems direct exchange for workitems, otherwise it will subscribe to the direct exchange provided.

Usage

Register the engine or storage with the listener:

RuoteStomp::Receiver.new(engine_or_storage)

The workitem listener leverages the asynchronous nature of the stomp gem, so no timers are setup when initialized.

Options

:queue and :launchitems

See the RuoteStomp::Participant docs for information on sending workitems out to remote participants, and have them send replies to the correct direct exchange specified in the workitem attributes.

Direct Known Subclasses

LaunchitemListener, WorkitemListener

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine_or_storage, opts = {}) ⇒ Receiver

Starts a new Receiver

Two arguments for this method.

The first one should be a Ruote::Engine, a Ruote::Storage or a Ruote::Worker instance.

The second one is a hash for options. There are two known options :

:queue for setting the queue on which to listen (defaults to ‘ruote_workitems’).

:ignore_disconnect_on_process => true|false (defauts to false) processes the message even if the client has disconnected (use in testing only)

The :launchitems option :

:launchitems => true
  # the receiver accepts workitems and launchitems
:launchitems => false
  # the receiver only accepts workitems
:launchitems => :only
  # the receiver only accepts launchitems


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/ruote-stomp/receiver.rb', line 69

def initialize(engine_or_storage, opts={})

  super(engine_or_storage)

  @launchitems = opts[:launchitems]
  ignore_disconnect = opts[:ignore_disconnect_on_process]

  @queue =
    opts[:queue] ||
    (@launchitems == :only ? '/queue/ruote_launchitems' : '/queue/ruote_workitems')

  RuoteStomp.start!

  if opts[:unsubscribe]
    begin
      $stomp.unsubscribe(@queue)
    rescue OnStomp::UnsupportedCommandError => e
      $stderr.puts("Connection does support unsubscribe")
    end
  end

  $stomp.subscribe(@queue) do |message|
    # Process your message here
    # Your submitted data is in msg.body
    if $stomp.connected? && !ignore_disconnect
      # do nothing, we're going down
    else
      handle(message)
    end
  end
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



43
44
45
# File 'lib/ruote-stomp/receiver.rb', line 43

def queue
  @queue
end

Instance Method Details

#decode_workitem(msg) ⇒ Object

(feel free to overwrite me)



107
108
109
# File 'lib/ruote-stomp/receiver.rb', line 107

def decode_workitem(msg)
  (Rufus::Json.decode(msg) rescue nil)
end

#stopObject



101
102
103
# File 'lib/ruote-stomp/receiver.rb', line 101

def stop
  RuoteStomp.stop!
end