Class: ActivePubsub::Subscriber
- Inherits:
-
Object
- Object
- ActivePubsub::Subscriber
- Defined in:
- lib/active_pubsub/subscriber.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#record ⇒ Object
Returns the value of attribute record.
Class Method Summary collapse
- .as(service_namespace) ⇒ Object
- .bind_subscriptions! ⇒ Object
-
.channel ⇒ Object
Returns the channel obtained by calling connection.channel.
- .deserialize_event(event) ⇒ Object
- .deserialize_record(record) ⇒ Object
- .exchange ⇒ Object
- .observes(target_exchange) ⇒ Object
- .on(event_name, &block) ⇒ Object
- .print_subscriptions! ⇒ Object
- .queue_for_event(event_name) ⇒ Object
- .routing_key_for_event(event_name) ⇒ Object
Instance Method Summary collapse
-
#initialize(record) ⇒ Subscriber
constructor
A new instance of Subscriber.
Constructor Details
#initialize(record) ⇒ Subscriber
Returns a new instance of Subscriber.
80 81 82 |
# File 'lib/active_pubsub/subscriber.rb', line 80 def initialize(record) @record = record end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
5 6 7 |
# File 'lib/active_pubsub/subscriber.rb', line 5 def connection @connection end |
#record ⇒ Object
Returns the value of attribute record.
78 79 80 |
# File 'lib/active_pubsub/subscriber.rb', line 78 def record @record end |
Class Method Details
.as(service_namespace) ⇒ Object
24 25 26 |
# File 'lib/active_pubsub/subscriber.rb', line 24 def self.as(service_namespace) self.local_service_namespace = service_namespace end |
.bind_subscriptions! ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/active_pubsub/subscriber.rb', line 32 def self.bind_subscriptions! events.each_pair do |event_name, block| channel.queue(queue_for_event(event_name.to_s)) .bind(exchange, :routing_key => routing_key_for_event(event_name)) .subscribe do |delivery_info, properties, payload| deserialized_event = deserialize_event(payload) deserialized_record = deserialize_record(deserialized_event[:record]) subscriber_instance = new(deserialized_record) subscriber_instance.instance_exec(deserialized_record, &block) end end end |
.channel ⇒ Object
Returns the channel obtained by calling connection.channel.
16 17 18 |
# File 'lib/active_pubsub/subscriber.rb', line 16 def self.channel connection.channel end |
.deserialize_event(event) ⇒ Object
46 47 48 |
# File 'lib/active_pubsub/subscriber.rb', line 46 def self.deserialize_event(event) @current_event = ::Marshal.load(event) end |
.deserialize_record(record) ⇒ Object
50 51 52 |
# File 'lib/active_pubsub/subscriber.rb', line 50 def self.deserialize_record(record) @current_record = ::Marshal.load(record) end |
.exchange ⇒ Object
20 21 22 |
# File 'lib/active_pubsub/subscriber.rb', line 20 def self.exchange channel.topic(exchange_name, :auto_delete => true) end |
.observes(target_exchange) ⇒ Object
54 55 56 |
# File 'lib/active_pubsub/subscriber.rb', line 54 def self.observes(target_exchange) self.exchange_name = target_exchange end |
.on(event_name, &block) ⇒ Object
28 29 30 |
# File 'lib/active_pubsub/subscriber.rb', line 28 def self.on(event_name, &block) events[event_name] = block end |
.print_subscriptions! ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/active_pubsub/subscriber.rb', line 66 def self.print_subscriptions! message = "Watching: \n" events.each_pair do |event_name, block| message << "Queue: #{queue_for_event(event_name.to_s)} \n" message << "Routing Key: #{routing_key_for_event(event_name)} \n" message << "\n" end puts message end |
.queue_for_event(event_name) ⇒ Object
58 59 60 |
# File 'lib/active_pubsub/subscriber.rb', line 58 def self.queue_for_event(event_name) [local_service_namespace, exchange_name, event_name].compact.join('.') end |
.routing_key_for_event(event_name) ⇒ Object
62 63 64 |
# File 'lib/active_pubsub/subscriber.rb', line 62 def self.routing_key_for_event(event_name) [exchange_name, event_name].join(".") end |