Class: OpenWFE::ParticipantExpression

Inherits:
FlowExpression show all
Includes:
ConditionMixin, FilterMixin, TimeoutMixin
Defined in:
lib/openwfe/expressions/fe_participant.rb

Overview

Participants sit at the edge between the engine and the external world. The participant expression transmit the workitem applied to it to the Participant instance it looks up in the participant map tied to the engine.

direct reference to participant alpha :

   <participant ref="alpha" />

the name of the participant is the value found in 
the field 'target' :

   <participant field-ref="target" />
   <participant ref="${f:target}" />

the name of the participant is the value found in 
the variable 'target' :

   <participant variable-ref="target" />
   <participant ref="${target}" />

direct reference to participant 'alpha'
if a subprocess named 'alpha' has been defined, the 
subprocess will be called instead :

   <alpha />

The Participant expressions includes the FilterMixin and thus understands and applies the “filter” attribute.

Since OpenWFEru 0.9.9, the attributes of the participant expression are set inside a hash field named ‘params’ just available to the participant. Thus in

<participant ref="toto" task="play golf" location="Minami Center" />

participant ‘toto’ will receive a workitem with a field named ‘params’ containing the hash { “ref”=>“toto”, “task”=>“play golf”, “location”=>“Minami Center” }.

When the workitem gets back from the participant, the field ‘params’ is deleted.

The participant expressions include the TimeoutMixin, it means that a timeout can be stated :

<participant ref="toto" timeout="2w1d" />

If after 2 weeks and 1 day (15 days), participant “toto” hasn’t replied, the workitem will get cancelled and the flow will resume (behind the scene, participant “toto”, will receive a CancelItem instance bearing the same FlowExpressionId as the initial workitem and the participant implementation is responsible for the cancel application).

The participant expression accepts an optional ‘if’ (or ‘unless’) attribute. It’s used for conditional execution of the participant :

participant :ref => "toto", :if => "${weather} == raining"
    # the participant toto will receive a workitem only if
    # it's raining

boss :unless => "#{f:matter} == 'very trivial'"
    # the boss will not participate in the proces if the matter
    # is 'very trivial'

Instance Attribute Summary collapse

Attributes included from TimeoutMixin

#timeout_at, #timeout_job_id

Attributes included from FilterMixin

#filter

Attributes inherited from FlowExpression

#apply_time, #attributes, #children, #environment_id, #fei, #parent_id, #raw_representation

Attributes included from Contextual

#application_context

Instance Method Summary collapse

Methods included from ConditionMixin

#determine_condition_attribute, #eval_condition

Methods included from TimeoutMixin

#determine_timeout, #remove_timedout_flag, #reschedule, #schedule_timeout, #set_timedout_flag

Methods included from FilterMixin

#filter_in, #filter_out

Methods inherited from FlowExpression

#clean_children, #delete_variable, #dup_environment, #fetch_environment, #fetch_text_content, #get_binding, #get_environment, #get_parent, #get_root_environment, #has_attribute, #initialize, is_definition, is_definition?, #lookup_attribute, #lookup_attributes, #lookup_boolean_attribute, #lookup_comma_list_attribute, #lookup_downcase_attribute, #lookup_ref, #lookup_string_attribute, #lookup_sym_attribute, #lookup_value, #lookup_variable, #lookup_vf_attribute, names, #new_environment, new_exp, #owns_its_environment?, #paused?, #remove_child, #reply, #set_variable, #store_itself, #synchronize, #to_s, #to_yaml_properties

Methods included from Contextual

#get_work_directory, #init_service, #lookup

Methods included from Logging

#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn

Methods included from OwfeServiceLocator

#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator

Methods inherited from ObjectWithMeta

#class_def, meta_def, meta_eval, metaclass

Constructor Details

This class inherits a constructor from OpenWFE::FlowExpression

Instance Attribute Details

#applied_workitemObject

Returns the value of attribute applied_workitem.



126
127
128
# File 'lib/openwfe/expressions/fe_participant.rb', line 126

def applied_workitem
  @applied_workitem
end

#participant_nameObject

Returns the value of attribute participant_name.



125
126
127
# File 'lib/openwfe/expressions/fe_participant.rb', line 125

def participant_name
  @participant_name
end

Instance Method Details

#apply(workitem) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/openwfe/expressions/fe_participant.rb', line 129

def apply (workitem)

    conditional = eval_condition :if, workitem, :unless

    return super_reply_to_parent(workitem) \
        if conditional == false
            #
            # skip expression 
            # <participant ref="x" if="y" /> (where y evals to false)

    @participant_name = lookup_ref workitem

    @participant_name = fetch_text_content workitem \
        unless @participant_name

    participant = \
        get_participant_map.lookup_participant(@participant_name)

    raise "No participant named '#{@participant_name}'" \
        unless participant

    remove_timedout_flag workitem

    @applied_workitem = workitem.dup

    schedule_timeout()

    filter_in workitem

    store_itself()
    
    workitem.params = lookup_attributes workitem

    #
    # threading AFTER the store_itself()
    #
    Thread.new do
        begin

            # these two pmap calls were combined, but with the :reply
            # notification in reply_to_parent() it feels more
            # elegant like that

            get_participant_map.dispatch(
                participant, @participant_name, workitem)

            get_participant_map.onotify(
                @participant_name, :apply, workitem)

        rescue Exception => e

            get_expression_pool.notify_error(
                e, fei, :do_apply, workitem)
        end
    end
end

#cancelObject

The cancel() method of a ParticipantExpression is particular : it will emit a CancelItem instance towards the participant itself to notify it of the cancellation.



208
209
210
211
212
213
214
215
# File 'lib/openwfe/expressions/fe_participant.rb', line 208

def cancel

    unschedule_timeout

    cancel_participant

    @applied_workitem
end

#reply_to_parent(workitem) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/openwfe/expressions/fe_participant.rb', line 188

def reply_to_parent (workitem)

    get_participant_map.onotify @participant_name, :reply, workitem
        #
        # for 'listen' expressions waiting for replies

    unschedule_timeout()

    workitem.attributes.delete "params"

    filter_out workitem

    super workitem
end

#super_reply_to_parentObject



186
# File 'lib/openwfe/expressions/fe_participant.rb', line 186

alias :super_reply_to_parent :reply_to_parent

#trigger(scheduler) ⇒ Object

Upon timeout, the ParticipantExpression will cancel itself and the flow will resume.



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/openwfe/expressions/fe_participant.rb', line 221

def trigger (scheduler)

    linfo { "trigger() timeout requested for #{@fei.to_debug_s}" }

    begin

        #@scheduler_job_id = nil
            #
            # so that cancel won't unschedule without need 

        cancel_participant

        set_timedout_flag @applied_workitem

        reply_to_parent @applied_workitem
    rescue
        lerror do
            "trigger() problem while timing out\n"+
            OpenWFE::exception_to_s($!)
        end
    end
end