Class: OpenC3::QueueMicroservice
- Inherits: Microservice
  - Object
  - Microservice
  - OpenC3::QueueMicroservice
- Defined in: lib/openc3/microservices/queue_microservice.rb
Overview
The queue microservice starts a processor, then gets the queue entries from Redis. It then monitors the QueueTopic for changes.
Instance Attribute Summary
- #name ⇒ Object (readonly): Returns the value of attribute name.
- #processor ⇒ Object (readonly): Returns the value of attribute processor.
- #processor_thread ⇒ Object (readonly): Returns the value of attribute processor_thread.
Attributes inherited from Microservice
#count, #custom, #error, #logger, #microservice_status_thread, #scope, #secrets, #state
Instance Method Summary
- #block_for_updates ⇒ Object
- #initialize(*args) ⇒ QueueMicroservice (constructor): A new instance of QueueMicroservice.
- #run ⇒ Object
- #shutdown ⇒ Object
Methods inherited from Microservice
#as_json, #microservice_cmd, run, #setup_microservice_topic
Constructor Details
#initialize(*args) ⇒ QueueMicroservice
Returns a new instance of QueueMicroservice.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 95
    def initialize(*args)
      super(*args)
      @queue_name = @name.split('__')[2]

      initial_state = 'HOLD'
      (@config['options'] || []).each do |option|
        case option[0].upcase
        when 'QUEUE_STATE'
          initial_state = option[1]
        else
          @logger.error("Unknown option passed to microservice #{@name}: #{option}")
        end
      end

      @processor = QueueProcessor.new(name: @queue_name, state: initial_state, logger: @logger, scope: @scope)
      @processor_thread = nil
      @read_topic = true
    end
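The constructor derives the queue name from the third `__`-separated segment of the microservice name and lets a `QUEUE_STATE` option override the default `HOLD` state. The following is a minimal standalone sketch of that parsing only. `parse_queue_config` is a hypothetical helper, and the example name and the `RELEASE` value are assumptions chosen for illustration, not values taken from the library.

```ruby
# Hypothetical helper that mirrors the constructor's name and option parsing.
# Returns [queue_name, initial_state, unknown_options].
def parse_queue_config(microservice_name, options)
  # The queue name is the third '__'-separated segment of the microservice name.
  queue_name = microservice_name.split('__')[2]
  initial_state = 'HOLD' # default when no QUEUE_STATE option is given
  unknown = []
  (options || []).each do |option|
    case option[0].upcase
    when 'QUEUE_STATE'
      initial_state = option[1]
    else
      # The documented constructor logs these through @logger.error instead.
      unknown << option
    end
  end
  [queue_name, initial_state, unknown]
end

# Assumed example inputs; option keys are matched case-insensitively.
queue_name, state, unknown = parse_queue_config(
  'DEFAULT__QUEUE__NIGHTLY',
  [['queue_state', 'RELEASE'], ['BOGUS', '1']]
)
puts queue_name
puts state
puts unknown.inspect
```

Collecting unknown options in an array is a choice made for this sketch so the result can be inspected without a logger.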
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 93
    def name
      @name
    end
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 93
    def processor
      @processor
    end
#processor_thread ⇒ Object (readonly)
Returns the value of attribute processor_thread.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 93
    def processor_thread
      @processor_thread
    end
Instance Method Details
#block_for_updates ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 138
    def block_for_updates
      @read_topic = true
      while @read_topic && !@cancel_thread
        begin
          QueueTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
            data = JSON.parse(msg_hash['data']) if msg_hash['data']
            if data['name'] == @queue_name and msg_hash['kind'] == 'updated'
              @processor.state = data['state']
            end
          end
        rescue StandardError => e
          @logger.error "QueueMicroservice failed to read topics #{@topics}\n#{e.formatted}"
        end
      end
    end
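The block passed to `QueueTopic.read_topics` applies a new state only when the message names this queue and its kind is `updated`. As written, a message without a `data` field appears to leave `data` as `nil`, so `data['name']` would raise a `NoMethodError` that the surrounding `rescue` logs. The sketch below isolates that filter as a pure function with an explicit guard. `state_from_message` is a hypothetical name, and the message hashes are assumed examples rather than captured topic traffic.

```ruby
require 'json'

# Hypothetical pure function: returns the new state carried by a topic
# message, or nil when the message should be ignored.
def state_from_message(msg_hash, queue_name)
  raw = msg_hash['data']
  # Explicit guard added in this sketch; the documented code relies on rescue.
  return nil unless raw

  data = JSON.parse(raw)
  # Only 'updated' messages addressed to this queue change the state.
  return nil unless data['name'] == queue_name && msg_hash['kind'] == 'updated'

  data['state']
end

# Assumed example message in the shape the documented block reads.
message = {
  'kind' => 'updated',
  'data' => JSON.generate({ 'name' => 'NIGHTLY', 'state' => 'HOLD' }),
}
puts state_from_message(message, 'NIGHTLY').inspect
puts state_from_message(message, 'OTHER').inspect
```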
#run ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 114
    def run
      @logger.info "QueueMicroservice running"
      @processor_thread = Thread.new { @processor.run }

      # Let the frontend know that the microservice has been deployed and is running
      notification = {
        'kind' => 'deployed',
        # name and updated_at fields are required for Event formatting
        'data' => JSON.generate({
          'name' => @name,
          'updated_at' => Time.now.to_nsec_from_epoch,
        }),
      }
      QueueTopic.write_notification(notification, scope: @scope)

      loop do
        break if @cancel_thread
        block_for_updates()
      end

      @processor.shutdown()
      @processor_thread.join() if @processor_thread
      @logger.info "QueueMicroservice exiting"
    end
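The `deployed` notification written by `run` is a hash whose `data` value is a JSON string containing `name` and `updated_at`. The sketch below builds the same shape without OpenC3 loaded. `deployed_notification` and `nsec_from_epoch` are hypothetical helpers, and `nsec_from_epoch` only assumes that `Time#to_nsec_from_epoch` yields integer nanoseconds since the Unix epoch.

```ruby
require 'json'

# Assumed stand-in for OpenC3's Time#to_nsec_from_epoch core extension:
# integer nanoseconds since the Unix epoch.
def nsec_from_epoch(time)
  time.to_i * 1_000_000_000 + time.nsec
end

# Hypothetical helper that builds the notification hash in the documented shape.
def deployed_notification(microservice_name, now: Time.now)
  {
    'kind' => 'deployed',
    # 'data' is a JSON string, not a nested hash.
    'data' => JSON.generate({
      'name' => microservice_name,
      'updated_at' => nsec_from_epoch(now),
    }),
  }
end

# Assumed example microservice name.
notification = deployed_notification('DEFAULT__QUEUE__NIGHTLY')
decoded = JSON.parse(notification['data'])
puts notification['kind']
puts decoded['name']
puts decoded['updated_at']
```

A consumer has to call `JSON.parse` on `data` before reading `name` or `updated_at`, as the last lines show.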
#shutdown ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 154
    def shutdown
      @read_topic = false
      @processor.shutdown() if @processor
      super
    end
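Taken together, `run` and `shutdown` form a small thread lifecycle: `run` starts the processor in a background thread and loops until cancelled, while `shutdown` clears `@read_topic`, stops the processor, and defers to the parent class. The sketch below imitates that lifecycle with stand-ins. `StubProcessor` and `LifecycleSketch` are hypothetical classes, a short `sleep` replaces `block_for_updates`, and setting `@cancel_thread` inside `shutdown` is an assumption about what `Microservice#shutdown` does.

```ruby
# Hypothetical stand-in for QueueProcessor: loops until shutdown is called.
class StubProcessor
  def initialize
    @cancel = false
  end

  def run
    sleep 0.01 until @cancel
  end

  def shutdown
    @cancel = true
  end
end

# Hypothetical imitation of the documented run/shutdown pairing.
class LifecycleSketch
  attr_reader :processor, :processor_thread

  def initialize(processor)
    @processor = processor
    @processor_thread = nil
    @cancel_thread = false
    @read_topic = true
  end

  def run
    @processor_thread = Thread.new { @processor.run }
    # Stands in for the loop around block_for_updates.
    sleep 0.01 until @cancel_thread
    @processor.shutdown
    @processor_thread.join if @processor_thread
  end

  def shutdown
    @read_topic = false
    @processor.shutdown if @processor
    # Assumption: the parent Microservice#shutdown sets this flag via super.
    @cancel_thread = true
  end
end

service = LifecycleSketch.new(StubProcessor.new)
runner = Thread.new { service.run }
sleep 0.05        # let the service start its processor thread
service.shutdown  # requested from another thread, as an operator would
runner.join(2)    # wait up to two seconds for a clean exit
puts runner.alive?
```

Calling the processor's `shutdown` from both `shutdown` and the tail of `run` is harmless in this sketch because the stub's `shutdown` only sets a flag.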