Class: OpenC3::QueueMicroservice
- Inherits: Microservice
  - Object
  - Microservice
  - OpenC3::QueueMicroservice
- Defined in: lib/openc3/microservices/queue_microservice.rb
Overview
The queue microservice starts a processor, then gets the queue entries from Redis. It then monitors the QueueTopic for changes.
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #processor ⇒ Object (readonly)
  Returns the value of attribute processor.
- #processor_thread ⇒ Object (readonly)
  Returns the value of attribute processor_thread.
Attributes inherited from Microservice
#count, #custom, #error, #logger, #microservice_status_thread, #scope, #secrets, #state
Instance Method Summary
- #block_for_updates ⇒ Object
- #initialize(*args) ⇒ QueueMicroservice (constructor)
  A new instance of QueueMicroservice.
- #run ⇒ Object
- #shutdown ⇒ Object
Methods inherited from Microservice
#as_json, #microservice_cmd, run, #setup_microservice_topic
Constructor Details
#initialize(*args) ⇒ QueueMicroservice
Returns a new instance of QueueMicroservice.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 104

    def initialize(*args)
      super(*args)
      @queue_name = @name.split('__')[2]

      initial_state = 'HOLD'
      # See if the queue already exists to get its state
      queue = OpenC3::QueueModel.get(name: @queue_name, scope: @scope)
      if queue
        initial_state = queue['state']
      else
        (@config['options'] || []).each do |option|
          case option[0].upcase
          when 'QUEUE_STATE'
            initial_state = option[1]
          else
            @logger.error("Unknown option passed to microservice #{@name}: #{option}")
          end
        end
      end

      @logger.info "Creating QueueMicroservice in scope #{@scope} for queue #{@queue_name} with initial state #{initial_state}"

      @processor = QueueProcessor.new(name: @queue_name, state: initial_state, logger: @logger, scope: @scope)
      @processor_thread = nil
      @read_topic = true
    end
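The constructor's name parsing and initial-state selection can be sketched in isolation. This is a minimal, hedged illustration rather than OpenC3 code: the helper names `queue_name_from` and `resolve_initial_state` are hypothetical, the example microservice name `DEFAULT__QUEUE__MYQUEUE` and the state value `RELEASE` are assumptions chosen for illustration, and unknown options are returned to the caller instead of being logged.

```ruby
# Hypothetical helpers mirroring the constructor logic; not part of OpenC3.

# The listing takes the third '__'-separated field of the microservice name.
# The "SCOPE__TYPE__NAME" layout used below is an assumption.
def queue_name_from(microservice_name)
  microservice_name.split('__')[2]
end

# existing_queue: Hash with a 'state' key, or nil when no queue is stored.
# options: array of [keyword, value] pairs, shaped like @config['options'].
# Returns [initial_state, unknown_options].
def resolve_initial_state(existing_queue, options)
  # A stored queue wins; options are not consulted in that case.
  return [existing_queue['state'], []] if existing_queue

  state = 'HOLD' # default used by the listing
  unknown = []
  (options || []).each do |option|
    case option[0].upcase
    when 'QUEUE_STATE'
      state = option[1]
    else
      unknown << option # the listing logs an error here instead
    end
  end
  [state, unknown]
end

puts queue_name_from('DEFAULT__QUEUE__MYQUEUE')
p resolve_initial_state(nil, [['queue_state', 'RELEASE'], ['BOGUS', '1']])
p resolve_initial_state({ 'state' => 'HOLD' }, [['QUEUE_STATE', 'RELEASE']])
```

Note the precedence this makes visible: the `QUEUE_STATE` option only applies when no queue is already stored, and option keywords are matched case-insensitively.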
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 102

    def name
      @name
    end
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 102

    def processor
      @processor
    end
#processor_thread ⇒ Object (readonly)
Returns the value of attribute processor_thread.
    # File 'lib/openc3/microservices/queue_microservice.rb', line 102

    def processor_thread
      @processor_thread
    end
Instance Method Details
#block_for_updates ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 154

    def block_for_updates
      @read_topic = true
      while @read_topic && !@cancel_thread
        begin
          QueueTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
            data = JSON.parse(msg_hash['data']) if msg_hash['data']
            if data['name'] == @queue_name and msg_hash['kind'] == 'updated'
              @processor.state = data['state']
            end
          end
        rescue StandardError => e
          @logger.error "QueueMicroservice failed to read topics #{@topics}\n#{e.formatted}"
        end
      end
    end
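The per-message filtering inside `block_for_updates` can be sketched as a standalone function. The helper name `state_from_message` is hypothetical, and the message hashes below are made-up examples shaped like the `msg_hash` used in the listing. One deliberate difference: in the listing, a message without a `data` field leaves `data` as `nil`, so `data['name']` appears to raise and be caught by the surrounding `rescue`; this sketch returns `nil` for such messages instead.

```ruby
require 'json'

# Hypothetical helper: returns the new state when the message is an
# 'updated' event for queue_name, otherwise nil.
def state_from_message(msg_hash, queue_name)
  return nil unless msg_hash['kind'] == 'updated'

  raw = msg_hash['data']
  return nil unless raw # guard added in this sketch; see the note above

  data = JSON.parse(raw)
  data['name'] == queue_name ? data['state'] : nil
end

# Made-up messages for illustration.
update = { 'kind' => 'updated', 'data' => JSON.generate({ 'name' => 'MYQUEUE', 'state' => 'RELEASE' }) }
other  = { 'kind' => 'updated', 'data' => JSON.generate({ 'name' => 'OTHER', 'state' => 'RELEASE' }) }

p state_from_message(update, 'MYQUEUE')
p state_from_message(other, 'MYQUEUE')
p state_from_message({ 'kind' => 'deployed' }, 'MYQUEUE')
```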
#run ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 130

    def run
      @logger.info "QueueMicroservice running"
      @processor_thread = Thread.new { @processor.run }
      # Let the frontend know that the microservice has been deployed and is running
      notification = {
        'kind' => 'deployed',
        # name and updated_at fields are required for Event formatting
        'data' => JSON.generate({
          'name' => @name,
          'updated_at' => Time.now.to_nsec_from_epoch,
        }),
      }
      QueueTopic.write_notification(notification, scope: @scope)

      loop do
        break if @cancel_thread
        block_for_updates()
      end

      @processor.shutdown()
      @processor_thread.join() if @processor_thread
      @logger.info "QueueMicroservice exiting"
    end
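The shape of the `deployed` notification written by `run` can be sketched with core Ruby only. The helper name `deployed_notification` is hypothetical. The listing calls `Time#to_nsec_from_epoch`, which is not a core Ruby method; this sketch assumes it means integer nanoseconds since the Unix epoch and computes that value directly.

```ruby
require 'json'

# Hypothetical helper building a hash shaped like the notification in #run.
# The 'data' value is a JSON string, not a nested hash.
def deployed_notification(name, now = Time.now)
  {
    'kind' => 'deployed',
    'data' => JSON.generate({
      'name' => name,
      # Assumed equivalent of to_nsec_from_epoch: whole seconds scaled to
      # nanoseconds plus the sub-second nanosecond part.
      'updated_at' => now.to_i * 1_000_000_000 + now.nsec,
    }),
  }
end

notification = deployed_notification('DEFAULT__QUEUE__MYQUEUE')
puts notification['kind']
puts JSON.parse(notification['data']).keys.inspect
```

A consumer therefore has to parse `data` a second time to reach `name` and `updated_at`, which matches how `block_for_updates` handles incoming messages.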
#shutdown ⇒ Object
    # File 'lib/openc3/microservices/queue_microservice.rb', line 170

    def shutdown
      @read_topic = false
      @processor.shutdown() if @processor
      super
    end
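The cooperative stop pattern that `#run` and `#shutdown` rely on (a worker thread that loops until a flag is set, followed by a `join`) can be sketched with a stand-in processor. `FakeProcessor` and `run_and_stop` are hypothetical names, and the sleep durations are arbitrary. How `@cancel_thread` gets set is not shown in the listings; it presumably happens in the inherited `Microservice#shutdown`, which is an assumption.

```ruby
# Hypothetical stand-in for QueueProcessor: loops until told to stop.
class FakeProcessor
  def initialize
    @cancel = false
  end

  def run
    sleep 0.01 until @cancel
  end

  def shutdown
    @cancel = true
  end
end

# Starts the worker, requests a stop, and waits for the thread to finish,
# in the same order as the end of #run in the listing.
def run_and_stop(processor)
  worker = Thread.new { processor.run }
  sleep 0.05          # let the worker spin briefly
  processor.shutdown  # request a stop via the flag
  worker.join         # wait for the loop to observe the flag and exit
  worker
end

worker = run_and_stop(FakeProcessor.new)
puts "worker alive after join: #{worker.alive?}"
```

Setting a flag and joining, rather than killing the thread, lets the processor finish its current iteration before exiting.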