Class: OpenC3::QueueMicroservice

Inherits:
Microservice show all
Defined in:
lib/openc3/microservices/queue_microservice.rb

Overview

The queue microservice starts a processor, then gets the queue entries from Redis. It then monitors the QueueTopic for changes.

Instance Attribute Summary collapse

Attributes inherited from Microservice

#count, #custom, #error, #logger, #microservice_status_thread, #scope, #secrets, #state

Instance Method Summary collapse

Methods inherited from Microservice

#as_json, #microservice_cmd, run, #setup_microservice_topic

Constructor Details

#initialize(*args) ⇒ QueueMicroservice

Returns a new instance of QueueMicroservice.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/openc3/microservices/queue_microservice.rb', line 95

# Builds the microservice and the QueueProcessor it drives.
#
# The queue name is the third '__'-separated segment of @name. The processor
# starts in the 'HOLD' state unless a QUEUE_STATE option supplies another
# value; any other option is reported through the logger and ignored.
#
# @param args [Array] forwarded unchanged to the superclass constructor
#   (presumably what populates @name, @config, @logger and @scope -- confirm
#   against Microservice)
def initialize(*args)
  super(*args)
  @queue_name = @name.split('__')[2]

  queue_state = 'HOLD'
  options = @config['options'] || []
  options.each do |option|
    if option[0].upcase == 'QUEUE_STATE'
      queue_state = option[1]
    else
      @logger.error("Unknown option passed to microservice #{@name}: #{option}")
    end
  end

  @processor = QueueProcessor.new(name: @queue_name, state: queue_state, logger: @logger, scope: @scope)
  # The worker thread is only created once #run is called
  @processor_thread = nil
  @read_topic = true
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



93
94
95
# File 'lib/openc3/microservices/queue_microservice.rb', line 93

# Reader for the @name instance variable.
#
# @return [Object] the current value of @name (nil when it was never assigned)
def name; @name; end

#processor ⇒ Object (readonly)

Returns the value of attribute processor.



93
94
95
# File 'lib/openc3/microservices/queue_microservice.rb', line 93

# Reader for the @processor instance variable.
#
# @return [Object] the current value of @processor (nil when it was never assigned)
def processor; @processor; end

#processor_thread ⇒ Object (readonly)

Returns the value of attribute processor_thread.



93
94
95
# File 'lib/openc3/microservices/queue_microservice.rb', line 93

# Reader for the @processor_thread instance variable.
#
# @return [Object] the current value of @processor_thread (nil when it was
#   never assigned)
def processor_thread; @processor_thread; end

Instance Method Details

#block_for_updates ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/openc3/microservices/queue_microservice.rb', line 138

# Reads the queue topics in a loop and forwards state changes addressed to
# this queue to the processor.
#
# The loop runs until @read_topic is cleared or @cancel_thread is set. For
# each message, the 'data' field is parsed as JSON; when its 'name' equals
# @queue_name and the message 'kind' is 'updated', the payload's 'state' is
# assigned to the processor. Messages that carry no payload are skipped.
#
# Any StandardError raised while reading is logged and the loop continues.
def block_for_updates
  @read_topic = true
  while @read_topic && !@cancel_thread
    begin
      QueueTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
        raw = msg_hash['data']
        # A message without a payload cannot name a queue. Skip it instead of
        # indexing into nil, which would abort this read and log a spurious
        # failure.
        next unless raw

        data = JSON.parse(raw)
        # A JSON `null` payload parses to nil and is skipped for the same reason
        next unless data

        if data['name'] == @queue_name && msg_hash['kind'] == 'updated'
          @processor.state = data['state']
        end
      end
    rescue StandardError => e
      @logger.error "QueueMicroservice failed to read topics #{@topics}\n#{e.formatted}"
    end
  end
end

#run ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/openc3/microservices/queue_microservice.rb', line 114

# Main entry point of the microservice.
#
# Starts the processor on its own thread, publishes a 'deployed' notification
# to the QueueTopic, then keeps calling #block_for_updates until
# @cancel_thread is set. On the way out the processor is shut down and its
# thread is joined.
def run
  @logger.info "QueueMicroservice running"
  @processor_thread = Thread.new { @processor.run }

  # Let the frontend know that the microservice has been deployed and is running.
  # name and updated_at fields are required for Event formatting
  event_fields = {
    'name' => @name,
    'updated_at' => Time.now.to_nsec_from_epoch,
  }
  deployed = {
    'kind' => 'deployed',
    'data' => JSON.generate(event_fields),
  }
  QueueTopic.write_notification(deployed, scope: @scope)

  block_for_updates until @cancel_thread

  @processor.shutdown
  @processor_thread&.join
  @logger.info "QueueMicroservice exiting"
end

#shutdown ⇒ Object



154
155
156
157
158
# File 'lib/openc3/microservices/queue_microservice.rb', line 154

# Stops the microservice.
#
# Clears @read_topic so the update loop in #block_for_updates can end, shuts
# down the processor when one exists, then defers to the superclass
# implementation.
def shutdown
  @read_topic = false
  @processor&.shutdown
  super
end