Class: OpenWFE::Extras::SqsListener
- Inherits:
- Service (Object > Service > OpenWFE::Extras::SqsListener)
- Includes:
- MonitorMixin, Rufus::Schedulable, WorkItemListener
- Defined in:
- lib/openwfe/extras/listeners/sqslisteners.rb
Overview
Polls an Amazon SQS queue for workitems.
Workitems can be instances of InFlowWorkItem or LaunchItem.
require 'openwfe/extras/listeners/sqslisteners'
ql = OpenWFE::Extras::SqsListener.new("workqueue1", engine.application_context)
engine.add_workitem_listener(ql, "2m30s")
#
# thus, the engine will poll our "workqueue1" SQS queue
# every 2 minutes and 30 seconds
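To make the "2m30s" frequency string concrete, here is a minimal sketch of how such a string maps to a number of seconds. The helper `duration_to_seconds` is hypothetical and written only for illustration; it is not the parser that the engine or Rufus actually uses, and it assumes only the units s, m, h and d.

```ruby
# Hypothetical helper, not part of OpenWFE or Rufus: converts a duration
# string such as "2m30s" into a number of seconds.
def duration_to_seconds(text)
  unit_seconds = { "s" => 1, "m" => 60, "h" => 3600, "d" => 86_400 }
  total = 0
  # Each match yields an amount and a one-letter unit, e.g. ["2", "m"].
  text.scan(/(\d+)([smhd])/) do |amount, unit|
    total += amount.to_i * unit_seconds.fetch(unit)
  end
  total
end

puts duration_to_seconds("2m30s")  # prints 150
```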
Instance Attribute Summary
-
#queue_name ⇒ Object
readonly
The name of the Amazon SQS queue that this listener polls.
Instance Method Summary
-
#decode_object(message) ⇒ Object
Extracts a workitem from the message’s body.
-
#initialize(queue_name, application_context) ⇒ SqsListener
constructor
A new instance of SqsListener.
-
#trigger(params) ⇒ Object
Polls the SQS queue for incoming messages.
Constructor Details
#initialize(queue_name, application_context) ⇒ SqsListener
Returns a new instance of SqsListener.
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 79
def initialize(queue_name, application_context)
  @queue_name = queue_name.to_s
  service_name = "#{self.class}::#{@queue_name}"
  super service_name, application_context
  linfo { "new() queue is '#{@queue_name}'" }
end
Instance Attribute Details
#queue_name ⇒ Object (readonly)
The name of the Amazon SQS queue that this listener polls.
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 77
def queue_name
  @queue_name
end
Instance Method Details
#decode_object(message) ⇒ Object
Extracts a workitem from the message’s body.
By default, this listener assumes the workitem is stored in its “hash form” (not directly as a Ruby InFlowWorkItem instance).
LaunchItem instances (also in hash form) are accepted as well.
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 135
def decode_object(message)
  o = Base64.decode64(message.message_body)
  o = YAML.load(o)
  o = OpenWFE::workitem_from_h(o)
  o
end
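As an illustration of the encoding that decode_object expects, the following sketch shows a sender serializing a hash to YAML and then Base64, and a receiver reversing those two steps. The hash keys are hypothetical and chosen only for the example; the final `OpenWFE::workitem_from_h` step is left out so that the snippet runs with the Ruby standard library alone.

```ruby
require 'base64'
require 'yaml'

# Hypothetical workitem in hash form; the keys are illustrative only.
workitem_hash = { "attributes" => { "customer" => "alice", "amount" => 42 } }

# Sender side: YAML first, then Base64, so the message body is plain ASCII.
body = Base64.encode64(YAML.dump(workitem_hash))

# Receiver side: undo Base64, then parse the YAML, as decode_object does
# before handing the hash to OpenWFE::workitem_from_h.
decoded = YAML.load(Base64.decode64(body))

puts decoded == workitem_hash  # prints true
```

A message whose body is not Base64-encoded YAML of a workitem hash would presumably fail somewhere in this chain, so senders need to apply both encoding steps in that order.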
#trigger(params) ⇒ Object
Polls the SQS queue for incoming messages.
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 93
def trigger(params)
  synchronize do
    ldebug { "trigger()" }
    qs = Rufus::SQS::QueueService.new
    qs.create_queue @queue_name
      # just to be sure it is there
    while true
      l = qs.get_messages(@queue_name, :timeout => 0, :count => 255)
      break if l.length < 1
      l.each do |msg|
        o = decode_object msg
        handle_item o
        msg.delete
        ldebug do
          "trigger() " +
          "handled successfully msg #{msg.message_id}"
        end
      end
    end
  end
end
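The polling pattern used by trigger (fetch a batch, stop when the batch is empty, delete each message only after it has been handled) can be sketched without any SQS access. `FakeQueue` and `drain` below are hypothetical names made up for this example and are not part of OpenWFE or Rufus; upcasing the body merely stands in for handle_item.

```ruby
# Hypothetical in-memory queue, standing in for a real SQS queue.
class FakeQueue
  def initialize(bodies)
    @messages = bodies.each_with_index.map { |body, i| { :id => i, :body => body } }
  end

  # Returns up to `count` messages without removing them from the queue.
  def get_messages(count)
    @messages.first(count)
  end

  # Removes a message once it has been handled.
  def delete(message)
    @messages.delete(message)
  end

  def size
    @messages.size
  end
end

# Fetches batches until the queue is empty; each message is deleted only
# after its (stand-in) handler has returned without raising.
def drain(queue, batch_size)
  handled = []
  loop do
    batch = queue.get_messages(batch_size)
    break if batch.empty?
    batch.each do |message|
      handled << message[:body].upcase  # stands in for handle_item
      queue.delete(message)
    end
  end
  handled
end

queue = FakeQueue.new(%w[a b c d e])
p drain(queue, 2)  # prints ["A", "B", "C", "D", "E"]
p queue.size       # prints 0
```

Deleting after handling means that a message whose handler raises stays in the queue and can be picked up again on a later poll.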