Class: OpenC3::ReactionWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/openc3/microservices/reaction_microservice.rb

Overview

The Reaction worker is a very simple thread pool worker. Once the manager queues a trigger to evaluate against the reactions. The worker will check the reactions to see if it needs to fire any reactions.

Constant Summary collapse

REACTION_METRIC_NAME =
'reaction_duration_seconds'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, logger:, scope:, share:, ident:) ⇒ ReactionWorker

Returns a new instance of ReactionWorker.



229
230
231
232
233
234
235
236
237
# File 'lib/openc3/microservices/reaction_microservice.rb', line 229

def initialize(name:, logger:, scope:, share:, ident:)
  @name = name
  @logger = logger
  @scope = scope
  @share = share
  @ident = ident
  @metric_output_time = 0
  @metric = Metric.new(microservice: @name, scope: @scope)
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



227
228
229
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

def name
  @name
end

#scopeObject (readonly)

Returns the value of attribute scope.



227
228
229
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

def scope
  @scope
end

#shareObject (readonly)

Returns the value of attribute share.



227
228
229
# File 'lib/openc3/microservices/reaction_microservice.rb', line 227

def share
  @share
end

Instance Method Details

#get_token(username) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/openc3/microservices/reaction_microservice.rb', line 239

def get_token(username)
  if ENV['OPENC3_API_CLIENT'].nil?
    return OpenC3Authentication.new().token
  else
    # Check for offline access token
    model = nil
    model = OpenC3::OfflineAccessModel.get_model(name: username, scope: @scope) if username and username != ''
    if model and model.offline_access_token
      auth = OpenC3KeycloakAuthentication.new(ENV['OPENC3_KEYCLOAK_URL'])
      return auth.get_token_from_refresh_token(model.offline_access_token)
    else
      return nil
    end
  end
end

#process_enabled_trigger(data:) ⇒ Object



283
284
285
286
287
288
289
290
291
# File 'lib/openc3/microservices/reaction_microservice.rb', line 283

def process_enabled_trigger(data:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @share.reaction_base.get_reactions(trigger_name: data['name']).each do | reaction |
    run_reaction(reaction: reaction)
  end
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'type' => 'trigger', 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels)
end

#reaction(data:) ⇒ Object



255
256
257
# File 'lib/openc3/microservices/reaction_microservice.rb', line 255

def reaction(data:)
  return ReactionModel.from_json(data, name: data['name'], scope: data['scope'])
end

#runObject



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/openc3/microservices/reaction_microservice.rb', line 259

def run
  @logger.info "ReactionWorker-#{@ident} running"
  loop do
    begin
      kind, data = @share.queue_base.queue.pop
      break if kind.nil? || data.nil?
      case kind
      when 'reaction'
        run_reaction(reaction: reaction(data: data))
      when 'trigger'
        process_enabled_trigger(data: data)
      end
      current_time = Time.now.to_i
      if @metric_output_time < current_time
        @metric.output
        @metric_output_time = current_time + 120
      end
    rescue StandardError => e
      @logger.error "ReactionWorker-#{@ident} failed to evaluate kind: #{kind} data: #{data}\n#{e.formatted}"
    end
  end
  @logger.info "ReactionWorker-#{@ident} exiting"
end

#run_action(reaction:, action:) ⇒ Object



304
305
306
307
308
309
310
311
# File 'lib/openc3/microservices/reaction_microservice.rb', line 304

def run_action(reaction:, action:)
  case action['type']
  when 'command'
    run_command(reaction: reaction, action: action)
  when 'script'
    run_script(reaction: reaction, action: action)
  end
end

#run_command(reaction:, action:) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/openc3/microservices/reaction_microservice.rb', line 313

def run_command(reaction:, action:)
  @logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, command: '#{action['value']}' "
  begin
    username = reaction.username
    token = get_token(username)
    raise "No token available for username: #{username}" unless token
    cmd_no_hazardous_check(action['value'], scope: @scope, token: token)
    @logger.info "ReactionWorker-#{@ident} #{reaction.name} command action complete, #{action['value']}"
  rescue StandardError => e
    @logger.error "ReactionWorker-#{@ident} #{reaction.name} command action failed, #{action}\n#{e.message}"
  end
end

#run_reaction(reaction:) ⇒ Object



293
294
295
296
297
298
299
300
301
302
# File 'lib/openc3/microservices/reaction_microservice.rb', line 293

def run_reaction(reaction:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  reaction.actions.each do |action|
    run_action(reaction: reaction, action: action)
  end
  @share.reaction_base.sleep(name: reaction.name)
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'type' => 'reaction', 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels)
end

#run_script(reaction:, action:) ⇒ Object



326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/openc3/microservices/reaction_microservice.rb', line 326

def run_script(reaction:, action:)
  @logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, script: '#{action['value']}'"
  begin
    username = reaction.username
    token = get_token(username)
    raise "No token available for username: #{username}" unless token
    request = Net::HTTP::Post.new(
      "/script-api/scripts/#{action['value']}/run?scope=#{@scope}",
      'Content-Type' => 'application/json',
      'Authorization' => token
    )
    request.body = JSON.generate({
      'scope' => @scope,
      'environment' => action['environment'],
      'reaction' => reaction.name,
      'id' => Time.now.to_i
    })
    hostname = ENV['OPENC3_SCRIPT_HOSTNAME'] || 'openc3-cosmos-script-runner-api'
    response = Net::HTTP.new(hostname, 2902).request(request)
    raise "failed to call #{hostname}, for script: #{action['value']}, response code: #{response.code}" if response.code != '200'

    @logger.info "ReactionWorker-#{@ident} #{reaction.name} script action complete, #{action['value']} => #{response.body}"
  rescue StandardError => e
    @logger.error "ReactionWorker-#{@ident} #{reaction.name} script action failed, #{action}\n#{e.message}"
  end
end