Class: OpenWFE::ExpressionPool
- Inherits: Object
- Includes: MonitorMixin, FeiMixin, OwfeObservable, OwfeServiceLocator, ServiceMixin, WorkqueueMixin
- Defined in: lib/openwfe/expool/expressionpool.rb
Overview
The ExpressionPool stores expressions (pieces of workflow instances). It is the core of the workflow engine. It relies on an expression storage for the actual persistence of the expressions.
Instance Attribute Summary
- #paused_instances ⇒ Object (readonly)
  The hash containing the wfid of the process instances currently paused.
Attributes included from ServiceMixin
Attributes included from Contextual
Instance Method Summary
- #apply(exp, workitem) ⇒ Object
  Applies a given expression (id or expression).
- #cancel(exp) ⇒ Object
  Cancels the given expression.
- #cancel_expression(exp) ⇒ Object
  Cancels the given expression and makes sure to resume the flow if the expression or one of its children were active.
- #cancel_process(exp_or_wfid) ⇒ Object (also: #cancel_flow)
  Given any expression of a process, cancels the complete process instance.
- #determine_rep(param) ⇒ Object
  Gets the process definition (if necessary) and turns it into an expression tree (for storing into a RawExpression).
- #engine_environment_id ⇒ Object
  Returns the unique engine_environment FlowExpressionId instance.
- #fetch(exp) ⇒ Object
  Fetches a FlowExpression from the pool.
- #fetch_engine_environment ⇒ Object
  Returns the engine environment (the top level environment).
- #fetch_expression(exp) ⇒ Object
  Fetches a FlowExpression (returns only the FlowExpression instance).
- #fetch_root(wfid) ⇒ Object
  Fetches the root expression of a process (or a subprocess).
- #forget(parent_exp, exp) ⇒ Object
  Forgets the given expression (makes sure to substitute its parent_id with the GONE_PARENT_ID constant).
- #get_monitor(fei) ⇒ Object
  Obtains a unique monitor for an expression.
- #initialize(service_name, application_context) ⇒ ExpressionPool (constructor)
  The constructor for the expression pool.
- #launch(launchitem, options = {}) ⇒ Object
  Instantiates a workflow definition and launches it.
- #launch_template(requesting_expression, env_id, sub_id, template, workitem, params = nil) ⇒ Object
  Launches a subprocess.
- #list_processes(options = {}) ⇒ Object
  Lists all workflows (processes) currently in the expool (in the engine).
- #notify_error(error, fei, message, workitem) ⇒ Object
  This method is called when apply() or reply() failed for an expression.
- #prepare_from_template(requesting_expression, env_id, sub_id, template, params = nil) ⇒ Object
  Prepares a raw expression from a template.
- #prepare_raw_expression(launchitem) ⇒ Object
  This method is called by the launch method.
- #process_stack(wfid, unapplied = false) ⇒ Object
  Returns the list of applied expressions belonging to a given workflow instance.
- #remove(exp) ⇒ Object
  Removes a flow expression from the pool (This method is mainly called from the pool itself).
- #reply(exp, workitem) ⇒ Object
  Replies to a given expression.
- #reply_to_parent(exp, workitem, remove = true) ⇒ Object
  Replies to the parent of the given expression.
- #reschedule ⇒ Object
  This method is called at each expool (engine) [re]start.
- #stop ⇒ Object
  Stops this expression pool (especially its workqueue).
- #update(flow_expression) ⇒ Object
  Adds or updates a flow expression in this pool.
Methods included from WorkqueueMixin
#is_workqueue_busy?, #queue_work, #start_workqueue, #stop_workqueue
Methods included from OwfeObservable
#add_observer, #remove_observer
Methods included from OwfeServiceLocator
#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator
Methods included from ServiceMixin
Methods included from Contextual
#get_work_directory, #init_service, #lookup
Methods included from Logging
#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn
Constructor Details
#initialize(service_name, application_context) ⇒ ExpressionPool
The constructor for the expression pool.
    # File 'lib/openwfe/expool/expressionpool.rb', line 104
    def initialize (service_name, application_context)

      super()

      service_init(service_name, application_context)

      @paused_instances = {}

      @monitors = MonitorProvider.new(application_context)

      @observers = {}

      @stopped = false

      engine_environment_id
        # makes sure it's called now

      start_workqueue
    end
Instance Attribute Details
#paused_instances ⇒ Object (readonly)
The hash containing the wfid of the process instances currently paused.
    # File 'lib/openwfe/expool/expressionpool.rb', line 99
    def paused_instances
      @paused_instances
    end
Instance Method Details
#apply(exp, workitem) ⇒ Object
Applies a given expression (id or expression)
    # File 'lib/openwfe/expool/expressionpool.rb', line 346
    def apply (exp, workitem)

      queue_work :do_apply, exp, workitem
      #do_apply exp, workitem
    end
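The listing shows that apply does not run do_apply directly; it hands the call to the work queue provided by WorkqueueMixin. The following is a minimal sketch of that hand-off, not the OpenWFEru implementation: SketchWorkqueue and SketchPool are hypothetical stand-ins, and only the method names start_workqueue, queue_work and stop_workqueue are taken from this page.

```ruby
require 'thread'

# Hypothetical stand-in for WorkqueueMixin (illustration only).
module SketchWorkqueue
  # Starts a single worker thread that executes queued jobs in order.
  def start_workqueue
    @workqueue = Queue.new
    @worker = Thread.new do
      loop do
        job = @workqueue.pop
        break if job == :stop
        method_name, *args = job
        send(method_name, *args)
      end
    end
  end

  # Enqueues a method call instead of performing it in the caller's thread.
  def queue_work(method_name, *args)
    @workqueue.push([method_name, *args])
  end

  # Lets the worker drain the jobs queued so far, then stops it.
  def stop_workqueue
    @workqueue.push(:stop)
    @worker.join
  end
end

# Hypothetical pool that only records what was applied.
class SketchPool
  include SketchWorkqueue

  attr_reader :applied

  def initialize
    @applied = []
    start_workqueue
  end

  # Returns immediately; the real work happens on the worker thread.
  def apply(exp, workitem)
    queue_work :do_apply, exp, workitem
  end

  private

  def do_apply(exp, workitem)
    @applied << [exp, workitem]
  end
end

pool = SketchPool.new
pool.apply('exp-0', 'wi-0')
pool.stop_workqueue
```

Because the call is queued, the caller cannot assume the expression has been applied when apply returns; in the sketch the result is only inspected after the queue has been stopped.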
#cancel(exp) ⇒ Object
Cancels the given expression. The param might be an expression instance or a FlowExpressionId instance.
    # File 'lib/openwfe/expool/expressionpool.rb', line 366
    def cancel (exp)

      exp, fei = fetch exp

      unless exp
        ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" }
        return nil
      end

      ldebug { "cancel() for #{fei.to_debug_s}" }

      onotify :cancel, exp

      inflowitem = exp.cancel()
      remove exp

      inflowitem
    end
#cancel_expression(exp) ⇒ Object
Cancels the given expression and makes sure to resume the flow if the expression or one of its children were active.
If the cancelled branch was not active, this method will take care of removing the cancelled expression from the parent expression.
    # File 'lib/openwfe/expool/expressionpool.rb', line 393
    def cancel_expression (exp)

      exp = fetch_expression exp

      wi = cancel exp

      if wi
        reply_to_parent exp, wi, false
      else
        parent_exp = fetch_expression exp.parent_id
        parent_exp.remove_child(exp.fei) if parent_exp
      end
    end
#cancel_process(exp_or_wfid) ⇒ Object Also known as: cancel_flow
Given any expression of a process, cancels the complete process instance.
    # File 'lib/openwfe/expool/expressionpool.rb', line 411
    def cancel_process (exp_or_wfid)

      wfid = extract_wfid exp_or_wfid, false

      ldebug { "cancel_process() '#{wfid}'" }

      root = fetch_root wfid

      raise "no process to cancel '#{wfid}'" unless root

      cancel root
    end
#determine_rep(param) ⇒ Object
Gets the process definition (if necessary) and turns it into an expression tree (for storing into a RawExpression).
    # File 'lib/openwfe/expool/expressionpool.rb', line 762
    def determine_rep (param)

      param = read_uri(param) if param.is_a?(URI)

      DefParser.parse param
    end
#engine_environment_id ⇒ Object
Returns the unique engine_environment FlowExpressionId instance. There is only one such environment in an engine, hence this ‘singleton’ method.
    # File 'lib/openwfe/expool/expressionpool.rb', line 664
    def engine_environment_id ()
      #synchronize do

      return @eei if @eei

      @eei = FlowExpressionId.new
      @eei.owfe_version = OPENWFERU_VERSION
      @eei.engine_id = get_engine.service_name
      @eei.initial_engine_id = @eei.engine_id
      @eei.workflow_definition_url = 'ee'
      @eei.workflow_definition_name = 'ee'
      @eei.workflow_definition_revision = '0'
      @eei.workflow_instance_id = '0'
      @eei.expression_name = EN_ENVIRONMENT
      @eei.expression_id = '0'
      @eei
      #end
    end
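The method above builds the id once and returns the cached @eei on every later call; the constructor calls it eagerly so the id exists before the work queue starts. A minimal sketch of that memoization follows. SketchEngine is a hypothetical class, and a frozen Hash stands in for the FlowExpressionId instance.

```ruby
# Hypothetical holder illustrating the "build once, then reuse" pattern.
class SketchEngine
  attr_reader :builds

  def initialize(engine_id)
    @engine_id = engine_id
    @builds = 0
  end

  # Returns the same id object on every call; builds it on first use only.
  def engine_environment_id
    return @eei if @eei

    @builds += 1
    @eei = {
      engine_id: @engine_id,
      workflow_instance_id: '0',
      expression_id: '0'
    }.freeze
  end
end

engine = SketchEngine.new('engine-a')
first = engine.engine_environment_id
second = engine.engine_environment_id
```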
#fetch(exp) ⇒ Object
Fetches a FlowExpression from the pool. Returns a tuple: the FlowExpression plus its FlowExpressionId.
The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.
    # File 'lib/openwfe/expool/expressionpool.rb', line 537
    def fetch (exp)
      synchronize do

        #ldebug { "fetch() exp is of kind #{exp.class}" }

        fei = if exp.is_a?(FlowExpression)

          exp.fei

        elsif not exp.is_a?(FlowExpressionId)

          raise \
            "Cannot fetch expression with key : "+
            "'#{fei}' (#{fei.class})"

        else

          exp
        end

        #ldebug { "fetch() for #{fei.to_debug_s}" }

        [ get_expression_storage()[fei], fei ]
      end
    end
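As a rough illustration of the tuple returned by fetch, here is a self-contained sketch. FetchFei, FetchExpression and FetchPool are hypothetical stand-ins for FlowExpressionId, FlowExpression and the pool, and a plain Hash plays the role of the expression storage; unlike the listing above, the sketch reports the rejected argument itself in its error message.

```ruby
# Hypothetical stand-ins (illustration only).
FetchFei = Struct.new(:wfid, :expid)
FetchExpression = Struct.new(:fei, :name)

class FetchPool
  # storage: any object answering [] with an id as key
  def initialize(storage)
    @storage = storage
  end

  # Accepts an id or an expression; returns [expression_or_nil, id].
  def fetch(exp)
    fei =
      case exp
      when FetchExpression then exp.fei
      when FetchFei then exp
      else
        raise ArgumentError,
              "cannot fetch expression with key '#{exp}' (#{exp.class})"
      end
    [@storage[fei], fei]
  end

  # Same lookup, but returns only the expression.
  def fetch_expression(exp)
    fetch(exp).first
  end
end

fei = FetchFei.new('20080101-abc', '0.0')
stored = FetchExpression.new(fei, 'sequence')
pool = FetchPool.new({ fei => stored })

expression, key = pool.fetch(fei)
missing, missing_key = pool.fetch(FetchFei.new('20080101-abc', '0.1'))
```

Destructuring the result, as cancel and forget do in the listings on this page, keeps the id available for logging even when the expression itself is no longer stored.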
#fetch_engine_environment ⇒ Object
Returns the engine environment (the top level environment)
    # File 'lib/openwfe/expool/expressionpool.rb', line 578
    def fetch_engine_environment
      synchronize do
        #
        # synchronize to ensure that there's 1! engine env

        eei = engine_environment_id
        ee, fei = fetch eei

        return ee if ee

        ee = Environment.new_env(
          eei, nil, nil, @application_context, nil)

        ee.store_itself

        ee
      end
    end
#fetch_expression(exp) ⇒ Object
Fetches a FlowExpression (returns only the FlowExpression instance)
The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.
    # File 'lib/openwfe/expool/expressionpool.rb', line 569
    def fetch_expression (exp)

      exp, fei = fetch exp
      exp
    end
#fetch_root(wfid) ⇒ Object
Fetches the root expression of a process (or a subprocess).
    # File 'lib/openwfe/expool/expressionpool.rb', line 600
    def fetch_root (wfid)

      get_expression_storage.fetch_root wfid
    end
#forget(parent_exp, exp) ⇒ Object
Forgets the given expression (makes sure to substitute its parent_id with the GONE_PARENT_ID constant)
    # File 'lib/openwfe/expool/expressionpool.rb', line 429
    def forget (parent_exp, exp)

      exp, fei = fetch exp

      #ldebug { "forget() forgetting #{fei}" }

      return if not exp

      onotify :forget, exp

      parent_exp.children.delete(fei)

      exp.parent_id = GONE_PARENT_ID
      exp.dup_environment
      exp.store_itself()

      ldebug { "forget() forgot #{fei}" }
    end
#get_monitor(fei) ⇒ Object
Obtains a unique monitor for an expression. This spares FlowExpression instances from having to include the monitor mixin themselves.
    # File 'lib/openwfe/expool/expressionpool.rb', line 143
    def get_monitor (fei)

      @monitors[fei]
    end
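The MonitorProvider class is not shown on this page. The sketch below is one possible way to hand out one monitor per key so that callers can synchronize on an expression id without the expression including MonitorMixin; SketchMonitorProvider and the 'fei-0' key are assumptions made for illustration.

```ruby
require 'monitor'

# Hypothetical provider: lazily creates one Monitor per key.
class SketchMonitorProvider
  def initialize
    @lock = Mutex.new
    @monitors = {}
  end

  # Returns the monitor for key, creating it on first access.
  def [](key)
    @lock.synchronize { @monitors[key] ||= Monitor.new }
  end

  # Drops the monitor once the expression is gone.
  def delete(key)
    @lock.synchronize { @monitors.delete(key) }
  end
end

provider = SketchMonitorProvider.new
counter = 0

# Four threads increment a shared counter under the same per-key monitor.
threads = 4.times.map do
  Thread.new do
    100.times { provider['fei-0'].synchronize { counter += 1 } }
  end
end
threads.each(&:join)
```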
#launch(launchitem, options = {}) ⇒ Object
Instantiates a workflow definition and launches it.
This method call returns immediately; it may even return before the actual launch is complete.
Returns the FlowExpressionId instance of the root expression of the newly launched flow.
    # File 'lib/openwfe/expool/expressionpool.rb', line 196
    def launch (launchitem, options={})

      #
      # prepare raw expression

      raw_expression = prepare_raw_expression launchitem
        #
        # will raise an exception if there are requirements
        # and one of them is not met

      raw_expression.new_environment
        #
        # as this expression is the root of a new process instance,
        # it has to have an environment for all the variables of
        # the process instance

      raw_expression = wrap_in_schedule(raw_expression, options) \
        if options.size > 0

      fei = raw_expression.fei

      #
      # apply prepared raw expression

      wi = build_workitem launchitem

      onotify :launch, fei, launchitem

      apply raw_expression, wi

      fei
    end
#launch_template(requesting_expression, env_id, sub_id, template, workitem, params = nil) ⇒ Object
Launches a subprocess.
    # File 'lib/openwfe/expool/expressionpool.rb', line 306
    def launch_template (
      requesting_expression, env_id, sub_id, template, workitem, params=nil)

      rawexp = prepare_from_template(
        requesting_expression, env_id, sub_id, template, params)

      workitem.flow_expression_id = rawexp.fei

      onotify :launch_template, rawexp.fei, workitem

      apply rawexp, workitem

      rawexp.fei
    end
#list_processes(options = {}) ⇒ Object
Lists all workflows (processes) currently in the expool (in the engine). This method will return a list of “process-definition” expressions (root of flows).
    # File 'lib/openwfe/expool/expressionpool.rb', line 714
    def list_processes (options={})

      options[:include_classes] = DefineExpression
        #
        # Maybe it would be better to list root expressions instead
        # so that expressions like 'sequence' can be used
        # as root expressions. Later...

      get_expression_storage.find_expressions options
    end
#notify_error(error, fei, message, workitem) ⇒ Object
This method is called when apply() or reply() failed for an expression. There are currently only two ‘users’, the ParticipantExpression class and the do_process_workelement method of this ExpressionPool class.
    # File 'lib/openwfe/expool/expressionpool.rb', line 732
    def notify_error (error, fei, message, workitem)

      fei = extract_fei fei
        # densha requires that... :(

      se = OpenWFE::exception_to_s error

      onotify :error, fei, message, workitem, error.class.name, se

      #fei = extract_fei fei

      if error.is_a?(PausedError)
        lwarn do
          "#{self.service_name} " +
          "operation :#{message.to_s} on #{fei.to_s} " +
          "delayed because process '#{fei.wfid}' is in pause"
        end
      else
        lwarn do
          "#{self.service_name} " +
          "operation :#{message.to_s} on #{fei.to_s} " +
          "failed with\n" + se
        end
      end
    end
#prepare_from_template(requesting_expression, env_id, sub_id, template, params = nil) ⇒ Object
Prepares a raw expression from a template. Returns that raw expression.
Used in the concurrent-iterator when building up the children list and of course used by the launch_template() method.
    # File 'lib/openwfe/expool/expressionpool.rb', line 236
    def prepare_from_template (
      requesting_expression, env_id, sub_id, template, params=nil)

      rawexp = if template.is_a?(RawExpression)
        template.application_context = @application_context
        template
      elsif template.is_a?(FlowExpressionId)
        fetch_expression template
      else
        build_raw_expression nil, template
      end

      #raise "did not find subprocess in : #{template.to_s}"
      #  unless rawexp

      rawexp = rawexp.dup
      rawexp.fei = rawexp.fei.dup

      if requesting_expression == nil

        rawexp.parent_id = nil
        rawexp.fei.workflow_instance_id = get_wfid_generator.generate

      elsif requesting_expression.kind_of?(FlowExpressionId)

        rawexp.parent_id = requesting_expression
        rawexp.fei.workflow_instance_id = \
          "#{requesting_expression.workflow_instance_id}.#{sub_id}"

      elsif requesting_expression.kind_of?(String)

        rawexp.parent_id = nil
        rawexp.fei.workflow_instance_id = \
          "#{requesting_expression}.#{sub_id}"

      else # kind is FlowExpression

        rawexp.parent_id = requesting_expression.fei
        rawexp.fei.workflow_instance_id = \
          "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}"
      end

      #ldebug do
      #  "launch_template() spawning wfid " +
      #  "#{rawexp.fei.workflow_instance_id.to_s}"
      #end

      if env_id
        rawexp.environment_id = env_id
      else
        #
        # the new scope gets its own environment
        #
        rawexp.new_environment params
      end

      rawexp.store_itself

      rawexp
    end
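The central branch of the listing above decides which workflow instance id the prepared expression receives, depending on what kind of object requested it. That rule can be sketched in isolation as follows; sub_wfid, SubFei and SubExpression are hypothetical names, and fresh_wfid stands in for the value that get_wfid_generator.generate would supply.

```ruby
# Hypothetical stand-ins for FlowExpressionId and FlowExpression.
SubFei = Struct.new(:workflow_instance_id)
SubExpression = Struct.new(:fei)

# Mirrors the four cases of the listing: no requester, an id,
# a plain wfid string, or a full expression.
def sub_wfid(requester, sub_id, fresh_wfid)
  case requester
  when nil then fresh_wfid
  when SubFei then "#{requester.workflow_instance_id}.#{sub_id}"
  when String then "#{requester}.#{sub_id}"
  else "#{requester.fei.workflow_instance_id}.#{sub_id}"
  end
end

parent_fei = SubFei.new('20080101-abc')

root_id = sub_wfid(nil, 0, '20080102-xyz')
from_fei = sub_wfid(parent_fei, 2, 'unused')
from_string = sub_wfid('20080101-abc.2', 0, 'unused')
from_expression = sub_wfid(SubExpression.new(parent_fei), 1, 'unused')
```

Under this scheme a subprocess id always extends its parent's id with a dot and the sub_id, so nesting depth can be read off the id.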
#prepare_raw_expression(launchitem) ⇒ Object
This method is called by the launch method. It is actually the first stage of that method. It can be useful for validating a launchitem and its process definition, as it raises an exception in case of a ‘parameter’ mismatch.
There is a ‘pre_launch_check’ alias for this method in the Engine class.
    # File 'lib/openwfe/expool/expressionpool.rb', line 158
    def prepare_raw_expression (launchitem)

      wfdurl = launchitem.workflow_definition_url

      raise "launchitem.workflow_definition_url not set, cannot launch" \
        unless wfdurl

      definition = if wfdurl.match "^field:"
        wfdfield = wfdurl[6..-1]
        launchitem.attributes.delete wfdfield
      else
        read_uri wfdurl
      end

      raise "didn't find process definition at '#{wfdurl}'" \
        unless definition

      raw_expression = build_raw_expression launchitem, definition

      raw_expression.check_parameters launchitem
        #
        # will raise an exception if there are requirements
        # and one of them is not met

      raw_expression
    end
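The listing shows two ways a definition is located: a URL starting with "field:" names a launchitem attribute that holds the definition (and is removed from the attributes), while any other URL is read through read_uri. A small sketch of that resolution step follows; resolve_definition is a hypothetical helper, the block stands in for read_uri, and the attribute names and URL are made up.

```ruby
# Resolves a definition either from the launchitem attributes
# ("field:<name>") or by yielding the URL to a caller-supplied reader.
def resolve_definition(wfdurl, attributes)
  raise ArgumentError, 'workflow_definition_url not set, cannot launch' \
    unless wfdurl

  definition =
    if wfdurl.start_with?('field:')
      # 'field:'.length == 6, so the rest is the attribute name
      attributes.delete(wfdurl[6..-1])
    else
      yield wfdurl
    end

  raise ArgumentError, "didn't find process definition at '#{wfdurl}'" \
    unless definition

  definition
end

attributes = { 'mydef' => '<process-definition/>', 'customer' => 'alice' }

embedded = resolve_definition('field:mydef', attributes) { |_url| nil }
remote = resolve_definition('http://example.com/def.xml', {}) do |url|
  "loaded from #{url}"
end
```

Note that the embedded definition is deleted from the attributes, so it does not travel along with the workitem once the process has been launched.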
#process_stack(wfid, unapplied = false) ⇒ Object
Returns the list of applied expressions belonging to a given workflow instance.
If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned. Environments will be returned as well.
    # File 'lib/openwfe/expool/expressionpool.rb', line 691
    def process_stack (wfid, unapplied=false)

      #raise "please provide a non-nil workflow instance id"
      #  unless wfid

      wfid = extract_wfid wfid, true

      params = {
        #:exclude_classes => [ Environment, RawExpression ],
        #:exclude_classes => [ Environment ],
        :parent_wfid => wfid
      }
      params[:applied] = true if (not unapplied)

      get_expression_storage.find_expressions params
    end
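The effect of the unapplied flag is easiest to see in the query hash handed to the storage. The sketch below isolates that step; stack_query is a hypothetical helper, and how the storage interprets :parent_wfid and :applied is not shown on this page.

```ruby
# Builds the query passed to find_expressions in the listing above.
def stack_query(wfid, unapplied = false)
  params = { parent_wfid: wfid }
  # Restrict to applied expressions unless the caller asked for everything.
  params[:applied] = true unless unapplied
  params
end

applied_only = stack_query('20080101-abc')
everything = stack_query('20080101-abc', true)
```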
#remove(exp) ⇒ Object
Removes a flow expression from the pool (This method is mainly called from the pool itself)
    # File 'lib/openwfe/expool/expressionpool.rb', line 609
    def remove (exp)

      exp, _fei = fetch(exp) \
        if exp.is_a?(FlowExpressionId)

      return unless exp

      ldebug { "remove() fe #{exp.fei.to_debug_s}" }

      onotify :remove, exp.fei

      synchronize do

        @monitors.delete(exp.fei)

        remove_environment(exp.environment_id) \
          if exp.owns_its_environment?
      end
    end
#reply(exp, workitem) ⇒ Object
Replies to a given expression
    # File 'lib/openwfe/expool/expressionpool.rb', line 355
    def reply (exp, workitem)

      queue_work :do_reply, exp, workitem
      #do_reply exp, workitem
    end
#reply_to_parent(exp, workitem, remove = true) ⇒ Object
Replies to the parent of the given expression.
    # File 'lib/openwfe/expool/expressionpool.rb', line 451
    def reply_to_parent (exp, workitem, remove=true)

      ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }

      workitem.last_expression_id = exp.fei

      onotify :reply_to_parent, exp, workitem

      if remove

        remove(exp)
          #
          # remove the expression itself

        exp.clean_children()
          #
          # remove all the children of the expression
      end

      #
      # manage tag, have to remove it so it can get 'redone' or 'undone'
      # (preventing abuse)

      tagname = exp.attributes["tag"] if exp.attributes

      exp.delete_variable(tagname) if tagname

      #
      # flow terminated ?

      if not exp.parent_id

        ldebug do
          "reply_to_parent() process " +
          "#{exp.fei.workflow_instance_id} terminated"
        end

        onotify :terminate, exp, workitem

        return
      end

      #
      # else, gone parent ?

      if exp.parent_id == GONE_PARENT_ID

        ldebug do
          "reply_to_parent() parent is gone for " +
          exp.fei.to_debug_s
        end

        return
      end

      #
      # parent still present, reply to it

      reply exp.parent_id, workitem
    end
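After the clean-up steps, the listing ends in a three-way decision based on the expression's parent_id. The sketch below reduces that decision to a pure function; reply_outcome and the SKETCH_GONE_PARENT marker are hypothetical, the latter standing in for the GONE_PARENT_ID constant whose value is not shown here.

```ruby
# Hypothetical marker standing in for GONE_PARENT_ID.
SKETCH_GONE_PARENT = :gone_parent

# Returns what reply_to_parent would do for a given parent id.
def reply_outcome(parent_id)
  return :terminate if parent_id.nil?                 # root: process is over
  return :parent_gone if parent_id == SKETCH_GONE_PARENT # forgotten branch
  :reply                                               # normal case
end

outcomes = [nil, SKETCH_GONE_PARENT, 'fei-of-parent'].map do |parent_id|
  reply_outcome(parent_id)
end
```

Only the last case queues further work; a forgotten expression (see forget) ends silently without touching the rest of the process.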
#reschedule ⇒ Object
This method is called at each expool (engine) [re]start. It roams through the previously saved (persisted) expressions to reschedule ones like ‘sleep’ or ‘cron’.
    # File 'lib/openwfe/expool/expressionpool.rb', line 634
    def reschedule

      return if @stopped

      synchronize do

        t = OpenWFE::Timer.new

        linfo { "reschedule() initiating..." }

        options = { :include_classes => Rufus::Schedulable }

        get_expression_storage.find_expressions(options).each do |fexp|

          linfo { "reschedule() for #{fexp.fei.to_s}..." }

          onotify :reschedule, fexp.fei

          fexp.reschedule get_scheduler
        end

        linfo { "reschedule() done. (took #{t.duration} ms)" }
      end
    end
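The listing selects the persisted expressions that include a scheduling module and asks each one to reschedule itself. The sketch below imitates that selection over an in-memory list; SketchSchedulable, the two expression classes, and this find_expressions function are hypothetical, and an Array plays the role of the scheduler.

```ruby
# Hypothetical marker module standing in for Rufus::Schedulable.
module SketchSchedulable
  def reschedule(scheduler)
    scheduler << "rescheduled #{name}"
  end
end

class SketchSleep
  include SketchSchedulable

  def name
    'sleep'
  end
end

class SketchSequence
  def name
    'sequence'
  end
end

# Keeps only expressions that are instances of (or include) a wanted class.
def find_expressions(expressions, options = {})
  wanted = Array(options[:include_classes])
  return expressions if wanted.empty?

  expressions.select { |exp| wanted.any? { |klass| exp.is_a?(klass) } }
end

scheduler = []
stored = [SketchSleep.new, SketchSequence.new]

find_expressions(stored, include_classes: SketchSchedulable).each do |exp|
  exp.reschedule(scheduler)
end
```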
#stop ⇒ Object
Stops this expression pool (especially its workqueue).
    # File 'lib/openwfe/expool/expressionpool.rb', line 127
    def stop

      @stopped = true

      stop_workqueue
        #
        # flushes the work queue

      onotify :stop
    end
#update(flow_expression) ⇒ Object
Adds or updates a flow expression in this pool
    # File 'lib/openwfe/expool/expressionpool.rb', line 514
    def update (flow_expression)

      ldebug { "update() for #{flow_expression.fei.to_debug_s}" }

      #t = Timer.new

      onotify :update, flow_expression.fei, flow_expression

      #ldebug do
      #  "update() took #{t.duration} ms " +
      #  "#{flow_expression.fei.to_debug_s}"
      #end

      flow_expression
    end
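In the listing, update stores nothing itself; it only emits an :update notification. Presumably an observer such as the expression storage reacts to it, which is an assumption rather than something this page states. A minimal sketch of that arrangement follows: SketchObservablePool is hypothetical, its add_observer takes a channel and a block (the real OwfeObservable signature may differ), and a Hash stands in for the storage.

```ruby
# Hypothetical pool with channel-based observers (illustration only).
class SketchObservablePool
  def initialize
    @observers = Hash.new { |hash, channel| hash[channel] = [] }
  end

  # Registers a block to be called for every event on the channel.
  def add_observer(channel, &callback)
    @observers[channel] << callback
  end

  # Announces the new state of an expression and returns it.
  def update(fei, expression)
    onotify :update, fei, expression
    expression
  end

  private

  def onotify(channel, *args)
    @observers[channel].each { |callback| callback.call(channel, *args) }
  end
end

storage = {}
pool = SketchObservablePool.new

# The "storage" persists whatever the pool announces on :update.
pool.add_observer(:update) do |_channel, fei, expression|
  storage[fei] = expression
end

returned = pool.update('fei-0', 'sequence')
pool.update('fei-0', 'sequence (revised)')
```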