Class: OpenWFE::ListenExpression

Inherits:
FlowExpression
Includes:
ConditionMixin, MergeMixin, TimeoutMixin
Defined in:
lib/openwfe/expressions/fe_listen.rb

Overview

The “listen” expression can be viewed in two ways:

1) It’s a hook into the participant map to intercept apply or reply operations on participants.

2) It allows OpenWFE to be a bit closer to the ‘ideal’ process-calculus world (en.wikipedia.org/wiki/Process_calculi)

Anyway…

<listen to="alice">
    <subprocess ref="notify_bob" />
</listen>

Whenever a workitem is dispatched (applied) to the participant named “alice”, the subprocess named “notify_bob” is triggered (once).

listen :to => "^channel_.*", :upon => "reply" do
    sequence do
        participant :ref => "delta"
        participant :ref => "echo"
    end
end

After the listen has been applied, the first workitem coming back from a participant whose name starts with “channel_” will trigger a sequence with the participants ‘delta’ and ‘echo’.
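The ‘to’ value reads as a regular expression over participant names (the expression stores it as participant_regex). A minimal sketch in plain Ruby, assuming the match behaves like Regexp#match?; the helper name listens_to? is hypothetical and not part of OpenWFEru.

```ruby
# Hypothetical helper, not part of OpenWFEru: checks whether a
# participant name falls on the channel described by a 'to' pattern.
# Assumption: the pattern is applied as an ordinary Ruby Regexp.
def listens_to?(pattern, participant_name)
  Regexp.new(pattern).match?(participant_name)
end

on_channel  = listens_to?("^channel_.*", "channel_z")  # name starts with "channel_"
off_channel = listens_to?("^channel_.*", "alpha")      # name does not
```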

listen :to => "alpha", :where => "${f:color} == red" do
    participant :ref => "echo"
end

This sends a copy of the first workitem meant for participant “alpha” to participant “echo”, provided the workitem’s color field is set to ‘red’.

listen :to => "alpha", :once => "false" do
    send_email_to_stakeholders
end

This is some kind of a server: each time a workitem is dispatched to participant “alpha”, the subprocess (or participant) named ‘send_email_to_stakeholders’ will receive a copy of that workitem. Use with care.

listen :to => "alpha", :once => "false", :timeout => "1M2w" do
    send_email_to_stakeholders
end

The ‘listen’ expression understands the ‘timeout’ attribute. It can thus be instructed to stop listening after a certain amount of time (here, after one month and two weeks).
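To make the “1M2w” notation concrete, here is a rough sketch of how such a duration string could be turned into seconds. The parser and its unit table are hypothetical (in particular the 30-day month is an assumption); OpenWFEru's own time-string parsing may differ.

```ruby
# Hypothetical parser, for illustration only. The unit table below
# (notably a 30-day month) is an assumption, not taken from OpenWFEru.
UNIT_SECONDS = {
  "s" => 1,
  "m" => 60,
  "h" => 3600,
  "d" => 24 * 3600,
  "w" => 7 * 24 * 3600,
  "M" => 30 * 24 * 3600
}.freeze

# Sums every "<count><unit>" pair found in the string.
def duration_in_seconds(text)
  total = 0
  text.scan(/(\d+)([smhdwM])/) do |count, unit|
    total += count.to_i * UNIT_SECONDS.fetch(unit)
  end
  total
end

timeout = duration_in_seconds("1M2w")  # one month plus two weeks, in seconds
```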

Since OpenWFEru 0.9.16, the listen expression is usable without a child expression. It blocks until a valid message comes in on the channel, at which point it resumes the process with the workitem that came as the message (not with the workitem at apply time). No merge is implemented for this case for now.

sequence do
    listen :to => "channel_z", :upon => :reply
    participant :the_rest_of_the_process
end

In this example, the process will block until a workitem comes back from a participant named ‘channel_z’.
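As an analogy only, a childless listen behaves much like a thread waiting on a queue: nothing happens until a message arrives, and the process continues with that message. The sketch below uses Ruby's core Queue and Thread; the variable names are hypothetical and no OpenWFEru code is involved.

```ruby
# Analogy only: a Queue stands in for the 'channel_z' channel and a
# Thread stands in for the process instance. Not OpenWFEru code.
channel_z = Queue.new
resumed_with = nil

process = Thread.new do
  workitem = channel_z.pop     # blocks, like a 'listen' without a child
  resumed_with = workitem      # the rest of the process sees the incoming workitem
end

# Some other party sends a 'notification only' workitem on the channel.
channel_z.push({ "note" => "notification only" })
process.join
```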

Note that since OpenWFEru 0.9.16, the engine accepts (in its reply() method) workitems that don’t belong to a process instance (i.e. workitems that have a nil flow_expression_id). So it’s entirely feasible to send ‘notifications only’ workitems to the OpenWFEru engine. (see openwferu.rubyforge.org/svn/trunk/openwfe-ruby/test/ft_54b_listen.rb)

Since OpenWFEru 0.9.16, this expression has been aliased ‘intercept’ and ‘receive’. It also accepts the ‘on’ parameter as an alias for the ‘to’ parameter. Think “listen to” and “receive on”.

A ‘merge’ attribute was added as well in 0.9.16. If it is set to true (the default value being false), the incoming workitem will be merged with a copy of the workitem that ‘applied’ (activated) the listen expression.
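The effect of ‘merge’ can be pictured with plain hashes standing in for workitem attributes. This sketch assumes that values from the incoming workitem win over those of the applied copy; the actual precedence is decided by MergeMixin#merge_workitems and may differ.

```ruby
# Plain hashes stand in for workitem attributes. The precedence shown
# (incoming values override applied ones) is an assumption.
applied  = { "customer" => "acme", "color" => "blue" }
incoming = { "color" => "red", "note" => "from channel_z" }

# Hash#merge returns a new hash and leaves 'applied' untouched,
# comparable to merging into a copy of the applied workitem.
merged = applied.merge(incoming)
```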

Instance Attribute Summary

Attributes included from TimeoutMixin

#timeout_at, #timeout_job_id

Attributes inherited from FlowExpression

#apply_time, #attributes, #children, #environment_id, #fei, #parent_id, #raw_representation

Attributes included from Contextual

#application_context

Instance Method Summary

Methods included from MergeMixin

#merge_workitems

Methods included from ConditionMixin

#determine_condition_attribute, #eval_condition

Methods included from TimeoutMixin

#determine_timeout, #remove_timedout_flag, #schedule_timeout, #set_timedout_flag

Methods inherited from FlowExpression

#clean_children, #delete_variable, #dup_environment, #fetch_environment, #fetch_text_content, #get_binding, #get_environment, #get_parent, #get_root_environment, #has_attribute, #initialize, is_definition, is_definition?, #lookup_attribute, #lookup_attributes, #lookup_boolean_attribute, #lookup_comma_list_attribute, #lookup_downcase_attribute, #lookup_ref, #lookup_string_attribute, #lookup_sym_attribute, #lookup_value, #lookup_variable, #lookup_vf_attribute, names, #new_environment, new_exp, #owns_its_environment?, #paused?, #remove_child, #reply, #set_variable, #store_itself, #synchronize, #to_s, #to_yaml_properties

Methods included from Contextual

#get_work_directory, #init_service, #lookup

Methods included from Logging

#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn

Methods included from OwfeServiceLocator

#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator

Methods inherited from ObjectWithMeta

#class_def, meta_def, meta_eval, metaclass

Constructor Details

This class inherits a constructor from OpenWFE::FlowExpression

Instance Attribute Details

#applied_workitem ⇒ Object

‘listen’ accepts a :merge attribute. When it is set to true, this field contains a copy of the workitem that activated the listen activity. This copy is merged with incoming (listened for) workitems when triggering the listen child.



# File 'lib/openwfe/expressions/fe_listen.rb', line 165

def applied_workitem
  @applied_workitem
end

#call_count ⇒ Object

How many messages were received (can be more than 1 if ‘once’ is set to false).



# File 'lib/openwfe/expressions/fe_listen.rb', line 171

def call_count
  @call_count
end

#once ⇒ Object

Is set to true if the expression listens to exactly one workitem and then replies to its parent. When set to false, it listens until the process it belongs to terminates. The default value is true.



# File 'lib/openwfe/expressions/fe_listen.rb', line 151

def once
  @once
end

#participant_regex ⇒ Object

the channel on which this expression ‘listens’



# File 'lib/openwfe/expressions/fe_listen.rb', line 142

def participant_regex
  @participant_regex
end

#upon ⇒ Object

Can take :apply or :reply as a value. Note that #apply, as listed further down, normalizes the attribute: any value other than :reply, including an unset attribute, becomes :apply.



# File 'lib/openwfe/expressions/fe_listen.rb', line 157

def upon
  @upon
end

Instance Method Details

#apply(workitem) ⇒ Object



# File 'lib/openwfe/expressions/fe_listen.rb', line 174

def apply (workitem)

    #if @children.size < 1
    #    reply_to_parent workitem
    #    return
    #end
        #
        # 'listen' now blocks if there is no children

    @participant_regex = lookup_string_attribute(:to, workitem)

    @participant_regex = lookup_string_attribute(:on, workitem) \
        unless @participant_regex

    raise "attribute 'to' is missing for expression 'listen'" \
        unless @participant_regex

    ldebug { "apply() listening to '#{@participant_regex}'" }

    #
    # once

    @once = lookup_boolean_attribute :once, workitem, true

    @once = true if @children.size < 1
        # a 'blocking listen' can only get triggered once.

    ldebug { "apply() @once is #{@once}" }

    #
    # merge

    merge = lookup_boolean_attribute :merge, workitem, false

    ldebug { "apply() merge is #{merge}" }

    @applied_workitem = workitem.dup if merge

    #
    # upon

    @upon = lookup_sym_attribute(
        :upon, workitem, :default => :apply)

    @upon = (@upon == :reply) ? :reply : :apply

    ldebug { "apply() @upon is #{@upon}" }

    @call_count = 0

    determine_timeout
    reschedule(get_scheduler)

    store_itself
end
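The attribute handling in #apply can be condensed into a small stand-alone sketch. Everything here is illustrative: resolve_listen_options is a hypothetical name, a plain Hash stands in for the expression's attributes, and the boolean parsing (only the string "true" counts as true) is an assumption about how lookup_boolean_attribute behaves.

```ruby
# Illustrative only: 'attrs' is a plain Hash standing in for the
# expression's attributes; this is not the OpenWFEru lookup API.
def resolve_listen_options(attrs, child_count)
  # 'to' wins, 'on' is the fallback alias
  channel = attrs[:to] || attrs[:on]
  raise ArgumentError, "attribute 'to' is missing for expression 'listen'" unless channel

  # Assumption: booleans may arrive as strings such as "false"
  once = attrs.fetch(:once, true).to_s == "true"
  once = true if child_count < 1   # a 'blocking listen' can only trigger once

  merge = attrs.fetch(:merge, false).to_s == "true"

  # Anything other than 'reply' is treated as :apply
  upon = attrs[:upon].to_s == "reply" ? :reply : :apply

  { :channel => channel, :once => once, :merge => merge, :upon => upon }
end

blocking = resolve_listen_options({ :on => "channel_z", :upon => :reply }, 0)
server   = resolve_listen_options({ :to => "alpha", :once => "false" }, 1)
```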

#call(channel, *args) ⇒ Object

This is the method called when a ‘listenable’ workitem comes in



# File 'lib/openwfe/expressions/fe_listen.rb', line 252

def call (channel, *args)
    synchronize do

        upon = args[0]

        ldebug { "call() channel : '#{channel}' upon '#{upon}'" }

        return if upon != @upon

        workitem = args[1].dup

        conditional = eval_condition(:where, workitem)
            #
            # note that the values of the incoming workitem (not the
            # workitem at apply time) are used for the evaluation
            # of the condition (if necessary).

        return if conditional == false

        return if @once and @call_count > 0

        #
        # workitem does match...

        ldebug do 
            "call() "+
            "through for fei #{workitem.fei} / "+
            "'#{workitem.participant_name}'"
        end

        @call_count += 1
        store_itself()

        #ldebug { "call() @call_count is #{@call_count}" }

        #
        # eventual merge
        
        workitem = merge_workitems @applied_workitem.dup, workitem \
            if @applied_workitem

        #
        # reply or launch nested child expression

        if @children.size > 0
            #
            # listen with child

            parent = @once ? self : nil

            get_expression_pool.launch_template(
                parent, 
                nil, 
                @call_count - 1, 
                @children[0], 
                workitem, 
                nil)
        else
            #
            # 'blocking listen'
            
            reply_to_parent workitem
        end
    end
end
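The order of the checks in #call (direction, then condition, then the ‘once’ guard) can be tried out with a simplified stand-in. ListenSketch is a hypothetical class written for this illustration; it keeps only the filtering and the counter and leaves out storage, merging and the launching of the child expression.

```ruby
# Simplified stand-in for the filtering done in #call; not the real class.
class ListenSketch
  attr_reader :call_count

  def initialize(upon:, once:, where: nil)
    @upon = upon
    @once = once
    @where = where          # optional predicate on the incoming workitem
    @call_count = 0
  end

  # Returns true when the message gets through, false when it is ignored.
  def call(upon, workitem)
    return false if upon != @upon                     # wrong direction
    return false if @where && !@where.call(workitem)  # condition not met
    return false if @once && @call_count > 0          # already triggered
    @call_count += 1
    true
  end
end

listener = ListenSketch.new(upon: :apply, once: true,
                            where: ->(wi) { wi["color"] == "red" })

results = [
  listener.call(:reply, { "color" => "red" }),   # ignored: direction is :reply
  listener.call(:apply, { "color" => "blue" }),  # ignored: condition fails
  listener.call(:apply, { "color" => "red" }),   # goes through
  listener.call(:apply, { "color" => "red" })    # ignored: 'once' already satisfied
]
```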

#cancel ⇒ Object



# File 'lib/openwfe/expressions/fe_listen.rb', line 230

def cancel

    stop_observing
end

#reply_to_parent(workitem) ⇒ Object



# File 'lib/openwfe/expressions/fe_listen.rb', line 235

def reply_to_parent (workitem)

    stop_observing
    super
end

#reschedule(scheduler) ⇒ Object

Registers for timeout and starts observing the participant activity.



# File 'lib/openwfe/expressions/fe_listen.rb', line 322

def reschedule (scheduler)

    to_reschedule(scheduler)
    start_observing
end

#trigger(params) ⇒ Object

Only called in case of timeout.



# File 'lib/openwfe/expressions/fe_listen.rb', line 244

def trigger (params)

    reply_to_parent workitem
end