Class: RemoteService::Queue

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/remote_service/queue.rb

Constant Summary collapse

# Signal names treated as exit signals (the code that consumes this list is
# outside this view). Frozen so the shared list cannot be mutated at runtime.
EXIT_SIGNALS =
%w[INT TERM SIGQUIT].freeze

Instance Method Summary collapse

Instance Method Details

#connect(brokers, &block) ⇒ Object



11
12
13
14
15
# File 'lib/remote_service/queue.rb', line 11

# Creates a NATS connector for the given brokers and starts it.
#
# When +brokers+ is nil (or false) the list is read from the
# REMOTE_SERVICE_BROKERS environment variable, a comma-separated string,
# falling back to 'nats://127.0.0.1:4222'. Entries read from the environment
# are stripped of surrounding whitespace and blank entries are dropped, so
# values such as "nats://a:4222, nats://b:4222" work as expected.
#
# @param brokers broker list handed to Connector::Nats.new; presumably an
#   Array of URL strings, matching the shape of the fallback (confirm against
#   callers). Passed through unmodified when supplied.
# @param block forwarded to the connector's +start+
# @return whatever the connector's +start+ returns
def connect(brokers, &block)
  brokers ||= ENV.fetch('REMOTE_SERVICE_BROKERS', 'nats://127.0.0.1:4222')
                 .split(',')
                 .map(&:strip)
                 .reject(&:empty?)
  @conn = Connector::Nats.new(brokers)
  @conn.start(&block)
end

#publish(queue, payload) ⇒ Object



36
37
38
# File 'lib/remote_service/queue.rb', line 36

# Publishes +payload+ to +queue+ over the current connection.
#
# The payload is run through +encode+ first; the encoded message is what the
# connection receives.
#
# @param queue   destination passed straight to the connection's +publish+
# @param payload data to encode and send
# @return whatever the connection's +publish+ returns
def publish(queue, payload)
  message = encode(payload)
  @conn.publish(queue, message)
end

#request(queue, payload) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/remote_service/queue.rb', line 25

# Sends +payload+ to +queue+ and yields each decoded response.
#
# The payload is run through +encode+ before it is handed to the connection,
# and every response is run through +decode+ before it is yielded. The
# outgoing request and each incoming response are logged at debug level; the
# response line includes the elapsed time in milliseconds.
#
# @param queue   name of the target service queue
# @param payload request data to encode and send
# @yield [data] the decoded response; skipped when no block was given
# @return whatever the connection's +request+ returns
def request(queue, payload)
  RemoteService.logger.debug "REQUEST - SERVICE:[#{queue}] PAYLOAD:[#{payload}]"
  # Use the monotonic clock for the duration: the wall clock (Time.now) can
  # jump when the system time is adjusted, giving bogus or negative latencies.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @conn.request(queue, encode(payload)) do |response|
    data = decode(response)
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000
    RemoteService.logger.debug "RESPONSE - SERVICE:[#{queue}] PAYLOAD:[#{data}] TIME:[#{elapsed_ms}ms]"
    # Without this guard a block-less call raises LocalJumpError inside the
    # response callback.
    yield(data) if block_given?
  end
end

#service(service_handler, workers, monitor_interval) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/remote_service/queue.rb', line 17

# Builds the worker pool, remembers the handler and starts the service
# subscriber.
#
# Either sizing argument may be nil (or false), in which case it is read from
# the environment: REMOTE_SERVICE_WORKERS (default 4) and
# REMOTE_SERVICE_MONITOR_INTERVAL (default 5). Both values are coerced with
# +to_i+ before being given to WorkerPool.new.
#
# @param service_handler  stored in @service_handler for later use
# @param workers          first argument to WorkerPool.new (after +to_i+)
# @param monitor_interval second argument to WorkerPool.new (after +to_i+)
# @return whatever +start_service_subscriber+ returns
def service(service_handler, workers, monitor_interval)
  pool_size = (workers || ENV.fetch('REMOTE_SERVICE_WORKERS', 4)).to_i
  interval = (monitor_interval || ENV.fetch('REMOTE_SERVICE_MONITOR_INTERVAL', 5)).to_i

  @worker_pool = WorkerPool.new(pool_size, interval)
  @service_handler = service_handler
  start_service_subscriber
end

#stop ⇒ Object



40
41
42
# File 'lib/remote_service/queue.rb', line 40

# Stops the current connection.
#
# Does nothing when no connection has been set up yet (i.e. +connect+ has not
# assigned @conn), so it is safe to call during early shutdown.
#
# @return whatever the connection's +stop+ returns, or nil when there is no
#   connection
def stop
  @conn.stop if @conn
end