Class: OpenWFE::Extras::SqsListener
- Inherits:
-
Service
- Object
- Service
- OpenWFE::Extras::SqsListener
- Includes:
- MonitorMixin, Schedulable, WorkItemListener
- Defined in:
- lib/openwfe/extras/listeners/sqslisteners.rb
Overview
Polls an Amazon SQS queue for workitems
Workitems can be instances of InFlowWorkItem or LaunchItem.
require 'openwfe/extras/listeners/sqslisteners'
ql = OpenWFE::SqsListener("workqueue1", engine.application_context)
engine.add_workitem_listener(ql, "2m30s")
#
# thus, the engine will poll our "workqueue1" SQS queue
# every 2 minutes and 30 seconds
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
readonly
The name of the Amazon SQS whom this listener cares for.
Instance Method Summary collapse
-
#decode_object(message) ⇒ Object
Extracts a workitem from the message’s body.
-
#initialize(queue_name, application_context) ⇒ SqsListener
constructor
A new instance of SqsListener.
-
#trigger(params) ⇒ Object
Will ‘find’ files in the work directory (by default ./work/in/), extract the workitem in them and feed it back to the engine.
Constructor Details
#initialize(queue_name, application_context) ⇒ SqsListener
Returns a new instance of SqsListener.
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 81 def initialize (queue_name, application_context) @queue_name = queue_name.to_s service_name = "#{self.class}::#{@queue_name}" super(service_name, application_context) linfo { "new() queue is '#{@queue_name}'" } end |
Instance Attribute Details
#queue_name ⇒ Object (readonly)
The name of the Amazon SQS whom this listener cares for
79 80 81 |
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 79 def queue_name @queue_name end |
Instance Method Details
#decode_object(message) ⇒ Object
Extracts a workitem from the message’s body.
By default, this listeners assumes the workitem is stored in its “hash form” (not directly as a Ruby InFlowWorkItem instance).
LaunchItem instances (as hash as well) are also accepted.
138 139 140 141 142 143 144 |
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 138 def decode_object () o = Base64.decode64(.) o = YAML.load(o) o = OpenWFE::workitem_from_h(o) o end |
#trigger(params) ⇒ Object
Will ‘find’ files in the work directory (by default ./work/in/), extract the workitem in them and feed it back to the engine.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/openwfe/extras/listeners/sqslisteners.rb', line 96 def trigger (params) synchronize do ldebug { "trigger()" } qs = SQS::QueueService.new qs.create_queue(@queue_name) # just to be sure it is there while true l = qs.( @queue_name, :timeout => 0, :count => 255) break if l.length < 1 l.each do |msg| o = decode_object(msg) handle_item o msg.delete ldebug do "trigger() " + "handled successfully msg #{msg.}" end end end end end |