Class: ActivePubsub::Subscriber

Inherits:
Object
  • Object
show all
Includes:
Settings
Defined in:
lib/active_pubsub/subscriber.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Settings

#exchange_settings, #queue_settings, #subscribe_settings

Constructor Details

#initialize(record) ⇒ Subscriber

Returns a new instance of Subscriber.



104
105
106
# File 'lib/active_pubsub/subscriber.rb', line 104

# Stores the given record on the new subscriber instance.
#
# @param record [Object] kept as-is in @record; nothing is validated here
def initialize(record)
  @record = record
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



7
8
9
# File 'lib/active_pubsub/subscriber.rb', line 7

# Reader for the @connection instance variable.
#
# @return [Object, nil] the value held in @connection (nil when it was never assigned)
def connection
  @connection
end

#record ⇒ Object

Returns the value of attribute record.



102
103
104
# File 'lib/active_pubsub/subscriber.rb', line 102

# Reader for the @record instance variable.
#
# @return [Object, nil] the value held in @record (nil when it was never assigned)
def record
  @record
end

Class Method Details

.as(service_namespace) ⇒ Object

Sets the local service namespace for this subscriber class.



19
20
21
# File 'lib/active_pubsub/subscriber.rb', line 19

# Sets the local service namespace for this subscriber class.
#
# @param service_namespace [Object] value assigned to local_service_namespace
# @return [Object] the value that was assigned
def self.as(service_namespace)
  self.local_service_namespace = service_namespace
end

.bind_subscriptions! ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/active_pubsub/subscriber.rb', line 44

# Declares one queue per registered event, binds it to the exchange and starts
# consuming from it. Does nothing when the class is already marked as started.
#
# For every delivery the payload is deserialized, a subscriber instance is built
# around the record, and the handler registered for the event is run through
# instance_exec with the record as its argument. The delivery is then logged
# and, when ::ActivePubsub.config.ack is truthy, acknowledged.
#
# @return [true, nil] true once the subscriptions are bound, nil when already started
def self.bind_subscriptions!
  return if started?

  events.each_pair do |event_name, handler|
    # Resolve the channel once per event so that the queue, its consumer and the
    # ack below all use the same one. The `channel` helper asks the connection
    # on every call and does not cache the result; an ack sent on a channel other
    # than the one that delivered the message would not match its delivery tag.
    event_channel = channel

    event_channel.queue(queue_for_event(event_name.to_s), queue_settings)
                 .bind(exchange, routing_key: routing_key_for_event(event_name))
                 .subscribe(subscribe_settings) do |delivery_info, _properties, payload|
      # NOTE(security): both deserialize helpers call Marshal.load, which can
      # instantiate arbitrary objects. This is only safe while every publisher
      # on the exchange is trusted.
      deserialized_event = deserialize_event(payload)
      deserialized_record = deserialize_record(deserialized_event[:record])

      # The handler runs with the subscriber instance as self.
      new(deserialized_record).instance_exec(deserialized_record, &handler)

      ::ActivePubsub.logger.info "#{delivery_info[:routing_key]} #{name} consumed #{deserialized_event}"

      event_channel.ack(delivery_info.delivery_tag) if ::ActivePubsub.config.ack
    end
  end

  self.started = true
end

.channel ⇒ Object



28
29
30
# File 'lib/active_pubsub/subscriber.rb', line 28

# Obtains a channel from the class-level connection.
#
# NOTE(review): the result is not cached here, so every call asks the
# connection again. Whether that yields the same channel depends on the
# connection object — confirm before relying on channel identity across calls.
#
# @return [Object] whatever connection.channel returns
def self.channel
  connection.channel
end

.clear_connections! ⇒ Object



23
24
25
26
# File 'lib/active_pubsub/subscriber.rb', line 23

# Closes a channel obtained from the connection, then closes the connection
# itself.
#
# NOTE(review): `channel` is resolved afresh here; if the connection returns a
# new channel per call, this closes that new channel rather than any channel
# used earlier — confirm against the connection implementation.
#
# @return [Object] whatever connection.close returns
def self.clear_connections!
  channel.close
  connection.close
end

.deserialize_event(event) ⇒ Object



66
67
68
# File 'lib/active_pubsub/subscriber.rb', line 66

