Class: ActivePubsub::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/active_pubsub/subscriber.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(record) ⇒ Subscriber

Returns a new instance of Subscriber.



80
81
82
# File 'lib/active_pubsub/subscriber.rb', line 80

# Builds a subscriber instance around the given record.
#
# @param record [Object] value kept in +@record+ and exposed through #record
def initialize(record)
  @record = record
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



5
6
7
# File 'lib/active_pubsub/subscriber.rb', line 5

# Reader for the instance-level connection.
#
# NOTE(review): nothing in the visible code assigns +@connection+ on an
# instance; confirm where (or whether) it is set.
#
# @return [Object, nil] the current value of +@connection+
def connection
  @connection
end

#record ⇒ Object

Instance Methods ###



78
79
80
# File 'lib/active_pubsub/subscriber.rb', line 78

# Reader for the record this subscriber was constructed with.
#
# @return [Object, nil] the current value of +@record+
def record
  @record
end

Class Method Details

.as(service_namespace) ⇒ Object



24
25
26
# File 'lib/active_pubsub/subscriber.rb', line 24

# Sets the local service namespace for this subscriber class.
#
# The stored value is the first segment of the queue names built by
# +queue_for_event+.
#
# @param service_namespace [Object] value assigned to +local_service_namespace+
# @return [Object] the value that was assigned
def self.as(service_namespace)
  self.local_service_namespace = service_namespace
end

.bind_subscriptions! ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/active_pubsub/subscriber.rb', line 32

# Wires every registered event handler to its own queue.
#
# For each (event name, handler) pair in +events+ this asks +channel+ for the
# queue named by +queue_for_event+, binds that queue to +exchange+ under the
# routing key from +routing_key_for_event+, and subscribes a consumer block.
#
# The consumer block deserializes the payload, deserializes the value stored
# under the event's +:record+ key, builds a new subscriber for that record and
# runs the handler with +instance_exec+ so that +self+ inside the handler is
# the subscriber instance; the record is also passed as the block argument.
#
# NOTE(review): delivery info and message properties are ignored, so any
# acknowledgement behaviour comes from the queue/subscribe defaults, which
# are not visible here.
#
# @return [Object] whatever +events.each_pair+ returns
def self.bind_subscriptions!
  events.each_pair do |event_name, handler|
    queue = channel.queue(queue_for_event(event_name.to_s))
    bound_queue = queue.bind(exchange, routing_key: routing_key_for_event(event_name))

    bound_queue.subscribe do |_delivery_info, _properties, payload|
      event = deserialize_event(payload)
      record = deserialize_record(event[:record])

      new(record).instance_exec(record, &handler)
    end
  end
end

.channel ⇒ Object

Class Methods ###



16
17
18
# File 'lib/active_pubsub/subscriber.rb', line 16

# Returns the channel obtained from the class-level +connection+.
#
# NOTE(review): this calls +connection.channel+ on every invocation; whether
# that opens a new channel or reuses an existing one depends on the
# connection object and is not visible here.
#
# @return [Object] the result of +connection.channel+
def self.channel
  connection.channel
end

.deserialize_event(event) ⇒ Object



46
47
48
# File 'lib/active_pubsub/subscriber.rb', line 46

# Unmarshals a serialized event and remembers it in +@current_event+.
#
# NOTE(review): +Marshal.load+ can instantiate arbitrary objects and must
# only be fed trusted data; confirm that only trusted publishers can write
# to the broker these payloads come from.
#
# @param event [String] Marshal-dumped event payload
# @return [Object] the deserialized event
def self.deserialize_event(event)
  loaded_event = ::Marshal.load(event)
  @current_event = loaded_event
end

.deserialize_record(record) ⇒ Object



50
51
52
# File 'lib/active_pubsub/subscriber.rb', line 50

# Unmarshals a serialized record and remembers it in +@current_record+.
#
# NOTE(review): +Marshal.load+ can instantiate arbitrary objects and must
# only be fed trusted data; confirm that only trusted publishers can write
# to the broker these payloads come from.
#
# @param record [String] Marshal-dumped record
# @return [Object] the deserialized record
def self.deserialize_record(record)
  loaded_record = ::Marshal.load(record)
  @current_record = loaded_record
end

.exchange ⇒ Object



20
21
22
# File 'lib/active_pubsub/subscriber.rb', line 20

# Asks the channel for the topic exchange this subscriber listens on.
#
# The exchange is requested by +exchange_name+ with the +auto_delete+ option
# set to true.
#
# @return [Object] the result of +channel.topic+
def self.exchange
  topic_channel = channel
  topic_channel.topic(exchange_name, auto_delete: true)
end

.observes(target_exchange) ⇒ Object



54
55
56
# File 'lib/active_pubsub/subscriber.rb', line 54

# Sets the name of the exchange this subscriber class observes.
#
# The stored name is used when declaring the exchange and when building
# queue names and routing keys.
#
# @param target_exchange [Object] value assigned to +exchange_name+
# @return [Object] the value that was assigned
def self.observes(target_exchange)
  self.exchange_name = target_exchange
end

.on(event_name, &block) ⇒ Object



28
29
30
# File 'lib/active_pubsub/subscriber.rb', line 28

# Registers a handler block for an event.
#
# The block is stored in +events+ under +event_name+; +bind_subscriptions!+
# later runs it via +instance_exec+ on a subscriber instance.
#
# @param event_name [Object] key under which the handler is stored
# @param block [Proc] handler to run when the event is received
# @return [Proc, nil] the block that was stored
def self.on(event_name, &block)
  events[event_name] = block
end


66
67
68
69
70
71
72
73
74
75
# File 'lib/active_pubsub/subscriber.rb', line 66

# Prints the queue name and routing key of every registered event to stdout.
#
# The output starts with a "Watching: " header line, followed by one
# "Queue: ..." / "Routing Key: ..." pair and a blank line per event.
#
# @return [nil] the return value of +puts+
def self.print_subscriptions!
  entries = events.each_pair.map do |event_name, _handler|
    queue_line = "Queue: #{queue_for_event(event_name.to_s)} \n"
    routing_line = "Routing Key: #{routing_key_for_event(event_name)} \n"
    "#{queue_line}#{routing_line}\n"
  end

  puts "Watching: \n#{entries.join}"
end

.queue_for_event(event_name) ⇒ Object



58
59
60
# File 'lib/active_pubsub/subscriber.rb', line 58

# Builds the queue name for an event.
#
# The name is the local service namespace, the exchange name and the event
# name joined with dots; nil segments are dropped before joining.
#
# @param event_name [Object] event identifier used as the last segment
# @return [String] dot-separated queue name
def self.queue_for_event(event_name)
  segments = [local_service_namespace, exchange_name, event_name]
  segments.compact.join('.')
end

.routing_key_for_event(event_name) ⇒ Object



62
63
64
# File 'lib/active_pubsub/subscriber.rb', line 62

# Builds the routing key for an event.
#
# The key is the exchange name and the event name joined with a dot. Unlike
# +queue_for_event+, nil segments are not removed; a nil segment contributes
# an empty string to the joined result.
#
# @param event_name [Object] event identifier used as the last segment
# @return [String] dot-separated routing key
def self.routing_key_for_event(event_name)
  segments = [exchange_name, event_name]
  segments.join('.')
end