Class: DaemonKit::RuoteWorkitem

Inherits:
Object
  • Object
show all
Defined in:
lib/daemon_kit/ruote_workitem.rb

Overview

Dual purpose class that is a) responsible for parsing incoming workitems and delegating to the correct RuotePseudoParticipant, and b) wrapping the workitem hash into something a bit more digestable.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workitem = {}) ⇒ RuoteWorkitem

Returns a new instance of RuoteWorkitem.



109
110
111
# File 'lib/daemon_kit/ruote_workitem.rb', line 109

def initialize( workitem = {} )
  @workitem = workitem
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/daemon_kit/ruote_workitem.rb', line 178

def method_missing( method_name, *args )
  if self.fields.keys.include?( method_name.to_s )
    return self.fields[ method_name.to_s ]
  end

  super
end

Class Method Details

.parse(workitem) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/daemon_kit/ruote_workitem.rb', line 99

def parse( workitem )
  begin
    return new( JSON.parse( workitem ) )
  rescue JSON::ParserError => e
    DaemonKit.logger.error "No valid JSON payload found in #{workitem}"
    return nil
  end
end

.parse_command(work) ⇒ Object

Extract the class and method name from the workitem, then pick the matching class from the registered list of participants



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/daemon_kit/ruote_workitem.rb', line 72

def parse_command( work )
  return nil if work['params']['command'].nil?

  _, klass, method = work['params']['command'].split('/')

  instance = RuoteParticipants.instance.participants[ klass ]

  if instance.nil?
    msg = "No instance registered for #{klass}"
    DaemonKit.logger.error( msg )
    raise DaemonKit::MissingParticipant, msg
  end

  return instance, method
end

.process(transport, from, workitem) ⇒ Object

Expects a JSON workitem from ruote that has these fields set in fields key:

{
  'reply_queue'    => 'queue to send replies to',
  'params' => {
    'command'  => '/actor/method'
  }
}

Notes on the command key:

It looks like a resource, and will be treated as such. Is should be in the format of /class/method, and it will be passed the complete workitem as a hash.

Notes on replies

Replies are sent back to the queue specified in the reply_queue key.

Notes on errors

Where daemon-kit detects errors in attempting to parse and delegate the workitems, it will reply to the engine and set the following field with the error information:

daemon_kit.error


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/daemon_kit/ruote_workitem.rb', line 39

def process( transport, from, workitem )
  # keep it singleton
  @instance ||= new

  work = parse( workitem )

  # Invalid JSON... mmm
  return if work.nil?

  DaemonKit.logger.warn "Processing workitem that has timed out!" if work.timed_out?

  target, method = parse_command( work )

  if target.nil? || method.nil?
    msg = "Missing target/method in command parameter, or command parameter missing"
    DaemonKit.logger.error( msg )
    work["__error__"] = msg

  elsif target.public_methods.map { |m| m.to_s }.include?( method ) # 1.8.x => [ 'foo' ]
    target.perform( method, work )                                  # 1.9.x => [ :foo ]

  else
    msg = "Workitem cannot be processes: '#{method}' not exposed by #{target.inspect}"
    DaemonKit.logger.error( msg )
    p [ :work, work.inspect ]
    work["__error__"] = msg
  end

  reply_to_engine( transport, from, work )
end

.reply_to_engine(transport, from, response) ⇒ Object



88
89
90
# File 'lib/daemon_kit/ruote_workitem.rb', line 88

def reply_to_engine( transport, from, response )
  send( "reply_via_#{transport}", from, response )
end

.reply_via_amqp(destination_queue, response) ⇒ Object



92
93
94
95
96
97
# File 'lib/daemon_kit/ruote_workitem.rb', line 92

def reply_via_amqp( destination_queue, response )
  DaemonKit.logger.debug("Replying to engine via AMQP with #{response.inspect}")
  ::AMQP::Channel.new.queue( destination_queue, :durable => true ).publish( response.to_json )

  response
end

Instance Method Details

#[](key) ⇒ Object



153
154
155
# File 'lib/daemon_kit/ruote_workitem.rb', line 153

def []( key )
  self.fields[ key ]
end

#[]=(key, value) ⇒ Object



157
158
159
# File 'lib/daemon_kit/ruote_workitem.rb', line 157

def []=( key, value )
  self.fields[ key ] = value
end

#dispatch_timeObject



127
128
129
# File 'lib/daemon_kit/ruote_workitem.rb', line 127

def dispatch_time
  @dispath_time ||= Time.parse( @workitem['dispatch_time'] )
end

#feiObject



113
114
115
# File 'lib/daemon_kit/ruote_workitem.rb', line 113

def fei
  @workitem['fei']
end

#fieldsObject Also known as: attributes



143
144
145
# File 'lib/daemon_kit/ruote_workitem.rb', line 143

def fields
  @workitem['fields'] ||= @workitem['attributes']
end

#has_field?(a) ⇒ Boolean Also known as: has_attribute?

Returns:

  • (Boolean)


139
140
141
# File 'lib/daemon_kit/ruote_workitem.rb', line 139

def has_field?(a)
  self.fields.keys.include?( a )
end

#last_modifiedObject



131
132
133
# File 'lib/daemon_kit/ruote_workitem.rb', line 131

def last_modified
  @last_modified ||= Time.parse( @workitem['last_modified'] )
end

#participant_nameObject



135
136
137
# File 'lib/daemon_kit/ruote_workitem.rb', line 135

def participant_name
  @workitem['participant_name']
end

#short_feiObject



117
118
119
120
121
122
123
124
125
# File 'lib/daemon_kit/ruote_workitem.rb', line 117

def short_fei
  @short_fei ||=
    '(' + [
           'fei', self.fei['owfe_version'], self.fei['engine_id'],
           self.fei['workflow_definition_url'], self.fei['workflow_definition_name'],
           self.fei['workflow_definition_revision'], self.fei['wfid'],
           self.fei['expression_name'], self.fei['expid']
          ].join(' ') + ')'
end

#timed_out?Boolean

Look at the workitem payload and attempt to determine if this workitem has timed out or not. This method will only ever work if you used the +:timeout: parameter was set for the expression.

Returns:

  • (Boolean)


168
169
170
171
172
173
174
175
176
# File 'lib/daemon_kit/ruote_workitem.rb', line 168

def timed_out?
  key = fei['wfid'] + '__' + fei['expid']

  if self.fields["__timeouts__"] && timeout = self.fields["__timeouts__"][ key ]
    return Time.at( timeout.last ) < Time.now
  end

  return false
end

#to_jsonObject



161
162
163
# File 'lib/daemon_kit/ruote_workitem.rb', line 161

def to_json
  @workitem.to_json
end