Class: PubSubModelSync::ServiceRabbit

Inherits:
ServiceBase show all
Defined in:
lib/pub_sub_model_sync/service_rabbit.rb

Constant Summary collapse

QUEUE_SETTINGS =
{ durable: true, auto_delete: false }.freeze
LISTEN_SETTINGS =
{ manual_ack: true }.freeze
PUBLISH_SETTINGS =
{}.freeze

Constants inherited from ServiceBase

PubSubModelSync::ServiceBase::SERVICE_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

config, debug?, log

Constructor Details

#initializeServiceRabbit

Returns a new instance of ServiceRabbit.



19
20
21
22
23
24
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 19

def initialize
  @service = Bunny.new(*config.bunny_connection)
  @topic_names = Array(config.topic_name || 'model_sync')
  @channels = []
  @exchanges = {}
end

Instance Attribute Details

#channelsObject

: [Channel1]



17
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17

attr_accessor :service, :topic_names, :channels, :exchanges

#exchangesObject

Returns the value of attribute exchanges.



17
18
19
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17

def exchanges
  @exchanges
end

#serviceObject

Returns the value of attribute service.



17
18
19
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17

def service
  @service
end

#topic_namesObject

: [‘Topic 1’, ‘Topic 2’]



17
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17

attr_accessor :service, :topic_names, :channels, :exchanges

Instance Method Details

#listen_messagesObject



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 26

def listen_messages
  log('Listener starting...')
  subscribe_to_queues do |queue|
    queue.subscribe(LISTEN_SETTINGS) { |info, meta, payload| process_message(queue, info, meta, payload) }
  end
  log('Listener started')
  loop { sleep 5 }
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end

#publish(payload) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 39

def publish(payload)
  qty_retry ||= 0
  deliver_data(payload)
rescue => e
  if e.is_a?(Timeout::Error) && (qty_retry += 1) <= 2
    log("Error publishing (retrying....): #{e.message}", :error)
    initialize
    retry
  end
  raise
end

#stopObject



51
52
53
54
55
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 51

def stop
  log('Listener stopping...')
  channels.each(&:close)
  service.close
end