Class: OpenWFE::ConcurrenceExpression

Inherits:
SequenceExpression
Includes:
ConditionMixin
Defined in:
lib/openwfe/expressions/fe_concurrence.rb

Overview

The concurrence expression will execute each of its (direct) children in parallel threads.

Thus,

<concurrence>
    <participant ref="pa" />
    <participant ref="pb" />
</concurrence>

Participants pa and pb will be ‘treated’ in parallel (quasi simultaneously).

The concurrence expression accepts several attributes, which can be combined. By default, the concurrence waits for all of its children to reply and returns the workitem of the first child that replied. The attributes tune this behaviour.

count

<concurrence count="1">
    <participant ref="pa" />
    <participant ref="pb" />
</concurrence>

The concurrence will be over as soon as ‘pa’ or ‘pb’ has replied, i.e. as soon as one child has replied.

remaining

The attribute ‘remaining’ can take two values: ‘cancel’ (the default) and ‘forget’. Cancelled children are completely wiped away; forgotten ones continue to operate, but their replies are simply discarded.

over-if

‘over-if’ accepts a ‘boolean expression’ (something replying ‘true’ or ‘false’). If the expression evaluates to true, the concurrence will be over and the remaining children will get cancelled (the default) or forgotten.
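The interplay of ‘count’, ‘over-if’ and ‘remaining’ described above can be modelled in a few lines of plain Ruby. This is only a sketch of the documented semantics, not the OpenWFE implementation: the helper names `concurrence_over?` and `handle_remaining` and their parameters are hypothetical and do not exist in the library.

```ruby
# Hypothetical helper (not part of OpenWFE): decides whether a concurrence
# is over, given how many children have replied so far.
#   count   - value of the 'count' attribute, or nil when it is not set
#   over_if - result of evaluating the 'over-if' condition
def concurrence_over?(replies_received, child_count, count: nil, over_if: false)
  return true if over_if
  # Without 'count', the concurrence waits for all of its children.
  replies_received >= (count || child_count)
end

# Hypothetical helper (not part of OpenWFE): says what happens to the
# children that have not replied yet when the concurrence is over.
def handle_remaining(pending, remaining = "cancel")
  case remaining
  when "cancel" then { cancelled: pending, forgotten: [] }
  when "forget" then { cancelled: [], forgotten: pending }
  else raise ArgumentError, "unknown 'remaining' value: #{remaining}"
  end
end

# 'pa' has replied, 'pb' has not, and count="1" is set.
if concurrence_over?(1, 2, count: 1)
  p handle_remaining(["pb"])
end
```

With `count: 1` the check succeeds after a single reply, and the default ‘cancel’ policy marks ‘pb’ as cancelled.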

merge

By default, the first child to reply to its parent ‘concurrence’ expression ‘wins’, i.e. its workitem is used for resuming the flow (after the concurrence). The ‘merge’ attribute changes which child wins:

first

The default: the first child to reply wins

last

The last child to reply wins

highest

The first ‘defined’ child (in the list of children) will win

lowest

The last ‘defined’ child (in the list of children) will win

Thus, in this example

<concurrence merge="lowest">
    <participant ref="pa" />
    <participant ref="pb" />
</concurrence>

when the concurrence is done, the workitem of ‘pb’ is used to resume the flow after the concurrence.
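The four ‘merge’ values can be illustrated with a small standalone Ruby sketch. It assumes each reply is recorded with the child's position in the definition and the order in which it replied; the `Reply` struct and the `pick_winner` method are hypothetical names used for illustration and are not part of OpenWFE.

```ruby
# Hypothetical record of one child's reply (not an OpenWFE class):
#   child_index - position of the child in the list of children (0 = first defined)
#   reply_order - rank in which the reply arrived (0 = first to reply)
#   fields      - the workitem attributes carried by the reply
Reply = Struct.new(:child_index, :reply_order, :fields)

# Picks the 'winning' reply according to the documented 'merge' values.
def pick_winner(replies, merge = "first")
  case merge
  when "first"   then replies.min_by(&:reply_order)
  when "last"    then replies.max_by(&:reply_order)
  when "highest" then replies.min_by(&:child_index)
  when "lowest"  then replies.max_by(&:child_index)
  else raise ArgumentError, "unknown 'merge' value: #{merge}"
  end
end

replies = [
  Reply.new(1, 0, { "from" => "pb" }), # 'pb' (second child) replied first
  Reply.new(0, 1, { "from" => "pa" })  # 'pa' (first child) replied second
]

puts pick_winner(replies, "lowest").fields["from"]
```

Here ‘lowest’ selects ‘pb’ because it is the last child defined, regardless of which child replied first.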

merge-type

override

The default: no mixing of values between the workitems occurs

mix

Priority is given to the ‘winning’ workitem, but the values of all the workitems get mixed

isolate

The attributes of the workitem of each branch are placed in a field in the resulting workitem. For example, the attributes of the first branch will be stored under the field named ‘0’ of the resulting workitem.

The merge occurs at the top level of workitem attributes.
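The three ‘merge-type’ values can be sketched with plain hashes standing in for workitem attributes. This is an illustration under stated assumptions, not the library's code: the `merge_fields` method is hypothetical, and whether ‘isolate’ also keeps the winner's attributes at the top level is not stated above, so this sketch stores only the per-branch fields.

```ruby
# Hypothetical helper (not part of OpenWFE).
#   branches     - one attribute hash per branch, in child order
#   winner_index - index of the branch whose workitem 'wins'
def merge_fields(branches, winner_index, merge_type = "override")
  winner = branches.fetch(winner_index)
  case merge_type
  when "override"
    # Only the winning workitem's attributes survive.
    winner.dup
  when "mix"
    # Shallow (top-level) merge; the winner is merged last so it has priority.
    losers = branches.each_with_index.reject { |_, i| i == winner_index }.map(&:first)
    losers.reduce({}) { |acc, fields| acc.merge(fields) }.merge(winner)
  when "isolate"
    # Each branch's attributes are stored under a field named after its index.
    branches.each_with_index.map { |fields, i| [i.to_s, fields] }.to_h
  else
    raise ArgumentError, "unknown 'merge-type' value: #{merge_type}"
  end
end

branches = [
  { "a" => 1, "owner" => "pa" },
  { "b" => 2, "owner" => "pb" }
]

p merge_fields(branches, 1, "mix")
```

Because `Hash#merge` is shallow, nested values are replaced wholesale rather than combined, which matches the statement that the merge happens at the top level of the attributes.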

More complex merge behaviour can be obtained by extending the GenericSyncExpression class, but the default sync options are already numerous and powerful in combination.

Direct Known Subclasses

ConcurrentIteratorExpression

Instance Attribute Summary

Attributes inherited from FlowExpression

#apply_time, #attributes, #children, #environment_id, #fei, #parent_id, #raw_representation

Attributes included from Contextual

#application_context

Instance Method Summary

Methods included from ConditionMixin

#determine_condition_attribute, #eval_condition

Methods inherited from FlowExpression

#cancel, #clean_children, #delete_variable, #dup_environment, #fetch_environment, #fetch_text_content, #get_binding, #get_environment, #get_parent, #get_root_environment, #has_attribute, #initialize, is_definition, is_definition?, #lookup_attribute, #lookup_attributes, #lookup_boolean_attribute, #lookup_comma_list_attribute, #lookup_downcase_attribute, #lookup_ref, #lookup_string_attribute, #lookup_sym_attribute, #lookup_value, #lookup_variable, #lookup_vf_attribute, names, #new_environment, new_exp, #owns_its_environment?, #paused?, #remove_child, #reply_to_parent, #set_variable, #store_itself, #synchronize, #to_s, #to_yaml_properties

Methods included from Contextual

#get_work_directory, #init_service, #lookup

Methods included from Logging

#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn

Methods included from OwfeServiceLocator

#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator

Methods inherited from ObjectWithMeta

#class_def, meta_def, meta_eval, metaclass

Constructor Details

This class inherits a constructor from OpenWFE::FlowExpression

Instance Attribute Details

#sync_expression ⇒ Object

Returns the value of attribute sync_expression.



# File 'lib/openwfe/expressions/fe_concurrence.rb', line 139

def sync_expression
  @sync_expression
end

Instance Method Details

#apply(workitem) ⇒ Object



# File 'lib/openwfe/expressions/fe_concurrence.rb', line 143

def apply (workitem)

    sync = lookup_sym_attribute(
        :sync, workitem, :default => :generic)

    @sync_expression = 
        get_expression_map.get_sync_class(sync).new(self, workitem)

    @children.each do |child|
        @sync_expression.add_child child 
    end

    store_itself()

    concurrence = self

    @children.each_with_index do |child, index|
        Thread.new do
            begin
                #ldebug { "apply() child : #{child.to_debug_s}" }
                concurrence.synchronize do

                    get_expression_pool().apply(
                        child, 
                        #workitem.dup)
                        get_workitem(workitem, index))
                end
            rescue Exception => e
                lwarn do 
                    "apply() " +
                    "caught exception in concurrent child  " + 
                    child.to_debug_s + "\n" + 
                    OpenWFE::exception_to_s(e)
                end
            end
        end
    end

    #@sync_expression.ready(self)
        #
        # this is insufficient, have to do that :

    synchronize do
        #
        # Making sure the freshest version of the concurrence
        # expression is used.
        # This is especially important when using pure persistence.
        #
        reloaded_self, _fei = get_expression_pool.fetch(@fei)
        reloaded_self.sync_expression.ready(reloaded_self)
    end
end
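The threading pattern used by apply (one thread per child, a shared lock around the work, and a rescue inside each thread so that one failing child does not take the others down) can be shown in isolation. The following is a simplified standalone sketch, not the OpenWFE code: `apply_children` is a hypothetical name, children are modelled as lambdas, and the threads are joined here only to make the result deterministic, which the method above does not do.

```ruby
# Hypothetical stand-in for the apply pattern (not part of OpenWFE).
# Each child is a callable that receives its index, mirroring the way
# apply hands each child its own workitem.
def apply_children(children)
  lock = Mutex.new
  results = []

  threads = children.each_with_index.map do |child, index|
    Thread.new do
      begin
        value = child.call(index)
        lock.synchronize { results << [index, value] }
      rescue StandardError => e
        # A failing child is recorded instead of killing its siblings.
        lock.synchronize { results << [index, e] }
      end
    end
  end

  threads.each(&:join)
  results.sort_by(&:first)
end

children = [
  ->(i) { i * 2 },
  ->(_i) { raise "child failed" },
  ->(i) { i + 10 }
]

results = apply_children(children)
results.each { |index, outcome| puts "#{index}: #{outcome.inspect}" }
```

The sketch rescues StandardError rather than Exception, a common Ruby convention; the method above rescues Exception and logs the failure with lwarn instead of collecting it.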

#reply(workitem) ⇒ Object



# File 'lib/openwfe/expressions/fe_concurrence.rb', line 196

def reply (workitem)
    @sync_expression.reply(self, workitem)
end