Class: OpenWFE::Engine
Includes: FeiMixin, OwfeServiceLocator, StatusMixin
Defined in: lib/openwfe/engine/engine.rb
Overview
The simplest implementation of the OpenWFE workflow engine. No persistence is used; everything is stored in memory.
Direct Known Subclasses
Instance Attribute Summary
Attributes included from ServiceMixin
Attributes included from Contextual
Instance Method Summary
-
#add_workitem_listener(listener, freq = nil) ⇒ Object
Adds a workitem listener to this engine.
-
#cancel_expression(exp_or_fei) ⇒ Object
Cancels the given expression and its children, if any (warning: advanced method).
-
#cancel_process(exp_or_wfid) ⇒ Object
(also: #cancel_flow, #abort_process)
Given any expression of a process, cancels the complete process instance.
-
#enable_irb_console ⇒ Object
Enabling the console means that hitting CTRL-C in the window, terminal, or DOS box that runs the OpenWFEru engine will open an IRB interactive console for directly manipulating the engine instance.
-
#forget_expression(exp_or_fei) ⇒ Object
Forgets the given expression, making it an orphan (warning: advanced method).
-
#get_participant(participant_name) ⇒ Object
Given a participant name, returns the participant in charge of handling workitems for that name.
-
#get_variables(fei_or_wfid = nil) ⇒ Object
Returns the variables set for a process or an expression.
-
#initialize(application_context = {}) ⇒ Engine
constructor
Builds an OpenWFEru engine.
-
#join ⇒ Object
Makes the current thread join the engine’s scheduler thread.
-
#join_until_idle ⇒ Object
Calling this method makes the control flow block until the workflow engine is inactive.
-
#launch(launch_object, options = {}) ⇒ Object
Launches a [business] process.
-
#list_processes(options = {}) ⇒ Object
(also: #list_workflows)
Lists all workflow (process) instances currently in the expool (in the engine).
-
#lookup_processes(var_name, value = nil) ⇒ Object
Returns an array of wfids (workflow instance ids) whose root environment contains the given variable.
-
#lookup_variable(var_name, fei_or_wfid = nil) ⇒ Object
Looks up a process variable in a process.
-
#pause_process(wfid) ⇒ Object
Pauses a process (sets its /__paused__ variable to true).
-
#pre_launch_check(launchitem) ⇒ Object
When ‘parameters’ are used at the top of a process definition, this method can be used to assert a launchitem before launch.
-
#process_stack(workflow_instance_id, unapplied = false) ⇒ Object
(also: #get_process_stack, #get_flow_stack)
Returns the list of applied expressions belonging to a given workflow instance.
-
#register_participant(regex, participant = nil, &block) ⇒ Object
Registers a participant in this [embedded] engine.
-
#replay_at_error(error) ⇒ Object
Removes an error from the error journal and then replays its process at that point.
-
#reply(workitem) ⇒ Object
(also: #forward, #proceed)
This method is used to feed a workitem back to the engine (after it got sent to a worklist or wherever by a participant).
-
#reschedule ⇒ Object
(also: #reload)
Call this method once the participants for a persisted engine have been [re]added.
-
#resume_process(wfid) ⇒ Object
Restarts a process: removes its ‘paused’ flag (variable) and replays the events (replies) that arrived for it while it was paused.
-
#stop ⇒ Object
Stopping the engine will stop all the services in the application context.
-
#unregister_participant(participant_name) ⇒ Object
Removes the first participant matching the given name from the participant map kept by the engine.
-
#update_expression(fexp) ⇒ Object
Replaces an expression in the pool with a newer version of it.
-
#update_expression_data(fei, data) ⇒ Object
Use only when doing “process gardening”.
-
#update_raw_expression(fei, representation) ⇒ Object
A variant of update_expression() that directly replaces the raw representation stored within a RawExpression.
-
#wait_for(fei_or_wfid) ⇒ Object
Waits for a given process instance to terminate.
Methods included from StatusMixin
#is_paused?, #process_status, #process_statuses
Methods included from OwfeServiceLocator
#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator
Methods included from ServiceMixin
Methods included from Contextual
#get_work_directory, #init_service, #lookup
Methods included from Logging
#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn
Constructor Details
#initialize(application_context = {}) ⇒ Engine
Builds an OpenWFEru engine.
Accepts an optional initial application_context (containing initialization params for services for example).
The engine itself uses one param, :logger, which defines where all the log output for OpenWFEru should go. By default, this output goes to logs/openwferu.log.
# File 'lib/openwfe/engine/engine.rb', line 82
def initialize (application_context={})

  super S_ENGINE, application_context

  $OWFE_LOG = application_context[:logger]

  unless $OWFE_LOG
    #puts "Creating logs in " + FileUtils.pwd
    FileUtils.mkdir("logs") unless File.exist?("logs")
    $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
    $OWFE_LOG.level = Logger::INFO
  end

  # build order matters.
  #
  # especially for the expstorage which 'observes' the expression
  # pool and thus needs to be instantiated after it.

  build_scheduler
    #
    # for delayed or repetitive executions (it's the engine's clock)
    # see http://openwferu.rubyforge.org/scheduler.html

  build_expression_map
    #
    # mapping expression names ('sequence', 'if', 'concurrence',
    # 'when'...) to their implementations (SequenceExpression,
    # IfExpression, ConcurrenceExpression, ...)

  build_wfid_generator
    #
    # the workflow instance (process instance) id generator
    # making sure each process instance has a unique identifier

  build_expression_pool
    #
    # the core (hairy ball) of the engine

  build_expression_storage
    #
    # the engine persistence (persisting the expression instances
    # that make up process instances)

  build_participant_map
    #
    # building the services that maps participant names to
    # participant implementations / instances.

  build_error_journal
    #
    # builds the error journal (keeping track of failures
    # in business process executions, and an opportunity to
    # fix and replay)

  linfo { "new() --- engine started --- #{self.object_id}" }
end
Instance Method Details
#add_workitem_listener(listener, freq = nil) ⇒ Object
Adds a workitem listener to this engine.
The ‘freq’ parameter, if present, indicates how frequently the resource should be polled for incoming workitems.
engine.add_workitem_listener(listener, "3m10s")
# every 3 minutes and 10 seconds
engine.add_workitem_listener(listener, "0 22 * * 1-5")
# every weekday at 10pm
TODO : block handling…
# File 'lib/openwfe/engine/engine.rb', line 349
def add_workitem_listener (listener, freq=nil)

  name = nil

  if listener.kind_of? Class

    listener = init_service nil, listener

    name = listener.service_name
  else

    name = listener.name if listener.respond_to? :name
    name = "#{listener.class}::#{listener.object_id}" unless name

    @application_context[name] = listener
  end

  result = nil

  if freq

    freq = freq.to_s.strip

    result = if Rufus::Scheduler.is_cron_string(freq)

      get_scheduler.schedule(freq, listener)
    else

      get_scheduler.schedule_every(freq, listener)
    end
  end

  linfo { "add_workitem_listener() added '#{name}'" }

  result
end
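The source above routes the ‘freq’ string to either a cron schedule or a repeating interval. The following is a minimal sketch of that kind of dispatch, not the engine's or Rufus::Scheduler's implementation: the helpers cron_string?, duration_to_seconds and describe_freq are hypothetical, the cron test (five whitespace-separated fields) is an assumption, and only the units s, m, h and d are handled.

```ruby
# Hypothetical helper: treats a string made of five whitespace-separated
# fields as a cron string (an assumption, not Rufus::Scheduler's rule).
def cron_string?(freq)
  freq.to_s.strip.split(/\s+/).size == 5
end

# Hypothetical helper: converts a "3m10s"-style duration to seconds.
def duration_to_seconds(freq)
  unit_seconds = { "s" => 1, "m" => 60, "h" => 3600, "d" => 86_400 }
  freq.scan(/(\d+)([smhd])/).sum { |n, unit| n.to_i * unit_seconds.fetch(unit) }
end

# Returns [:cron, string] or [:every, seconds], mirroring the two branches
# (schedule vs. schedule_every) in the source listing.
def describe_freq(freq)
  freq = freq.to_s.strip
  if cron_string?(freq)
    [:cron, freq]
  else
    [:every, duration_to_seconds(freq)]
  end
end

p describe_freq("3m10s")        # => [:every, 190]
p describe_freq("0 22 * * 1-5") # => [:cron, "0 22 * * 1-5"]
```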
#cancel_expression(exp_or_fei) ⇒ Object
Cancels the given expression and its children, if any (warning: advanced method).
Cancelling the root expression of a process is equivalent to cancelling the process.
# File 'lib/openwfe/engine/engine.rb', line 595
def cancel_expression (exp_or_fei)

  get_expression_pool.cancel_expression exp_or_fei
end
#cancel_process(exp_or_wfid) ⇒ Object Also known as: cancel_flow, abort_process
Given any expression of a process, cancels the complete process instance.
# File 'lib/openwfe/engine/engine.rb', line 581
def cancel_process (exp_or_wfid)

  get_expression_pool.cancel_process exp_or_wfid
end
#enable_irb_console ⇒ Object
Enabling the console means that hitting CTRL-C in the window, terminal, or DOS box that runs the OpenWFEru engine will open an IRB interactive console for directly manipulating the engine instance.
Hit CTRL-D to get out of the console.
# File 'lib/openwfe/engine/engine.rb', line 426
def enable_irb_console

  OpenWFE::trap_int_irb(binding)
end
#forget_expression(exp_or_fei) ⇒ Object
Forgets the given expression, making it an orphan (warning: advanced method).
# File 'lib/openwfe/engine/engine.rb', line 604
def forget_expression (exp_or_fei)

  get_expression_pool.forget exp_or_fei
end
#get_participant(participant_name) ⇒ Object
Given a participant name, returns the participant in charge of handling workitems for that name. May be useful in some embedded contexts.
# File 'lib/openwfe/engine/engine.rb', line 321
def get_participant (participant_name)

  get_participant_map.lookup_participant participant_name
end
#get_variables(fei_or_wfid = nil) ⇒ Object
Returns the variables set for a process or an expression.
If a process (wfid) is given, the variables of the process environment are returned; if an expression (fei) is given, the variables in the environment valid for that expression are returned.
If nothing (or nil) is given, the variables set in the engine environment will be returned.
# File 'lib/openwfe/engine/engine.rb', line 696
def get_variables (fei_or_wfid=nil)

  return get_expression_pool.fetch_engine_environment.variables \
    unless fei_or_wfid

  fetch_exp(fei_or_wfid).get_environment.variables
end
#join ⇒ Object
Makes the current thread join the engine’s scheduler thread.
You can thus make an engine standalone with something like:
require 'openwfe/engine/engine'
the_engine = OpenWFE::Engine.new
the_engine.join
And you’ll have to hit CTRL-C to make it stop.
# File 'lib/openwfe/engine/engine.rb', line 398
def join

  get_scheduler.join
end
#join_until_idle ⇒ Object
Calling this method makes the control flow block until the workflow engine is inactive.
TODO : implement idle_for
# File 'lib/openwfe/engine/engine.rb', line 409
def join_until_idle

  storage = get_expression_storage

  while storage.size > 1
    sleep 1
  end
end
#launch(launch_object, options = {}) ⇒ Object
Launches a [business] process. The ‘launch_object’ param may contain either a LaunchItem instance or a String containing the URL of the process definition to launch (with an empty LaunchItem created on the fly).
The launch object can also be a String containing the XML process definition or directly a class extending OpenWFE::ProcessDefinition (Ruby process definition).
Returns the FlowExpressionId instance of the expression at the root of the newly launched process.
Options for scheduled launches, such as :at, :in and :cron, are accepted via the optional ‘options’ parameter. For example:
engine.launch(launch_item)
# will launch immediately
engine.launch(launch_item, :in => "1d20m")
# will launch in one day and twenty minutes
engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
# will launch at that point in time
engine.launch(launch_item, :cron => "0 5 * * *")
# will launch that same process every day
# at 05:00 (see "man 5 crontab")
# File 'lib/openwfe/engine/engine.rb', line 198
def launch (launch_object, options={})

  launchitem = extract_launchitem launch_object

  fei = get_expression_pool.launch launchitem, options

  fei.dup
    #
    # so that users of this launch() method can play with their
    # fei without breaking things
end
#list_processes(options = {}) ⇒ Object Also known as: list_workflows
Lists all workflow (process) instances currently in the expool (in the engine). This method will return a list of “process-definition” expressions (i.e. OpenWFE::DefineExpression objects – each representing the root element of a flow).
The following options are accepted:
- :wfid: lists only one process, for example :wfid => '20071208-gipijiwozo'
- :parent_wfid: lists only one process and its subprocesses, for example :parent_wfid => '20071208-gipijiwozo'
- :consider_subprocesses: if true, “process-definition” expressions of subprocesses are returned as well.
- :wfid_prefix: queries for a specific workflow instance id prefix, for example :wfid_prefix => "200712" for the processes started in December 2007.
- :wfname: returns only the process instances that belong to the given workflow [name].
- :wfrevision: used in conjunction with :wfname, returns only the process instances of a given workflow revision.
# File 'lib/openwfe/engine/engine.rb', line 571
def list_processes (options={})

  get_expression_pool.list_processes options
end
#lookup_processes(var_name, value = nil) ⇒ Object
Returns an array of wfids (workflow instance ids) whose root environment contains the given variable.
If there are no matches, an empty array will be returned.
Regular expressions are accepted as values.
If no value is given, all processes with the given variable name set will be returned.
# File 'lib/openwfe/engine/engine.rb', line 715
def lookup_processes (var_name, value=nil)

  # TODO : maybe this would be better in the ExpressionPool

  regexp = if value
    if value.is_a?(Regexp)
      value
    else
      Regexp.compile(value.to_s)
    end
  else
    nil
  end

  envs = get_expression_storage.find_expressions(
    :include_classes => Environment)

  envs = envs.find_all do |env|
    val = env.variables[var_name]
    (val and ((not regexp) or (regexp.match(val))))
  end
  envs.collect do |env|
    env.fei.wfid
  end

  #envs.inject([]) do |r, env|
  #    val = env.variables[var_name]
  #    r << env.fei.wfid
  #        if (val and ((not regexp) or (regexp.match(val))))
  #    r
  #end
    #
    # seems slower...
end
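Because non-Regexp values are compiled with Regexp.compile(value.to_s), a plain string matches as an unanchored pattern, not as an exact value. The sketch below reproduces only that matching rule on a plain hash; lookup_wfids and the sample wfids and variables are hypothetical stand-ins for the expression storage.

```ruby
# Hypothetical stand-in: envs maps a wfid to its root-environment variables.
def lookup_wfids(envs, var_name, value = nil)
  regexp =
    if value.is_a?(Regexp) then value
    elsif value then Regexp.compile(value.to_s)
    end
  # keep the environments where the variable is set and, if a value was
  # given, where the variable matches it
  envs.select { |_wfid, vars|
    val = vars[var_name]
    val && (regexp.nil? || regexp.match(val))
  }.keys
end

envs = {
  "20071208-aaa" => { "customer" => "acme", "priority" => "10" },
  "20071208-bbb" => { "customer" => "acme-east" },
  "20071209-ccc" => { "priority" => "100" }
}

p lookup_wfids(envs, "customer")              # => ["20071208-aaa", "20071208-bbb"]
p lookup_wfids(envs, "customer", "acme")      # => ["20071208-aaa", "20071208-bbb"]
p lookup_wfids(envs, "customer", /\Aacme\z/)  # => ["20071208-aaa"]
```

Passing an anchored Regexp, as in the last call, is one way to get an exact match.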
#lookup_variable(var_name, fei_or_wfid = nil) ⇒ Object
Looks up a process variable in a process. If fei_or_wfid is not given, the lookup is done in the ‘engine environment’ (where the top-level ‘//’ variables reside).
# File 'lib/openwfe/engine/engine.rb', line 678
def lookup_variable (var_name, fei_or_wfid=nil)

  return get_expression_pool.fetch_engine_environment[var_name] \
    unless fei_or_wfid

  fetch_exp(fei_or_wfid).lookup_variable var_name
end
#pause_process(wfid) ⇒ Object
Pauses a process (sets its /__paused__ variable to true).
# File 'lib/openwfe/engine/engine.rb', line 612
def pause_process (wfid)

  wfid = extract_wfid wfid

  root_expression = get_expression_pool.fetch_root wfid

  get_expression_pool.paused_instances[wfid] = true
  root_expression.set_variable VAR_PAUSED, true
end
#pre_launch_check(launchitem) ⇒ Object
When ‘parameters’ are used at the top of a process definition, this method can be used to assert a launchitem before launch. An exception will be raised if the parameters do not match the requirements.
Note that the launch method will raise those exceptions as well. This method can still be useful in some scenarios.
# File 'lib/openwfe/engine/engine.rb', line 163
def pre_launch_check (launchitem)

  get_expression_pool.prepare_raw_expression(launchitem)
end
#process_stack(workflow_instance_id, unapplied = false) ⇒ Object Also known as: get_process_stack, get_flow_stack
Returns the list of applied expressions belonging to a given workflow instance. May be used to determine where a process instance currently is.
This method returns all the expressions (the stack) a process went through to reach its current state.
If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned.
# File 'lib/openwfe/engine/engine.rb', line 536
def process_stack (workflow_instance_id, unapplied=false)

  get_expression_pool.process_stack workflow_instance_id, unapplied
end
#register_participant(regex, participant = nil, &block) ⇒ Object
Registers a participant in this [embedded] engine. This method is a shortcut to the ParticipantMap method with the same name.
engine.register_participant "user-.*", HashParticipant
or
engine.register_participant "user-.*" do |wi|
  puts "participant '#{wi.participant_name}' received a workitem"
  #
  # and did nothing with it
  # as a block participant implicitly returns the workitem
  # to the engine
end
Returns the participant instance.
The participant parameter can be set to a hash, as in
engine.register_participant(
"alpha",
{ :participant => HashParticipant, :position => :first })
or
engine.register_participant("alpha", :position => :first) do
puts "first !"
end
Sometimes a participant has to be positioned first (especially with the regex technique).
See ParticipantMap#register_participant.
# File 'lib/openwfe/engine/engine.rb', line 302
def register_participant (regex, participant=nil, &block)

  #get_participant_map.register_participant(
  #    regex, participant, &block)

  params = if participant.class == Hash
    participant
  else
    { :participant => participant }
  end

  get_participant_map.register_participant regex, params, &block
end
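To illustrate why :position => :first matters with regex registrations, here is a small sketch under stated assumptions. TinyParticipantMap is a hypothetical stand-in, not the OpenWFEru ParticipantMap; it assumes patterns are anchored and that the first matching entry wins, and it uses plain symbols in place of participant instances.

```ruby
# Hypothetical stand-in for a participant map (not the OpenWFEru class).
class TinyParticipantMap
  def initialize
    @entries = []
  end

  # Accepts either a participant or a params hash, as the engine method does.
  def register(regex, participant = nil, &block)
    params = participant.is_a?(Hash) ? participant : { :participant => participant }
    instance = params[:participant] || block
    entry = [Regexp.new("\\A#{regex}\\z"), instance]
    if params[:position] == :first
      @entries.unshift(entry)   # looked up before earlier registrations
    else
      @entries.push(entry)
    end
    instance
  end

  # Assumption: the first entry whose pattern matches the name wins.
  def lookup(name)
    pair = @entries.find { |rx, _| rx.match(name) }
    pair && pair[1]
  end
end

map = TinyParticipantMap.new
map.register("user-.*", :generic)
map.register("user-alice", { :participant => :special, :position => :first })

p map.lookup("user-alice")  # => :special
p map.lookup("user-bob")    # => :generic
```

Without :position => :first, the broader "user-.*" entry registered earlier would shadow the more specific one in this sketch.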
#replay_at_error(error) ⇒ Object
Removes an error from the error journal and then replays its process at that point.
# File 'lib/openwfe/engine/engine.rb', line 661
def replay_at_error (error)

  get_error_journal.remove_errors(
    error.fei.parent_wfid,
    error)

  get_expression_pool.queue_work(
    error.,
    error.fei,
    error.workitem)
end
#reply(workitem) ⇒ Object Also known as: forward, proceed
This method is used to feed a workitem back to the engine (after it got sent to a worklist or wherever by a participant). Participant implementations usually call this method themselves.
This method also accepts LaunchItem instances.
Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkItem instances that don’t belong to a process instance (i.e. whose flow_expression_id is nil). It will simply notify the participant_map of the reply for the given participant_name. If no participant_name is specified for this orphan workitem, an exception will be raised.
# File 'lib/openwfe/engine/engine.rb', line 223
def reply (workitem)

  if workitem.is_a?(InFlowWorkItem)

    if workitem.flow_expression_id
      #
      # vanilla case, workitem coming back
      # (from listener probably)

      return get_expression_pool.reply(
        workitem.flow_expression_id, workitem)
    end

    if workitem.participant_name
      #
      # a workitem that doesn't belong to a process instance
      # but bears a participant name.
      # Notify, there may be something listening on
      # this channel (see the 'listen' expression).

      return get_participant_map.onotify(
        workitem.participant_name, :reply, workitem)
    end

    raise \
      "InFlowWorkitem doesn't belong to a process instance" +
      " nor to a participant"
  end

  return get_expression_pool.launch(workitem) \
    if workitem.is_a?(LaunchItem)
      #
      # launchitem coming from listener
      # let's attempt to launch a new process instance

  raise \
    "engine.reply() " +
    "cannot handle instances of #{workitem.class}"
end
#reschedule ⇒ Object Also known as: reload
Call this method once the participants for a persisted engine have been [re]added.
If this method is called too soon, missing participants will cause trouble… Call this method after all the participants have been added.
# File 'lib/openwfe/engine/engine.rb', line 147
def reschedule

  get_expression_pool.reschedule()
end
#resume_process(wfid) ⇒ Object
Restarts a process: removes its ‘paused’ flag (variable) and replays the events (replies) that arrived for it while it was paused.
# File 'lib/openwfe/engine/engine.rb', line 627
def resume_process (wfid)

  wfid = extract_wfid wfid

  root_expression = get_expression_pool.fetch_root wfid

  #
  # remove 'paused' flag

  get_expression_pool.paused_instances.delete wfid
  root_expression.unset_variable VAR_PAUSED

  #
  # replay
  #
  # select PausedError instances in separate list

  errors = get_error_journal.get_error_log wfid
  error_class = PausedError.name
  paused_errors = errors.select { |e| e.error_class == error_class }

  return if paused_errors.size < 1

  # replay select PausedError instances

  paused_errors.each do |e|
    replay_at_error e
  end
end
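The pause and resume behaviour can be pictured with a toy model. This is only a sketch: TinyProcess is hypothetical, and its @deferred list merely plays the role that PausedError entries in the error journal play in the source above. Replies that arrive while paused are set aside, and resuming replays them in arrival order.

```ruby
# Hypothetical model of one process instance (not an OpenWFEru class).
class TinyProcess
  attr_reader :handled

  def initialize
    @paused = false
    @deferred = []  # replies received while paused
    @handled = []   # replies actually processed
  end

  def pause
    @paused = true
  end

  def reply(workitem)
    if @paused
      @deferred << workitem
    else
      @handled << workitem
    end
  end

  # Clears the paused flag first, then replays what was set aside.
  def resume
    @paused = false
    pending, @deferred = @deferred, []
    pending.each { |wi| reply(wi) }
  end
end

process = TinyProcess.new
process.reply(:a)
process.pause
process.reply(:b)
process.reply(:c)
p process.handled  # => [:a]
process.resume
p process.handled  # => [:a, :b, :c]
```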
#stop ⇒ Object
Stopping the engine will stop all the services in the application context.
# File 'lib/openwfe/engine/engine.rb', line 447
def stop

  linfo { "stop() stopping engine '#{@service_name}'" }

  @application_context.each do |sname, service|

    next if sname == self.service_name

    #if service.kind_of?(ServiceMixin)
    if service.respond_to?(:stop)

      service.stop

      linfo do
        "stop() stopped service '#{sname}' (#{service.class})"
      end
    end
  end

  linfo { "stop() stopped engine '#{@service_name}'" }

  nil
end
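The source above stops every entry of the application context that responds to stop, skipping the engine's own entry. A minimal sketch of that duck-typed loop follows; stop_services, FakeService and the context contents are hypothetical and only illustrate the pattern.

```ruby
# Hypothetical service that records whether it was stopped.
class FakeService
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Stops every entry that responds to :stop, except the entry named
# self_name, and returns the names of the entries that were stopped.
def stop_services(context, self_name)
  stopped = []
  context.each do |name, service|
    next if name == self_name
    next unless service.respond_to?(:stop)
    service.stop
    stopped << name
  end
  stopped
end

context = {
  "engine" => FakeService.new,        # skipped: it is the caller itself
  "scheduler" => FakeService.new,
  :work_directory => "work"           # plain parameter, has no stop method
}

p stop_services(context, "engine")  # => ["scheduler"]
```

Checking respond_to?(:stop) instead of a class or mixin lets plain configuration values share the context with services.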
#unregister_participant(participant_name) ⇒ Object
Removes the first participant matching the given name from the participant map kept by the engine.
# File 'lib/openwfe/engine/engine.rb', line 330
def unregister_participant (participant_name)

  get_participant_map.unregister_participant participant_name
end
#update_expression(fexp) ⇒ Object
Replaces an expression in the pool with a newer version of it.
(useful when fixing processes on the fly)
# File 'lib/openwfe/engine/engine.rb', line 801
def update_expression (fexp)

  fexp.application_context = application_context

  get_expression_pool.update fexp
end
#update_expression_data(fei, data) ⇒ Object
Use only when doing “process gardening”.
This method updates an expression. The ‘data’ parameter is expected to be a hash. If the expression is an Environment, the data is merged into its variables. If the expression is not an Environment, the data is merged into the ‘applied_workitem’, if any.
If the merge is not possible, an exception will be raised.
# File 'lib/openwfe/engine/engine.rb', line 761
def update_expression_data (fei, data)

  fexp = fetch_exp fei

  original = if fexp.is_a?(Environment)

    fexp.variables
  else

    fexp.applied_workitem.attributes
  end

  original.merge! data

  get_expression_pool.update fexp
end
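The merge itself is a plain Hash#merge!, so keys present in ‘data’ overwrite existing entries, new keys are added, and all other entries are left alone. A short sketch with hypothetical variable names:

```ruby
# Hypothetical environment variables of an expression.
variables = { "customer" => "acme", "retries" => 1 }

# merge! updates the receiver in place: "retries" is overwritten,
# "owner" is added, "customer" is left untouched.
variables.merge!("retries" => 2, "owner" => "alice")
p variables  # => {"customer"=>"acme", "retries"=>2, "owner"=>"alice"}

# A non-hash argument cannot be merged; Hash#merge! raises a TypeError.
begin
  variables.merge!("not a hash")
rescue TypeError => e
  puts "merge failed: #{e.class}"
end
```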
#update_raw_expression(fei, representation) ⇒ Object
A variant of update_expression() that directly replaces the raw representation stored within a RawExpression.
Useful for modifying [not yet reached] segments of processes.
# File 'lib/openwfe/engine/engine.rb', line 784
def update_raw_expression (fei, representation)

  fexp = fetch_exp fei

  raise "cannot update already applied expression" \
    unless fexp.is_a?(RawExpression)

  fexp.raw_representation = representation

  get_expression_pool.update fexp
end
#wait_for(fei_or_wfid) ⇒ Object
Waits for a given process instance to terminate. The method returns only when the flow terminates (the source also wakes the caller when an error is raised in the process), but beware: if the process has already terminated, the method will never return.
The parameter can be a FlowExpressionId instance, for example the one given back by a launch(), or directly a workflow instance id (String).
This method is mainly used in unit tests.
# File 'lib/openwfe/engine/engine.rb', line 482
def wait_for (fei_or_wfid)

  wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
    fei_or_wfid.workflow_instance_id
  else
    fei_or_wfid
  end

  t = Thread.new { Thread.stop }

  to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
    t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
  end
  te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
    t.wakeup if (fei.parent_wfid == wfid and t.alive?)
  end
  #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
  #    if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
  #        sleep 0.500
  #        t.wakeup
  #    end
  #end

  linfo { "wait_for() #{wfid}" }

  t.join

  get_expression_pool.remove_observer(to, :terminate)
  get_expression_pool.remove_observer(te, :error)
  #get_expression_pool.remove_observer(tc, :cancel)
    #
    # it would work as well without specifying the channel,
    # but it's thus a little bit faster
end
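The waiting technique in the source is a parked thread (Thread.stop) that an observer wakes up (Thread#wakeup). The sketch below shows that pattern in isolation; TinyPool is a hypothetical stand-in for the expression pool, with a simplified observer signature that passes only the wfid. As in the engine, a notification sent before the observer is registered is lost, and the caller then waits forever.

```ruby
# Hypothetical stand-in for the expression pool's observer support.
class TinyPool
  def initialize
    @observers = Hash.new { |hash, channel| hash[channel] = [] }
  end

  def add_observer(channel, &block)
    @observers[channel] << block
    block
  end

  def remove_observer(observer, channel)
    @observers[channel].delete(observer)
  end

  def observer_count(channel)
    @observers[channel].size
  end

  def notify(channel, wfid)
    @observers[channel].dup.each { |observer| observer.call(wfid) }
  end
end

def wait_for(pool, wfid)
  waiter = Thread.new { Thread.stop }  # parks itself until woken up
  observer = pool.add_observer(:terminate) do |finished_wfid|
    waiter.wakeup if finished_wfid == wfid && waiter.alive?
  end
  waiter.join                          # blocks the caller meanwhile
  pool.remove_observer(observer, :terminate)
  wfid
end

pool = TinyPool.new
Thread.new do
  # simulate a process that terminates a little after the wait started
  sleep 0.05 until pool.observer_count(:terminate) > 0
  sleep 0.1
  pool.notify(:terminate, "20071208-gipijiwozo")
end
puts "terminated: #{wait_for(pool, '20071208-gipijiwozo')}"
```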