Class: Ruote::ProcessObserver

Inherits:
Object
Defined in:
lib/ruote/util/process_observer.rb

Overview

A convenience base class for process observers. It (heavily) sugar-coats Ruote::Observer and translates the messages into actions. Each action receives pre-distilled information relevant to processes.

Example implementation

class WebsocketSubscriber < Ruote::ProcessObserver
   # override initialize to warm-up a websocket client
   def initialize(context, options={})
     super
     @client = WebsocketClient.new()
   end

   # tell the listeners that a new process launched
   def on_launch(wfid, opts)
     @client.publish(
       "/process/launch",
       { :name       => opts[:workitem].wf_name,
         :wfid       => wfid,
         :definition => opts[:pdef],
       }
     )
   end

   # tell the listeners that a process ended
   def on_terminated(wfid)
     @client.publish("/process/#{wfid}", { :end => true })
   end
end

Actions

The ProcessObserver adheres closely to the message actions. It calls the following methods:

on_launch

When a process or sub-process starts

on_terminated

When a process ends

on_error_intercepted

When an error was intercepted

on_cancel

When a process or sub-process was canceled

on_dispatch

When a participant is dispatched

on_receive

Whenever a workitem is received

There are others, but if you are interested in those, you might be better off using the lower-level Ruote::Observer.

Arguments

The methods are called with (wfid[, options])

You can provide a method-signature like:

def on_launch(wfid, options)
def on_launch(wfid)

The ProcessObserver checks the arity of the method: it passes the options only when the method accepts two arguments, and otherwise calls it with the wfid alone.
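The argument selection can be sketched in isolation as follows. This is a minimal illustration, not the library's own code: the `dispatch` helper and the `Listener` class are hypothetical names, and the sketch runs without ruote installed.

```ruby
# Hypothetical stand-in for the dispatch step; not part of ruote.
def dispatch(target, callback, wfid, data)
  return unless target.respond_to?(callback)

  args = [wfid]
  # Pass the options hash only to two-argument callbacks
  # (arity 2, or -2 when the second argument is optional).
  args << data if target.method(callback).arity.abs == 2

  target.send(callback, *args)
end

# Hypothetical listener mixing both supported signatures.
class Listener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_launch(wfid, options)
    @seen << [:launch, wfid, options[:action]]
  end

  def on_terminated(wfid)
    @seen << [:terminated, wfid]
  end
end

listener = Listener.new
dispatch(listener, :on_launch, 'wfid-1', { :action => 'launch' })
dispatch(listener, :on_terminated, 'wfid-1', { :action => 'terminated' })
```

Because the check uses the absolute value of the arity, a signature such as `def on_launch(wfid, options={})` also receives the options.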

Options

The following options are provided:

:workitem

The workitem, if available

:action

The original name of the action

:child

Boolean; true when the event belongs to a child (sub-flow)

:error

The intercepted error (only provided with #on_error_intercepted)

:pdef

The (sub-)process definition (only provided with #on_launch)

:variables

The process variables, if available

:flavour

The flavour of canceling (only provided with #on_cancel)
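As a rough sketch of how callbacks might read these options: the `AuditCallbacks` class below is hypothetical, and the option hashes are hand-built sample values rather than real engine output. In practice the methods would be defined on a Ruote::ProcessObserver subclass; a plain class is used here so the sketch runs without ruote installed.

```ruby
# Hypothetical callbacks; in a real observer these would live in a
# Ruote::ProcessObserver subclass.
class AuditCallbacks
  attr_reader :log

  def initialize
    @log = []
  end

  # :child tells a sub-process launch apart from a top-level launch
  def on_launch(wfid, options)
    kind = options[:child] ? 'sub-process' : 'process'
    @log << "#{kind} #{wfid} launched"
  end

  # :flavour is only provided with on_cancel
  def on_cancel(wfid, options)
    @log << "#{wfid} canceled (flavour: #{options[:flavour].inspect})"
  end

  # :error is only provided with on_error_intercepted
  def on_error_intercepted(wfid, options)
    error = options[:error]
    @log << "#{wfid} failed: #{error.class}: #{error.message}"
  end
end

# Sample values, assumed for illustration only.
audit = AuditCallbacks.new
audit.on_launch('wfid-1', { :child => false, :pdef => ['define', {}, []] })
audit.on_cancel('wfid-1', { :flavour => 'kill' })
audit.on_error_intercepted('wfid-1', { :error => ArgumentError.new('bad input') })
```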

Error handling

If an action raises an error anywhere in your implementation, the ProcessObserver catches it and silently ignores it.
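Since a swallowed error leaves no trace, one option is to rescue and log inside the callback itself. The sketch below assumes a hypothetical `LoggingCallbacks` class and a hypothetical `notify` helper that always fails; in a real observer the callback would be defined on a Ruote::ProcessObserver subclass.

```ruby
require 'logger'
require 'stringio'

# Hypothetical callbacks that log their own failures.
class LoggingCallbacks
  def initialize(logger)
    @logger = logger
  end

  def on_terminated(wfid)
    notify(wfid)
  rescue => e
    # Record the failure here; otherwise it would vanish silently.
    @logger.error("on_terminated failed for #{wfid}: #{e.class}: #{e.message}")
  end

  private

  # Hypothetical helper that simulates a failing downstream call.
  def notify(wfid)
    raise IOError, 'socket closed'
  end
end

sink = StringIO.new
callbacks = LoggingCallbacks.new(Logger.new(sink))
callbacks.on_terminated('wfid-1')
```

After the call, `sink.string` holds one ERROR line describing the failure, and no exception reaches the caller.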

Constructor Details

#initialize(context, options = {}) ⇒ ProcessObserver

Returns a new instance of ProcessObserver.



# File 'lib/ruote/util/process_observer.rb', line 103

def initialize(context, options={})
  @filtered_actions = options.delete(:filtered_actions)
  @filtered_actions ||= []
  @filtered_actions |=  %w[dispatched participant_registered variable_set]

  @context = context
  @options = options
end
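The filter handling in the constructor can be tried in isolation. The following sketch mirrors the same steps with a hypothetical `build_filter` helper and sample option values; it is an illustration, not the library's code.

```ruby
# Actions the constructor shown above always filters out.
DEFAULT_FILTERED = %w[dispatched participant_registered variable_set]

# Hypothetical helper mirroring the constructor's filter handling.
def build_filter(options)
  # delete removes the key from the caller's hash and returns its value
  filtered = options.delete(:filtered_actions) || []
  # Array#| is a set union: duplicates collapse, order is preserved
  filtered | DEFAULT_FILTERED
end

# Sample options, assumed for illustration.
options = { :filtered_actions => %w[receive dispatched], :channel => '/process' }
filtered = build_filter(options)

# filtered → ["receive", "dispatched", "participant_registered", "variable_set"]
# options  → { :channel => "/process" }
```

Note that the hash passed in is mutated: the `:filtered_actions` key is removed before the remaining options are stored.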

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/ruote/util/process_observer.rb', line 101

def context
  @context
end

#filtered_actions ⇒ Object (readonly)

Returns the value of attribute filtered_actions.



# File 'lib/ruote/util/process_observer.rb', line 101

def filtered_actions
  @filtered_actions
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/ruote/util/process_observer.rb', line 101

def options
  @options
end

Instance Method Details

#on_msg(msg) ⇒ Object

:nodoc:



# File 'lib/ruote/util/process_observer.rb', line 112

def on_msg(msg) # :nodoc:

  return if @filtered_actions.include? msg['action']

  wfid  = msg['wfid']
  child = false

  if !wfid && msg['parent_id']
    wfid  = msg['parent_id']['wfid']
    child = true
  end

  wfid ||= Ruote.extract_wfid(msg)
  return if !wfid

  workitem = begin
    if msg['workitem']
      Ruote::Workitem.new(Rufus::Json.dup(msg['workitem']))
    else
      Ruote::Workitem.new({})
    end
  rescue
    Ruote::Workitem.new({})
  end

  data = {
    :workitem  => workitem,
    :action    => msg['action'],
    :child     => child,
    :variables => msg['variables'],
  }

  # the preliminary method name
  method = msg['action'].split('_').first

  # change method or fields based on the action
  case msg['action']
  when 'launch'
    data[:pdef] = msg['tree']

  when 'cancel'
    data[:flavour] = msg['flavour']

  when 'error_intercepted'
    error = Kernel.const_get(msg['error']['class']).new(msg['error']['message'])
    error.set_backtrace msg['error']['trace']

    data[:error] = error
    method = msg['action']
  end

  callback = "on_#{method}"
  if self.respond_to?(callback)
    args = [ wfid ]
    args << data if self.method(callback).arity.abs == 2

    self.send(callback, *args)
  end

  return
rescue
  return
end
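The way a callback name is derived from a message action can be sketched on its own. `callback_for` is a hypothetical helper that follows the same rule as the method above, and the action strings are sample inputs chosen for illustration.

```ruby
# Hypothetical helper reproducing the name derivation used by on_msg.
def callback_for(action)
  # 'error_intercepted' keeps its full name; every other action is
  # reduced to the part before its first underscore.
  method = action == 'error_intercepted' ? action : action.split('_').first
  "on_#{method}"
end

# Sample action strings, assumed for illustration.
names = %w[launch terminated cancel_process error_intercepted].map do |action|
  callback_for(action)
end

# names → ["on_launch", "on_terminated", "on_cancel", "on_error_intercepted"]
```

One consequence of this rule is that distinct actions sharing a prefix map to the same callback; the `:action` option carries the original name when the difference matters.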