Class: ActivePubsub::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/active_pubsub/subscriber.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Attribute Details

#connectionObject

Returns the value of attribute connection.



5
6
7
# File 'lib/active_pubsub/subscriber.rb', line 5

def connection
  @connection
end

Class Method Details

.as(service_namespace) ⇒ Object



24
25
26
# File 'lib/active_pubsub/subscriber.rb', line 24

def self.as(service_namespace)
  self.local_service_namespace = service_namespace
end

.bind_subscriptions!Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/active_pubsub/subscriber.rb', line 32

def self.bind_subscriptions!
  events.each_pair do |event_name, block|
    channel.queue(queue_for_event(event_name.to_s))
           .bind(exchange, :routing_key => routing_key_for_event(event_name))
           .subscribe do |delivery_info, properties, payload|
      event = deserialize_event(payload)
      resource = deserialize_record(event[:record])

      block.call(resource)
    end
  end
end

.channelObject

Class Methods ###



16
17
18
# File 'lib/active_pubsub/subscriber.rb', line 16

def self.channel
  connection.channel
end

.deserialize_event(event) ⇒ Object



45
46
47
# File 'lib/active_pubsub/subscriber.rb', line 45

def self.deserialize_event(event)
  @current_event = Marshal.load(event)
end

.deserialize_record(record) ⇒ Object



49
50
51
# File 'lib/active_pubsub/subscriber.rb', line 49

def self.deserialize_record(record)
  @current_record = Marshal.load(record)
end

.exchangeObject



20
21
22
# File 'lib/active_pubsub/subscriber.rb', line 20

def self.exchange
  channel.topic(exchange_name, :auto_delete => true)
end

.observes(target_exchange) ⇒ Object



53
54
55
# File 'lib/active_pubsub/subscriber.rb', line 53

def self.observes(target_exchange)
  self.exchange_name = target_exchange
end

.on(event_name, &block) ⇒ Object



28
29
30
# File 'lib/active_pubsub/subscriber.rb', line 28

def self.on(event_name, &block)
  events[event_name] = block
end


65
66
67
68
69
70
71
72
73
74
# File 'lib/active_pubsub/subscriber.rb', line 65

def self.print_subscriptions!
  message = "Watching: \n"
  events.each_pair do |event_name, block|
    message << "Queue: #{queue_for_event(event_name.to_s)} \n" <<
               "Routing Key: #{routing_key_for_event(event_name)} \n" <<
               "\n"
  end

  puts message
end

.queue_for_event(event_name) ⇒ Object



57
58
59
# File 'lib/active_pubsub/subscriber.rb', line 57

def self.queue_for_event(event_name)
  [local_service_namespace, exchange_name, event_name].compact.join('.')
end

.routing_key_for_event(event_name) ⇒ Object



61
62
63
# File 'lib/active_pubsub/subscriber.rb', line 61

def self.routing_key_for_event(event_name)
  [exchange_name, event_name].join(".")
end