Class: DaemonKit::RuoteWorkitem

Inherits:
Object
  • Object
Defined in:
lib/daemon_kit/ruote_workitem.rb

Overview

Dual-purpose class that is a) responsible for parsing incoming workitems and delegating them to the correct RuotePseudoParticipant, and b) wrapping the workitem hash in something a bit more digestible.

Constructor Details

#initialize(workitem = {}) ⇒ RuoteWorkitem

Returns a new instance of RuoteWorkitem.



# File 'lib/daemon_kit/ruote_workitem.rb', line 101

def initialize( workitem = {} )
  @workitem = workitem
end

Dynamic Method Handling

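This class handles dynamic methods through the method_missing method. A call whose name matches a key in the workitem's attributes returns that attribute's value; any other call falls through to super.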

#method_missing(method_name, *args) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 160

def method_missing( method_name, *args )
  if self.attributes.keys.include?( method_name.to_s )
    return self.attributes[ method_name.to_s ]
  end

  super
end
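
As a rough illustration of the lookup above, here is a minimal, self-contained sketch. SketchWorkitem is a hypothetical stand-in, not the daemon-kit class, and the respond_to_missing? method is an addition that does not appear in the listing; it is included only so that respond_to? agrees with the dynamic readers.

```ruby
# Stand-in for illustration only; SketchWorkitem is a hypothetical name,
# not the daemon-kit class.
class SketchWorkitem
  def initialize(workitem = {})
    @workitem = workitem
  end

  def attributes
    @workitem['attributes']
  end

  # Same idea as the listing: attribute keys double as reader methods.
  def method_missing(method_name, *args)
    key = method_name.to_s
    return attributes[key] if attributes.key?(key)

    super
  end

  # Assumption: not in the original listing. Keeps respond_to? consistent
  # with the readers provided by method_missing.
  def respond_to_missing?(method_name, include_private = false)
    attributes.key?(method_name.to_s) || super
  end
end

item = SketchWorkitem.new('attributes' => { 'reply_queue' => 'replies' })
puts item.reply_queue
```

Calling a name that is not an attribute key, such as item.no_such_field, still raises NoMethodError because the call is handed to super.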

Class Method Details

.parse(workitem) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 96

def parse( workitem )
  new( JSON.parse( workitem ) )
end

.parse_command(work) ⇒ Object

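Extracts the class and method names from the workitem's command parameter, then looks up the matching instance in the registered list of participants. Returns nil if no command is set, and raises DaemonKit::MissingParticipant if no participant is registered for the class.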



# File 'lib/daemon_kit/ruote_workitem.rb', line 68

def parse_command( work )
  return nil if work['params']['command'].nil?

  _, klass, method = work['params']['command'].split('/')

  instance = RuoteParticipants.instance.participants[ klass ]

  if instance.nil?
    msg = "No instance registered for #{klass}"
    DaemonKit.logger.error( msg )
    raise DaemonKit::MissingParticipant, msg
  end

  return instance, method
end
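
The split step is easy to misread, so here is a small sketch of it in isolation. split_command is a hypothetical helper written for this example, not part of daemon-kit; it only shows how a /class/method string is taken apart.

```ruby
# Hypothetical helper for illustration; not part of daemon-kit.
def split_command(command)
  # The leading slash produces an empty first element, which is discarded.
  _, klass, method = command.split('/')
  [klass, method]
end

klass, method = split_command('/actor/method')
puts "class=#{klass} method=#{method}"
```

A command with only one segment, such as '/actor', leaves the method name nil, which .process reports as a missing target/method error.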

.process(transport, workitem) ⇒ Object

Processes incoming commands received via an AMQP queue.

Expects a JSON workitem from ruote that has these fields set under its attributes key:

{
  'reply_queue'    => 'queue to send replies to',
  'params' => {
    'command'  => '/actor/method'
  }
}

Notes on the command key:

It looks like a resource, and will be treated as such. It should be in the format of /class/method, and it will be passed the complete workitem as a hash.

Notes on replies

Replies are sent back to the queue specified in the reply_queue key.

Notes on errors

Where daemon-kit detects errors in attempting to parse and delegate the workitems, it will reply to the engine and set the following field with the error information:

daemon_kit.error


# File 'lib/daemon_kit/ruote_workitem.rb', line 39

def process( transport, workitem )
  # keep it singleton
  @instance ||= new

  work = parse( workitem )

  DaemonKit.logger.warn "Processing workitem that has timed out!" if work.timed_out?

  target, method = parse_command( work )

  if target.nil? || method.nil?
    msg = "Missing target/method in command parameter, or command parameter missing"
    DaemonKit.logger.error( msg )
    work["daemon_kit"] = { "error" => msg }

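  # respond_to? accepts the String method name and checks public methods only;
  # public_methods returns Symbols on Ruby 1.9+, so include? would never match.
  elsif target.respond_to?( method )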
    target.perform( method, work )

  else
    msg = "Workitem cannot be processed: #{method} not exposed by #{target.inspect}"
    DaemonKit.logger.error( msg )
    work["daemon_kit"] = { "error" => msg }
  end

  reply_to_engine( transport, work )
end
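
To make the error reply concrete, the sketch below builds a payload with the documented field layout and attaches an error the way the listing does. mark_error is a hypothetical helper and the queue name is made up; daemon-kit performs this step inline inside .process.

```ruby
require 'json'

# Hypothetical helper for illustration; daemon-kit does this inline in .process.
def mark_error(workitem_json, message)
  workitem = JSON.parse(workitem_json)
  # The error ends up under attributes -> daemon_kit -> error.
  workitem['attributes']['daemon_kit'] = { 'error' => message }
  workitem
end

# Made-up payload following the field layout documented above.
incoming = {
  'attributes' => {
    'reply_queue' => 'ruote_replies',
    'params'      => { 'command' => '/actor/method' }
  }
}.to_json

reply = mark_error(incoming, 'Missing target/method in command parameter')
puts reply.to_json
```

The rest of the workitem is returned unchanged, so the engine can still read reply_queue and params from the reply.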

.reply_to_engine(transport, response) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 84

def reply_to_engine( transport, response )
  send( "reply_via_#{transport}", response )
end
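
The dispatch-by-name pattern used here can be sketched without an AMQP broker. SketchReplier and its reply_via_memory transport are hypothetical names invented for this example; only the send call mirrors the listing.

```ruby
# Stand-in module for illustration; SketchReplier and reply_via_memory are
# hypothetical names, not part of daemon-kit.
module SketchReplier
  SENT = []

  def self.reply_to_engine(transport, response)
    # Builds the method name from the transport, e.g. :memory -> reply_via_memory.
    send("reply_via_#{transport}", response)
  end

  def self.reply_via_memory(response)
    SENT << response
    response
  end
end

SketchReplier.reply_to_engine(:memory, 'reply_queue' => 'ruote_replies')
puts SketchReplier::SENT.size
```

One consequence of this design is that a transport without a matching reply_via_ method raises NoMethodError at reply time rather than when the workitem is received.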

.reply_via_amqp(response) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 88

def reply_via_amqp( response )
  DaemonKit.logger.debug("Replying to engine via AMQP with #{response.inspect}")

  ::MQ.queue( response['reply_queue'] ).publish( response.to_json )

  response
end

Instance Method Details

#[](key) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 135

def []( key )
  self.attributes[ key ]
end

#[]=(key, value) ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 139

def []=( key, value )
  self.attributes[ key ] = value
end

#attributes ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 131

def attributes
  @workitem['attributes']
end

#dispatch_time ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 119

def dispatch_time
  @dispatch_time ||= Time.parse( @workitem['dispatch_time'] )
end

#fei ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 105

def fei
  @workitem['flow_expression_id']
end

#last_modified ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 123

def last_modified
  @last_modified ||= Time.parse( @workitem['last_modified'] )
end

#participant_name ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 127

def participant_name
  @workitem['participant_name']
end

#short_fei ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 109

def short_fei
  @short_fei ||=
    '(' + [
           'fei', self.fei['owfe_version'], self.fei['engine_id'],
           self.fei['workflow_definition_url'], self.fei['workflow_definition_name'],
           self.fei['workflow_definition_revision'], self.fei['workflow_instance_id'],
           self.fei['expression_name'], self.fei['expression_id']
          ].join(' ') + ')'
end
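
For a sense of the resulting string, the sketch below applies the same join to a plain hash. short_fei_for is a hypothetical helper, and every value in the sample hash is invented; real flow expression ids from ruote will differ.

```ruby
# Hypothetical helper for illustration; it takes the flow expression id hash
# directly instead of reading it from a workitem.
def short_fei_for(fei)
  '(' + [
    'fei', fei['owfe_version'], fei['engine_id'],
    fei['workflow_definition_url'], fei['workflow_definition_name'],
    fei['workflow_definition_revision'], fei['workflow_instance_id'],
    fei['expression_name'], fei['expression_id']
  ].join(' ') + ')'
end

# Invented sample values.
sample_fei = {
  'owfe_version'                 => '0.9',
  'engine_id'                    => 'engine',
  'workflow_definition_url'      => 'url',
  'workflow_definition_name'     => 'name',
  'workflow_definition_revision' => '1',
  'workflow_instance_id'         => 'wfid',
  'expression_name'              => 'participant',
  'expression_id'                => '0.0.1'
}

puts short_fei_for(sample_fei)
```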

#timed_out? ⇒ Boolean

Looks at the workitem payload and attempts to determine whether this workitem has timed out. This method only works if the :timeout parameter was set for the expression.

Returns:

  • (Boolean)


# File 'lib/daemon_kit/ruote_workitem.rb', line 150

def timed_out?
  key = fei['workflow_instance_id'] + '__' + fei['expression_id']

  if self.attributes["__timeouts__"] && timeout = self.attributes["__timeouts__"][ key ]
    return Time.at( timeout.last ) < Time.now
  end

  return false
end
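
The check can be exercised on plain hashes, as in the sketch below. The shape of the "__timeouts__" entry (an array whose last element is an epoch timestamp) is inferred from the listing rather than from ruote's documentation, and the optional now argument is an addition made here to keep the example deterministic.

```ruby
# Stand-in function for illustration; the "__timeouts__" layout is an
# assumption inferred from the listing above.
def timed_out?(fei, attributes, now = Time.now)
  # The lookup key joins the workflow instance id and the expression id.
  key = fei['workflow_instance_id'] + '__' + fei['expression_id']
  timeouts = attributes['__timeouts__']
  return false unless timeouts && timeouts[key]

  Time.at(timeouts[key].last) < now
end

fei = { 'workflow_instance_id' => 'wf1', 'expression_id' => '0.0' }
expired = { '__timeouts__' => { 'wf1__0.0' => ['job', 0] } }

puts timed_out?(fei, expired)
puts timed_out?(fei, {})
```

A workitem without a "__timeouts__" entry for its key is treated as not timed out, which is why the method cannot detect timeouts unless one was configured on the expression.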

#to_json ⇒ Object



# File 'lib/daemon_kit/ruote_workitem.rb', line 143

def to_json
  @workitem.to_json
end