Class: RemoteService::Queue
- Inherits:
- Object
- Object
- RemoteService::Queue
- Includes:
- Singleton
- Defined in:
- lib/remote_service/queue.rb
Constant Summary collapse
- EXIT_SIGNALS = ['INT', 'TERM', 'SIGQUIT']
Instance Method Summary collapse
- #connect(brokers, &block) ⇒ Object
- #publish(queue, payload) ⇒ Object
- #request(queue, payload) ⇒ Object
- #service(service_handler, workers, monitor_interval) ⇒ Object
- #stop ⇒ Object
Instance Method Details
#connect(brokers, &block) ⇒ Object
11 12 13 14 15 |
# File 'lib/remote_service/queue.rb', line 11
#
# Builds a Connector::Nats for the given brokers, stores it in @conn and
# starts it, forwarding the caller's block to Connector::Nats#start.
#
# @param brokers [Array<String>, nil] broker URLs; when nil/false the list is
#   read from the REMOTE_SERVICE_BROKERS environment variable (comma-separated),
#   falling back to 'nats://127.0.0.1:4222'
# @param block forwarded untouched to the connector's #start
#   (NOTE(review): what #start does with the block is not visible here)
# @return [Object] whatever Connector::Nats#start returns
def connect(brokers, &block)
  broker_urls = brokers || ENV.fetch('REMOTE_SERVICE_BROKERS', 'nats://127.0.0.1:4222').split(',')
  @conn = Connector::Nats.new(broker_urls)
  @conn.start(&block)
end
#publish(queue, payload) ⇒ Object
36 37 38 |
# File 'lib/remote_service/queue.rb', line 36
#
# Encodes the payload and hands it to the current connection's #publish.
#
# @param queue [Object] destination passed straight through to the connection
# @param payload [Object] value run through #encode before being sent
# @return [Object] whatever the connection's #publish returns
def publish(queue, payload)
  message = encode(payload)
  @conn.publish(queue, message)
end
#request(queue, payload) ⇒ Object
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/remote_service/queue.rb', line 25
#
# Sends an encoded payload through the current connection's #request and
# yields each decoded response to the caller's block. The request and the
# response (with elapsed time in milliseconds) are logged at debug level.
#
# The caller must supply a block: the decoded response is delivered with
# `yield`, which raises LocalJumpError when no block was given.
#
# @param queue [Object] destination passed straight through to the connection
# @param payload [Object] value run through #encode before being sent
# @yieldparam data [Object] the response after #decode
# @return [Object] whatever the connection's #request returns
def request(queue, payload)
  RemoteService.logger.debug "REQUEST - SERVICE:[#{queue}] PAYLOAD:[#{payload}]"
  # Use the monotonic clock for the duration: wall-clock time can jump
  # (NTP, manual changes) and would produce negative or inflated timings.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @conn.request(queue, encode(payload)) do |response|
    data = decode(response)
    response_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000
    RemoteService.logger.debug "RESPONSE - SERVICE:[#{queue}] PAYLOAD:[#{data}] TIME:[#{response_time}ms]"
    yield(data)
  end
end
#service(service_handler, workers, monitor_interval) ⇒ Object
17 18 19 20 21 22 23 |
# File 'lib/remote_service/queue.rb', line 17
#
# Creates the worker pool, remembers the handler and starts the service
# subscriber.
#
# @param service_handler [Object] stored in @service_handler
# @param workers [Integer, String, nil] worker count; when nil/false it is
#   taken from REMOTE_SERVICE_WORKERS (default 4). Converted with #to_i.
# @param monitor_interval [Integer, String, nil] monitor interval; when
#   nil/false it is taken from REMOTE_SERVICE_MONITOR_INTERVAL (default 5).
#   Converted with #to_i. (NOTE(review): unit is not visible here — confirm
#   against WorkerPool.)
# @return [Object] whatever #start_service_subscriber returns
def service(service_handler, workers, monitor_interval)
  pool_size = workers || ENV.fetch('REMOTE_SERVICE_WORKERS', 4)
  interval = monitor_interval || ENV.fetch('REMOTE_SERVICE_MONITOR_INTERVAL', 5)

  @worker_pool = WorkerPool.new(pool_size.to_i, interval.to_i)
  @service_handler = service_handler
  start_service_subscriber
end
#stop ⇒ Object
40 41 42 |
# File 'lib/remote_service/queue.rb', line 40
#
# Stops the current connection, if there is one.
#
# Safe to call before #connect has set up a connection (for example from a
# shutdown path): in that case nothing happens and nil is returned instead
# of raising NoMethodError on nil.
#
# @return [Object, nil] whatever the connection's #stop returns, or nil when
#   no connection exists
def stop
  @conn.stop if @conn
end