Class: ActivePubsub::Subscriber
- Inherits:
-
Object
- Object
- ActivePubsub::Subscriber
- Defined in:
- lib/active_pubsub/subscriber.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
Class Method Summary collapse
- .as(service_namespace) ⇒ Object
- .bind_subscriptions! ⇒ Object
-
.channel ⇒ Object
Returns the channel obtained from the connection (`connection.channel`).
- .deserialize_event(event) ⇒ Object
- .deserialize_record(record) ⇒ Object
- .exchange ⇒ Object
- .observes(target_exchange) ⇒ Object
- .on(event_name, &block) ⇒ Object
- .print_subscriptions! ⇒ Object
- .queue_for_event(event_name) ⇒ Object
- .routing_key_for_event(event_name) ⇒ Object
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
5 6 7 |
# File 'lib/active_pubsub/subscriber.rb', line 5

# Reader for the +connection+ attribute.
#
# @return [Object] the current value of @connection (nil when never assigned)
def connection
  @connection
end
Class Method Details
.as(service_namespace) ⇒ Object
24 25 26 |
# File 'lib/active_pubsub/subscriber.rb', line 24

# Sets the local service namespace for this subscriber class.
#
# The stored value becomes the leading segment of the queue names built by
# queue_for_event.
#
# @param service_namespace [Object] namespace to store in local_service_namespace
# @return [Object] the value that was assigned
def self.as(service_namespace)
  self.local_service_namespace = service_namespace
end
.bind_subscriptions! ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/active_pubsub/subscriber.rb', line 32

# Wires every handler registered in +events+ to its own queue.
#
# For each (event_name, handler) pair:
# 1. a queue named queue_for_event(event_name.to_s) is requested from +channel+;
# 2. the queue is bound to +exchange+ using routing_key_for_event(event_name);
# 3. a subscription is opened whose callback decodes the payload with
#    deserialize_event, decodes the event's :record entry with
#    deserialize_record, and passes the resulting object to the handler.
#
# @return [Object] whatever events.each_pair returns
def self.bind_subscriptions!
  events.each_pair do |event_name, handler|
    queue = channel.queue(queue_for_event(event_name.to_s))
    bound_queue = queue.bind(exchange, routing_key: routing_key_for_event(event_name))

    # Only the payload is used; delivery metadata is ignored.
    bound_queue.subscribe do |_delivery_info, _properties, payload|
      decoded_event = deserialize_event(payload)
      handler.call(deserialize_record(decoded_event[:record]))
    end
  end
end
.channel ⇒ Object
Returns the channel obtained from the connection (`connection.channel`).
16 17 18 |
# File 'lib/active_pubsub/subscriber.rb', line 16

# Asks the class-level connection for a channel.
#
# NOTE(review): nothing here memoizes the result, so every call goes back to
# connection.channel — confirm whether the connection caches its channel.
#
# @return [Object] the result of connection.channel
def self.channel
  connection.channel
end
.deserialize_event(event) ⇒ Object
45 46 47 |
# File 'lib/active_pubsub/subscriber.rb', line 45

# Decodes a Marshal-serialized event and remembers it in @current_event.
#
# SECURITY(review): Marshal.load can instantiate arbitrary objects and must
# only be given data from trusted producers. bind_subscriptions! passes the
# raw message payload straight into this method, so every publisher on the
# exchange has to be trusted; otherwise switch to a data-only format such as
# JSON. Behaviour is intentionally left unchanged here.
#
# @param event [String] Marshal-dumped event
# @return [Object] the decoded event (also stored in @current_event)
def self.deserialize_event(event)
  @current_event = Marshal.load(event)
end
.deserialize_record(record) ⇒ Object
49 50 51 |
# File 'lib/active_pubsub/subscriber.rb', line 49

# Decodes a Marshal-serialized record and remembers it in @current_record.
#
# SECURITY(review): Marshal.load can instantiate arbitrary objects and must
# only be given data from trusted producers. bind_subscriptions! feeds this
# method the :record entry of an event that itself arrived over the message
# bus, so the publisher has to be trusted; otherwise switch to a data-only
# format such as JSON. Behaviour is intentionally left unchanged here.
#
# @param record [String] Marshal-dumped record
# @return [Object] the decoded record (also stored in @current_record)
def self.deserialize_record(record)
  @current_record = Marshal.load(record)
end
.exchange ⇒ Object
20 21 22 |
# File 'lib/active_pubsub/subscriber.rb', line 20

# Requests the topic exchange named +exchange_name+ from the channel,
# passing the auto_delete option set to true.
#
# @return [Object] the result of channel.topic
def self.exchange
  channel.topic(exchange_name, auto_delete: true)
end
.observes(target_exchange) ⇒ Object
53 54 55 |
# File 'lib/active_pubsub/subscriber.rb', line 53

# Declares which exchange this subscriber class listens to.
#
# The stored name is what +exchange+ passes to channel.topic, and it is the
# prefix used by routing_key_for_event and queue_for_event.
#
# @param target_exchange [Object] value to store in exchange_name
# @return [Object] the value that was assigned
def self.observes(target_exchange)
  self.exchange_name = target_exchange
end
.on(event_name, &block) ⇒ Object
28 29 30 |
# File 'lib/active_pubsub/subscriber.rb', line 28

# Registers a handler for an event.
#
# The block is stored in +events+ under event_name, replacing any entry
# already stored under that key. bind_subscriptions! later invokes it with
# the deserialized record of each delivered event.
#
# @param event_name [Object] key identifying the event
# @param block [Proc] handler to run for that event
# @return [Proc, nil] the stored block
def self.on(event_name, &block)
  events[event_name] = block
end
.print_subscriptions! ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/active_pubsub/subscriber.rb', line 65

# Prints a human-readable summary of every registered subscription.
#
# The output starts with a "Watching:" header, followed by the queue name
# and routing key of each entry in +events+, each entry terminated by a
# blank line. The text is written with a single +puts+ call.
#
# @return [nil] the return value of puts
def self.print_subscriptions!
  # .dup guarantees a mutable buffer even if string literals are frozen.
  message = "Watching: \n".dup

  events.each_pair do |event_name, _handler|
    message << "Queue: #{queue_for_event(event_name.to_s)} \n"
    message << "Routing Key: #{routing_key_for_event(event_name)} \n"
    message << "\n"
  end

  puts message
end
.queue_for_event(event_name) ⇒ Object
57 58 59 |
# File 'lib/active_pubsub/subscriber.rb', line 57

# Builds the queue name for an event.
#
# The name is local_service_namespace, exchange_name and event_name joined
# with '.'; nil segments are dropped first, so a subscriber without a
# namespace gets a name without a leading dot.
#
# @param event_name [Object] event identifier used as the last segment
# @return [String] the dot-separated queue name
def self.queue_for_event(event_name)
  segments = [local_service_namespace, exchange_name, event_name]
  segments.compact.join('.')
end
.routing_key_for_event(event_name) ⇒ Object
61 62 63 |
# File 'lib/active_pubsub/subscriber.rb', line 61

# Builds the routing key for an event: exchange_name and event_name joined
# with a dot.
#
# @param event_name [Object] event identifier used as the last segment
# @return [String] the dot-separated routing key
def self.routing_key_for_event(event_name)
  segments = [exchange_name, event_name]
  segments.join(".")
end