# Restores an event object from its Marshal-serialized form.
#
# NOTE(security): Marshal.load can instantiate arbitrary Ruby objects, so the
# input must come from a trusted publisher. Do not feed it data that an
# untrusted party could have written.
#
# @param event [String, IO] data accepted by Marshal.load
# @return [Object] the deserialized object
def self.deserialize_event(event)
  ::Marshal.load(event)
end

.deserialize_record(record) ⇒ Object



70
71
72
# File 'lib/active_pubsub/subscriber.rb', line 70

# Restores a record object from its Marshal-serialized form.
#
# NOTE(security): Marshal.load can instantiate arbitrary Ruby objects, so the
# input must come from a trusted publisher. Do not feed it data that an
# untrusted party could have written.
#
# @param record [String, IO] data accepted by Marshal.load
# @return [Object] the deserialized object
def self.deserialize_record(record)
  ::Marshal.load(record)
end

.exchange ⇒ Object



32
33
34
# File 'lib/active_pubsub/subscriber.rb', line 32

# Asks a channel for the topic exchange named by exchange_name, passing
# exchange_settings as its options.
#
# @return [Object] whatever channel.topic returns
def self.exchange
  channel.topic(exchange_name, exchange_settings)
end

.inherited(klass) ⇒ Object



36
37
38
# File 'lib/active_pubsub/subscriber.rb', line 36

# Subclass hook: assigns a fresh, empty events hash to every new subclass.
# Presumably this keeps handlers registered on one subscriber class from
# showing up on another — confirm against how `events` is declared.
#
# @param klass [Class] the class that has just inherited from this one
# @return [Hash] the empty hash assigned to klass.events
def self.inherited(klass)
  # Forward to the next `inherited` hook; without this, hooks defined further
  # up the ancestor chain are silently skipped.
  super
  klass.events = {}
end

.observes(target_exchange) ⇒ Object



74
75
76
# File 'lib/active_pubsub/subscriber.rb', line 74

# Sets the name of the exchange this subscriber class listens to.
#
# @param target_exchange [Object] value assigned to exchange_name
# @return [Object] the value that was assigned
def self.observes(target_exchange)
  self.exchange_name = target_exchange
end

.on(event_name, &block) ⇒ Object



40
41
42
# File 'lib/active_pubsub/subscriber.rb', line 40

# Registers the handler block for an event, replacing any handler previously
# stored under the same event name.
#
# @param event_name [Object] key under which the handler is stored in events
# @param block [Proc] handler to store (nil is stored when no block is given)
# @return [Proc, nil] the stored block
def self.on(event_name, &block)
  events[event_name] = block
end


.print_subscriptions! ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/active_pubsub/subscriber.rb', line 86

# Logs, at info level, the queue name and routing key derived for every
# registered event.
#
# @return [Object] whatever ::ActivePubsub.logger.info returns
def self.print_subscriptions!
  # The text is assembled by interpolation rather than by appending to a string
  # literal: in-place mutation of a literal raises FrozenError once the file
  # opts into frozen string literals.
  details = events.each_key.map do |event_name|
    "Queue: #{queue_for_event(event_name.to_s)} \n" \
      "Routing Key: #{routing_key_for_event(event_name)} \n" \
      "\n"
  end

  ::ActivePubsub.logger.info("Watching: \n#{details.join}")
end

.queue_for_event(event_name) ⇒ Object



78
79
80
# File 'lib/active_pubsub/subscriber.rb', line 78

# Builds the queue name for an event by joining the local service namespace,
# the exchange name and the event name with dots. Parts that are nil are
# left out.
#
# @param event_name [Object] last segment of the queue name
# @return [String] the dot-separated queue name
def self.queue_for_event(event_name)
  name_parts = [local_service_namespace, exchange_name, event_name]
  name_parts.compact.join('.')
end

.routing_key_for_event(event_name) ⇒ Object



82
83
84
# File 'lib/active_pubsub/subscriber.rb', line 82

# Builds the routing key for an event: the exchange name and the event name
# joined by a dot. Unlike the queue name, nil parts are not dropped.
#
# @param event_name [Object] second segment of the routing key
# @return [String] the dot-separated routing key
def self.routing_key_for_event(event_name)
  key_parts = [exchange_name, event_name]
  key_parts.join(".")
end

.started? ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/active_pubsub/subscriber.rb', line 97

# Reports whether this subscriber class has been marked as started.
#
# @return [Object] the current value of started, returned unchanged
def self.started?
  started
end