Class: OpenWFE::GenericSyncExpression
- Inherits: SyncExpression
  - Object
  - ObjectWithMeta
  - SyncExpression
  - OpenWFE::GenericSyncExpression
- Defined in: lib/openwfe/expressions/fe_concurrence.rb
Overview
The classical OpenWFE sync expression, used by the ‘concurrence’ and ‘concurrent-iterator’ expressions.
Defined Under Namespace
Classes: MergeArray
Instance Attribute Summary
- #cancel_remaining ⇒ Object
  Returns the value of attribute cancel_remaining.
- #count ⇒ Object
  Returns the value of attribute count.
- #remaining_children ⇒ Object
  Returns the value of attribute remaining_children.
- #reply_count ⇒ Object
  Returns the value of attribute reply_count.
- #unready_queue ⇒ Object
  Returns the value of attribute unready_queue.
Instance Method Summary
- #add_child(child) ⇒ Object
- #initialize(synchable, workitem) ⇒ GenericSyncExpression (constructor)
  A new instance of GenericSyncExpression.
- #ready(synchable) ⇒ Object
  Once all the children have been applied concurrently, the concurrence calls this method to notify the sync expression that replies can be processed.
- #reply(synchable, workitem) ⇒ Object
Methods inherited from SyncExpression
Methods inherited from ObjectWithMeta
#class_def, meta_def, meta_eval, metaclass
Constructor Details
#initialize(synchable, workitem) ⇒ GenericSyncExpression
Returns a new instance of GenericSyncExpression.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 319

def initialize (synchable, workitem)

  super()

  @remaining_children = []
  @reply_count = 0

  @count = determine_count(synchable, workitem)
  @cancel_remaining = cancel_remaining?(synchable, workitem)

  merge = synchable.lookup_sym_attribute(
    :merge, workitem, :default => :first)

  merge_type = synchable.lookup_sym_attribute(
    :merge_type, workitem, :default => :mix)

  synchable.ldebug { "new() merge_type is '#{merge_type}'" }

  @merge_array = MergeArray.new synchable.fei, merge, merge_type

  @unready_queue = []
end
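The constructor falls back to :first for merge and :mix for merge_type when the expression does not set them. The sketch below only illustrates that fallback. FakeSynchable is a hypothetical stand-in, not an OpenWFE class, its lookup_sym_attribute is an assumed simplification (it ignores the workitem and reads from a plain Hash), and the attribute values passed to it are illustrative.

```ruby
# Hypothetical stand-in for a synchable expression; only the lookup is modelled.
class FakeSynchable
  def initialize(attributes)
    @attributes = attributes
  end

  # Assumed behaviour: return the attribute as a Symbol, or the :default
  # option when the attribute is absent. The workitem is ignored here.
  def lookup_sym_attribute(name, workitem, options = {})
    value = @attributes[name]
    value.nil? ? options[:default] : value.to_sym
  end
end

plain  = FakeSynchable.new({})
custom = FakeSynchable.new(:merge => "highest", :merge_type => "override")

# Same calls as in the constructor, with nil in place of a workitem.
plain_merge  = plain.lookup_sym_attribute(:merge, nil, :default => :first)
plain_type   = plain.lookup_sym_attribute(:merge_type, nil, :default => :mix)
custom_merge = custom.lookup_sym_attribute(:merge, nil, :default => :first)
custom_type  = custom.lookup_sym_attribute(:merge_type, nil, :default => :mix)
```

With no attributes set, the two lookups yield the defaults; with attributes set, the given values win over the defaults.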
Instance Attribute Details
#cancel_remaining ⇒ Object
Returns the value of attribute cancel_remaining.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 312

def cancel_remaining
  @cancel_remaining
end
#count ⇒ Object
Returns the value of attribute count.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 312

def count
  @count
end
#remaining_children ⇒ Object
Returns the value of attribute remaining_children.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 312

def remaining_children
  @remaining_children
end
#reply_count ⇒ Object
Returns the value of attribute reply_count.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 312

def reply_count
  @reply_count
end
#unready_queue ⇒ Object
Returns the value of attribute unready_queue.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 312

def unready_queue
  @unready_queue
end
Instance Method Details
#add_child(child) ⇒ Object
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 369

def add_child (child)
  @remaining_children << child
end
#ready(synchable) ⇒ Object
Once all the children have been applied concurrently, the concurrence calls this method to notify the sync expression that replies can be processed.
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 347

def ready (synchable)
  synchable.synchronize do

    synchable.ldebug do
      "ready() called by #{synchable.fei.to_debug_s} " +
      "#{@unready_queue.length} wi waiting"
    end

    queue = @unready_queue
    @unready_queue = nil
    synchable.store_itself()

    queue.each do |workitem|
      break if do_reply(synchable, workitem)
      #
      # do_reply() will return 'true' as soon as the
      # concurrence is over, if this is the case, the
      # queue should not be treated anymore
    end
  end
end
#reply(synchable, workitem) ⇒ Object
# File 'lib/openwfe/expressions/fe_concurrence.rb', line 373

def reply (synchable, workitem)
  synchable.synchronize do

    if @unready_queue

      @unready_queue << workitem

      synchable.store_itself()

      synchable.ldebug do
        "#{self.class}.reply() "+
        "#{@unready_queue.length} wi waiting..."
      end

    else

      do_reply(synchable, workitem)
    end
  end
end
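The interplay between #reply and #ready can be illustrated in isolation. The following is a minimal sketch, not the OpenWFE implementation: BufferingSync is a hypothetical stand-in, the synchronize, store_itself and ldebug calls are left out, and do_reply is replaced by an assumed rule that reports completion once a fixed number of replies has been processed.

```ruby
# Hypothetical stand-in for the buffering behaviour of reply()/ready().
# Not part of OpenWFE; locking, persistence and logging are omitted.
class BufferingSync
  attr_reader :processed, :unready_queue

  def initialize(count)
    @count = count        # replies needed before the sync is over (assumed rule)
    @processed = []       # workitems actually handled
    @unready_queue = []   # replies received before ready()
  end

  # Replies are queued for as long as ready() has not been called.
  def reply(workitem)
    if @unready_queue
      @unready_queue << workitem
    else
      do_reply(workitem)
    end
  end

  # Drains the queue, stopping as soon as do_reply reports completion.
  def ready
    queue = @unready_queue
    @unready_queue = nil
    queue.each { |workitem| break if do_reply(workitem) }
  end

  private

  # Returns true once @count replies have been processed.
  def do_reply(workitem)
    @processed << workitem
    @processed.size >= @count
  end
end

sync = BufferingSync.new(2)
sync.reply(:a)   # queued, ready() has not been called yet
sync.reply(:b)   # queued
sync.reply(:c)   # queued
sync.ready       # processes :a and :b, then stops; :c is never handled
```

Setting the queue to nil in ready doubles as the "ready" flag: once it is nil, reply bypasses the queue and processes workitems directly.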