Class: PubSubModelSync::ServiceRabbit
- Inherits:
- ServiceBase
- Inheritance hierarchy: Object → Base → ServiceBase → PubSubModelSync::ServiceRabbit
- Defined in:
- lib/pub_sub_model_sync/service_rabbit.rb
Constant Summary
- QUEUE_SETTINGS =
{ durable: true, auto_delete: false }.freeze
- LISTEN_SETTINGS =
{ manual_ack: true }.freeze
- PUBLISH_SETTINGS =
{}.freeze
Constants inherited from ServiceBase
PubSubModelSync::ServiceBase::SERVICE_KEY
Instance Attribute Summary
-
#channels ⇒ Object
Example value: [Channel1].
-
#exchanges ⇒ Object
Returns the value of attribute exchanges.
-
#service ⇒ Object
Returns the value of attribute service.
-
#topic_names ⇒ Object
Example value: ['Topic 1', 'Topic 2'].
Instance Method Summary
-
#initialize ⇒ ServiceRabbit
constructor
A new instance of ServiceRabbit.
- #listen_messages ⇒ Object
- #publish(payload) ⇒ Object
- #stop ⇒ Object
Methods inherited from Base
Constructor Details
#initialize ⇒ ServiceRabbit
Returns a new instance of ServiceRabbit.
19 20 21 22 23 24 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 19 def initialize @service = Bunny.new(*config.bunny_connection) @topic_names = Array(config.topic_name || 'model_sync') @channels = [] @exchanges = {} end |
Instance Attribute Details
#channels ⇒ Object
Example value: [Channel1]
17 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17 attr_accessor :service, :topic_names, :channels, :exchanges |
#exchanges ⇒ Object
Returns the value of attribute exchanges.
17 18 19 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17 def exchanges @exchanges end |
#service ⇒ Object
Returns the value of attribute service.
17 18 19 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17 def service @service end |
#topic_names ⇒ Object
Example value: ['Topic 1', 'Topic 2']
17 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 17 attr_accessor :service, :topic_names, :channels, :exchanges |
Instance Method Details
#listen_messages ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 26 def listen_messages log('Listener starting...') subscribe_to_queues do |queue| queue.subscribe(LISTEN_SETTINGS) { |info, meta, payload| process_message(queue, info, meta, payload) } end log('Listener started') loop { sleep 5 } rescue PubSubModelSync::Runner::ShutDown log('Listener stopped') rescue => e log("Error listening message: #{[e.message, e.backtrace]}", :error) end |
#publish(payload) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 39 def publish(payload) qty_retry ||= 0 deliver_data(payload) rescue => e if e.is_a?(Timeout::Error) && (qty_retry += 1) <= 2 log("Error publishing (retrying....): #{e.message}", :error) initialize retry end raise end |
#stop ⇒ Object
51 52 53 54 55 |
# File 'lib/pub_sub_model_sync/service_rabbit.rb', line 51 def stop log('Listener stopping...') channels.each(&:close) service.close end |