Class: DaemonKit::RuoteWorkitem
- Inherits:
-
Object
- Object
- DaemonKit::RuoteWorkitem
show all
- Defined in:
- lib/daemon_kit/ruote_workitem.rb
Overview
Dual purpose class that is a) responsible for parsing incoming workitems and delegating to the correct RuotePseudoParticipant, and b) wrapping the workitem hash into something a bit more digestable.
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(workitem = {}) ⇒ RuoteWorkitem
Returns a new instance of RuoteWorkitem.
101
102
103
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 101
def initialize( workitem = {} )
@workitem = workitem
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args) ⇒ Object
160
161
162
163
164
165
166
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 160
def method_missing( method_name, *args )
if self.attributes.keys.include?( method_name.to_s )
return self.attributes[ method_name.to_s ]
end
super
end
|
Class Method Details
.parse(workitem) ⇒ Object
96
97
98
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 96
def parse( workitem )
new( JSON.parse( workitem ) )
end
|
.parse_command(work) ⇒ Object
Extract the class and method name from the workitem, then pick the matching class from the registered list of participants
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 68
def parse_command( work )
return nil if work['params']['command'].nil?
_, klass, method = work['params']['command'].split('/')
instance = RuoteParticipants.instance.participants[ klass ]
if instance.nil?
msg = "No instance registered for #{klass}"
DaemonKit.logger.error( msg )
raise DaemonKit::MissingParticipant, msg
end
return instance, method
end
|
.process(transport, workitem) ⇒ Object
Process incoming commands via an AMQP queue
Expects a JSON workitem from ruote that has these fields set in attributes key:
{
'reply_queue' => 'queue to send replies to',
'params' => {
'command' => '/actor/method'
}
}
Notes on the command key:
It looks like a resource, and will be treated as such. Is should be in the format of /class/method
, and it will be passed the complete workitem as a hash.
Notes on replies
Replies are sent back to the queue specified in the reply_queue
key.
Notes on errors
Where daemon-kit detects errors in attempting to parse and delegate the workitems, it will reply to the engine and set the following field with the error information:
daemon_kit.error
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 39
def process( transport, workitem )
@instance ||= new
work = parse( workitem )
DaemonKit.logger.warn "Processing workitem that has timed out!" if work.timed_out?
target, method = parse_command( work )
if target.nil? || method.nil?
msg = "Missing target/method in command parameter, or command parameter missing"
DaemonKit.logger.error( msg )
work["daemon_kit"] = { "error" => msg }
elsif target.public_methods.include?( method )
target.perform( method, work )
else
msg = "Workitem cannot be processes: #{method} not exposed by #{target.inspect}"
DaemonKit.logger.error( msg )
work["daemon_kit"] = { "error" => msg }
end
reply_to_engine( transport, work )
end
|
.reply_to_engine(transport, response) ⇒ Object
84
85
86
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 84
def reply_to_engine( transport, response )
send( "reply_via_#{transport}", response )
end
|
.reply_via_amqp(response) ⇒ Object
88
89
90
91
92
93
94
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 88
def reply_via_amqp( response )
DaemonKit.logger.debug("Replying to engine via AMQP with #{response.inspect}")
::MQ.queue( response['reply_queue'] ).publish( response.to_json )
response
end
|
Instance Method Details
135
136
137
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 135
def []( key )
self.attributes[ key ]
end
|
#[]=(key, value) ⇒ Object
139
140
141
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 139
def []=( key, value )
self.attributes[ key ] = value
end
|
#attributes ⇒ Object
131
132
133
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 131
def attributes
@workitem['attributes']
end
|
#dispatch_time ⇒ Object
119
120
121
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 119
def dispatch_time
@dispath_time ||= Time.parse( @workitem['dispatch_time'] )
end
|
105
106
107
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 105
def fei
@workitem['flow_expression_id']
end
|
#last_modified ⇒ Object
123
124
125
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 123
def last_modified
@last_modified ||= Time.parse( @workitem['last_modified'] )
end
|
#participant_name ⇒ Object
127
128
129
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 127
def participant_name
@workitem['participant_name']
end
|
#short_fei ⇒ Object
109
110
111
112
113
114
115
116
117
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 109
def short_fei
@short_fei ||=
'(' + [
'fei', self.fei['owfe_version'], self.fei['engine_id'],
self.fei['workflow_definition_url'], self.fei['workflow_definition_name'],
self.fei['workflow_definition_revision'], self.fei['workflow_instance_id'],
self.fei['expression_name'], self.fei['expression_id']
].join(' ') + ')'
end
|
#timed_out? ⇒ Boolean
Look at the workitem payload and attempt to determine if this workitem has timed out or not. This method will only ever work if you used the +:timeout: parameter was set for the expression.
150
151
152
153
154
155
156
157
158
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 150
def timed_out?
key = fei['workflow_instance_id'] + '__' + fei['expression_id']
if self.attributes["__timeouts__"] && timeout = self.attributes["__timeouts__"][ key ]
return Time.at( timeout.last ) < Time.now
end
return false
end
|
143
144
145
|
# File 'lib/daemon_kit/ruote_workitem.rb', line 143
def to_json
@workitem.to_json
end
|