Class: OpenWFE::ListenExpression
- Inherits:
-
FlowExpression
- Object
- ObjectWithMeta
- FlowExpression
- OpenWFE::ListenExpression
- Includes:
- ConditionMixin, MergeMixin, TimeoutMixin
- Defined in:
- lib/openwfe/expressions/fe_listen.rb
Overview
The “listen” expression can be viewed in two ways :
1) It’s a hook into the participant map to intercept apply or reply operations on participants.
2) It allows OpenWFE to be a bit closer to the ‘ideal’ process-calculus world (en.wikipedia.org/wiki/Process_calculi)
Anyway…
<listen to="alice">
<subprocess ref="notify_bob" />
</listen>
Whenever a workitem is dispatched (applied) to the participant named “alice”, the subprocess named “notify_bob” is triggered (once).
listen :to => "^channel_.*", :upon => "reply" do
sequence do
participant :ref => "delta"
participant :ref => "echo"
end
end
After the listen has been applied, the first workitem coming back from a participant whose named starts with “channel_” will trigger a sequence with the participants ‘delta’ and ‘echo’.
listen :to => "alpha", :where => "${f:color} == red" do
participant :ref => "echo"
end
Will send a copy of the first workitem meant for participant “alpha” to participant “echo” if this workitem’s color field is set to ‘red’.
listen :to => "alpha", :once => "false" do
send_email_to_stakeholders
end
This is some kind of a server : each time a workitem is dispatched to participant “alpha”, the subprocess (or participant) named ‘send_email_to_stakeholders’) will receive a copy of that workitem. Use with care.
listen :to => "alpha", :once => "false", :timeout => "1M2w" do
send_email_to_stakeholders
end
The ‘listen’ expression understands the ‘timeout’ attribute. It can thus be instructed to stop listening after a certain amount of time (here, after one month and two weeks).
Since OpenWFEru 0.9.16, the listen expression is usable without a child expression. It blocks until a valid messages comes in the channel, at which point it resumes the process, with workitem that came as the message (not with the workitem at apply time). (no merge implemented for now).
sequence do
listen :to => "channel_z", :upon => :reply
participant :the_rest_of_the_process
end
In this example, the process will block until a workitem comes for a participant named ‘channel_z’.
Note that since OpenWFEru 0.9.16, the engine accept (in its reply() method) workitems that don’t belong to a process intance (ie workitems that have a nil flow_expression_id). So it’s entirely feasible to send ‘notifications only’ workitems to the OpenWFEru engine. (see openwferu.rubyforge.org/svn/trunk/openwfe-ruby/test/ft_54b_listen.rb)
Since OpenWFEru 0.9.16, this expression has been aliased ‘intercept’ and ‘receive’. It also accepts the ‘on’ parameter as an alias parameter to the ‘to’ parameter. Think “listen to” and “receive on”.
A ‘merge’ attribute was added as well in 0.9.16, if set to true (the default value being false), the incoming workitem will be merged with a copy of the workitem that ‘applied’ (activated) the listen expression.
Instance Attribute Summary collapse
-
#applied_workitem ⇒ Object
‘listen’ accepts a :merge attribute, when set to true, this field will contain a copy of the workitem that activated the listen activity.
-
#call_count ⇒ Object
how many messages were received (can more than 0 or 1 if ‘once’ is set to false).
-
#once ⇒ Object
is set to true if the expression listen to 1! workitem and then replies to its parent.
-
#participant_regex ⇒ Object
the channel on which this expression ‘listens’.
-
#upon ⇒ Object
can take :apply or :reply as a value, if not set (nil), will listen on both ‘directions’.
Attributes included from TimeoutMixin
Attributes inherited from FlowExpression
#apply_time, #attributes, #children, #environment_id, #fei, #parent_id, #raw_representation
Attributes included from Contextual
Instance Method Summary collapse
- #apply(workitem) ⇒ Object
-
#call(channel, *args) ⇒ Object
This is the method called when a ‘listenable’ workitem comes in.
- #cancel ⇒ Object
- #reply_to_parent(workitem) ⇒ Object
-
#reschedule(scheduler) ⇒ Object
Registers for timeout and start observing the participant activity.
-
#trigger(params) ⇒ Object
Only called in case of timeout.
Methods included from MergeMixin
Methods included from ConditionMixin
#determine_condition_attribute, #eval_condition
Methods included from TimeoutMixin
#determine_timeout, #remove_timedout_flag, #schedule_timeout, #set_timedout_flag
Methods inherited from FlowExpression
#clean_children, #delete_variable, #dup_environment, #fetch_environment, #fetch_text_content, #get_binding, #get_environment, #get_parent, #get_root_environment, #has_attribute, #initialize, is_definition, is_definition?, #lookup_attribute, #lookup_attributes, #lookup_boolean_attribute, #lookup_comma_list_attribute, #lookup_downcase_attribute, #lookup_ref, #lookup_string_attribute, #lookup_sym_attribute, #lookup_value, #lookup_variable, #lookup_vf_attribute, names, #new_environment, new_exp, #owns_its_environment?, #paused?, #remove_child, #reply, #set_variable, #store_itself, #synchronize, #to_s, #to_yaml_properties
Methods included from Contextual
#get_work_directory, #init_service, #lookup
Methods included from Logging
#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn
Methods included from OwfeServiceLocator
#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator
Methods inherited from ObjectWithMeta
#class_def, meta_def, meta_eval, metaclass
Constructor Details
This class inherits a constructor from OpenWFE::FlowExpression
Instance Attribute Details
#applied_workitem ⇒ Object
‘listen’ accepts a :merge attribute, when set to true, this field will contain a copy of the workitem that activated the listen activity. This copy will be merged with incoming (listened for) workitems when triggering the listen child.
165 166 167 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 165 def applied_workitem @applied_workitem end |
#call_count ⇒ Object
how many messages were received (can more than 0 or 1 if ‘once’ is set to false).
171 172 173 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 171 def call_count @call_count end |
#once ⇒ Object
is set to true if the expression listen to 1! workitem and then replies to its parent. When set to true, it listens until the process it belongs to terminates. The default value is true.
151 152 153 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 151 def once @once end |
#participant_regex ⇒ Object
the channel on which this expression ‘listens’
142 143 144 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 142 def participant_regex @participant_regex end |
#upon ⇒ Object
can take :apply or :reply as a value, if not set (nil), will listen on both ‘directions’.
157 158 159 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 157 def upon @upon end |
Instance Method Details
#apply(workitem) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 174 def apply (workitem) #if @children.size < 1 # reply_to_parent workitem # return #end # # 'listen' now blocks if there is no children @participant_regex = lookup_string_attribute(:to, workitem) @participant_regex = lookup_string_attribute(:on, workitem) \ unless @participant_regex raise "attribute 'to' is missing for expression 'listen'" \ unless @participant_regex ldebug { "apply() listening to '#{@participant_regex}'" } # # once @once = lookup_boolean_attribute :once, workitem, true @once = true if @children.size < 1 # a 'blocking listen' can only get triggered once. ldebug { "apply() @once is #{@once}" } # # merge merge = lookup_boolean_attribute :merge, workitem, false ldebug { "apply() merge is #{@merge}" } @applied_workitem = workitem.dup if merge # # upon @upon = lookup_sym_attribute( :upon, workitem, :default => :apply) @upon = (@upon == :reply) ? :reply : :apply ldebug { "apply() @upon is #{@upon}" } @call_count = 0 determine_timeout reschedule(get_scheduler) store_itself end |
#call(channel, *args) ⇒ Object
This is the method called when a ‘listenable’ workitem comes in
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 252 def call (channel, *args) synchronize do upon = args[0] ldebug { "call() channel : '#{channel}' upon '#{upon}'" } return if upon != @upon workitem = args[1].dup conditional = eval_condition(:where, workitem) # # note that the values if the incoming workitem (not the # workitem at apply time) are used for the evaluation # of the condition (if necessary). return if conditional == false return if @once and @call_count > 0 # # workitem does match... ldebug do "call() "+ "through for fei #{workitem.fei} / "+ "'#{workitem.participant_name}'" end @call_count += 1 store_itself() #ldebug { "call() @call_count is #{@call_count}" } # # eventual merge workitem = merge_workitems @applied_workitem.dup, workitem \ if @applied_workitem # # reply or launch nested child expression if @children.size > 0 # # listen with child parent = @once ? self : nil get_expression_pool.launch_template( parent, nil, @call_count - 1, @children[0], workitem, nil) else # # 'blocking listen' reply_to_parent workitem end end end |
#cancel ⇒ Object
230 231 232 233 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 230 def cancel stop_observing end |
#reply_to_parent(workitem) ⇒ Object
235 236 237 238 239 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 235 def reply_to_parent (workitem) stop_observing super end |
#reschedule(scheduler) ⇒ Object
Registers for timeout and start observing the participant activity.
322 323 324 325 326 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 322 def reschedule (scheduler) to_reschedule(scheduler) start_observing end |
#trigger(params) ⇒ Object
Only called in case of timeout.
244 245 246 247 |
# File 'lib/openwfe/expressions/fe_listen.rb', line 244 def trigger (params) reply_to_parent workitem end |