Class: ActivePubsub::Subscriber
- Inherits:
-
Object
- Object
- ActivePubsub::Subscriber
show all
- Includes:
- Settings
- Defined in:
- lib/active_pubsub/subscriber.rb
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Settings
#exchange_settings, #queue_settings, #subscribe_settings
Constructor Details
#initialize(record) ⇒ Subscriber
Returns a new instance of Subscriber.
104
105
106
|
# File 'lib/active_pubsub/subscriber.rb', line 104
# Builds a subscriber instance around a single record.
#
# @param record [Object] stored as-is in @record; no validation or copying
#   happens here
def initialize(record)
@record = record
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
7
8
9
|
# File 'lib/active_pubsub/subscriber.rb', line 7
# Reader for the connection attribute.
#
# @return [Object, nil] the current value of @connection; this method never
#   assigns it, so it is nil until something else sets it
def connection
@connection
end
|
#record ⇒ Object
102
103
104
|
# File 'lib/active_pubsub/subscriber.rb', line 102
# Reader for the record this subscriber instance wraps.
#
# @return [Object] the value stored in @record by #initialize
def record
@record
end
|
Class Method Details
.as(service_namespace) ⇒ Object
19
20
21
|
# File 'lib/active_pubsub/subscriber.rb', line 19
# Sets the service namespace for this subscriber class.
#
# The value is stored in local_service_namespace, which .queue_for_event
# uses as the leading segment of every queue name.
#
# @param service_namespace [Object] namespace to store
# @return [Object] the assigned service_namespace
def self.as(service_namespace)
self.local_service_namespace = service_namespace
end
|
.bind_subscriptions! ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/active_pubsub/subscriber.rb', line 44
# Wires up a consumer for every event registered with .on.
#
# Does nothing when .started? is already truthy. Otherwise, for each
# registered event, a queue named by .queue_for_event is requested from a
# channel, bound to .exchange under the key from .routing_key_for_event, and
# subscribed to. Once all events are wired, the class is marked as started.
#
# For each delivery the payload is passed through .deserialize_event, the
# :record entry of the result through .deserialize_record, and the handler
# is evaluated with instance_exec on a new subscriber instance, receiving
# the deserialized record as its argument. The delivery is logged and, when
# ::ActivePubsub.config.ack is truthy, acknowledged.
#
# @return [true, nil] nil when already started, true after binding
def self.bind_subscriptions!
  return if started?

  events.each_pair do |event_name, handler|
    # Resolve the channel once per subscription. .channel is not memoized,
    # and a delivery tag is only meaningful on the channel that delivered
    # the message, so the ack below must go through this same object
    # instead of asking .channel again.
    subscription_channel = channel

    subscription_channel
      .queue(queue_for_event(event_name.to_s), queue_settings)
      .bind(exchange, :routing_key => routing_key_for_event(event_name))
      .subscribe(subscribe_settings) do |delivery_info, _properties, payload|
        deserialized_event = deserialize_event(payload)
        deserialized_record = deserialize_record(deserialized_event[:record])

        # The handler runs with the subscriber instance as self.
        subscriber_instance = new(deserialized_record)
        subscriber_instance.instance_exec(deserialized_record, &handler)

        ::ActivePubsub.logger.info "#{delivery_info[:routing_key]} #{name} consumed #{deserialized_event}"

        # Only reached when the handler did not raise.
        subscription_channel.ack(delivery_info.delivery_tag) if ::ActivePubsub.config.ack
      end
  end

  self.started = true
end
|
.channel ⇒ Object
28
29
30
|
# File 'lib/active_pubsub/subscriber.rb', line 28
# Fetches a channel from the current connection.
#
# The result is not cached here: every call delegates to connection.channel
# again.
# NOTE(review): whether that hands back a fresh channel on each call depends
# on the connection object, which is not visible here -- confirm.
#
# @return [Object] whatever connection.channel returns
def self.channel
connection.channel
end
|
.clear_connections! ⇒ Object
23
24
25
26
|
# File 'lib/active_pubsub/subscriber.rb', line 23
# Closes the channel returned by .channel, then closes the connection.
#
# NOTE(review): .channel is not memoized, so this closes whatever
# connection.channel hands back at call time, which may not be a channel
# that existing subscriptions use -- confirm against the connection library.
#
# @return [Object] the result of connection.close
def self.clear_connections!
channel.close
connection.close
end
|
.deserialize_event(event) ⇒ Object
66
67
68
|
# File 'lib/active_pubsub/subscriber.rb', line 66
# Decodes a serialized event with Marshal.
#
# .bind_subscriptions! passes the raw message payload here and then reads
# the :record entry of the result.
# NOTE(review): Marshal.load can instantiate arbitrary objects, so this is
# only safe when every publisher able to reach the queue is trusted --
# confirm.
#
# @param event [String, IO] Marshal-dumped data
# @return [Object] the reconstructed object
def self.deserialize_event(event)
::Marshal.load(event)
end
|
.deserialize_record(record) ⇒ Object
70
71
72
|
# File 'lib/active_pubsub/subscriber.rb', line 70
# Decodes a serialized record with Marshal.
#
# .bind_subscriptions! calls this with the :record entry of an already
# deserialized event, i.e. the record is Marshal-dumped inside the event.
# NOTE(review): Marshal.load can instantiate arbitrary objects, so this is
# only safe when every publisher able to reach the queue is trusted --
# confirm.
#
# @param record [String, IO] Marshal-dumped data
# @return [Object] the reconstructed object
def self.deserialize_record(record)
::Marshal.load(record)
end
|
.exchange ⇒ Object
32
33
34
|
# File 'lib/active_pubsub/subscriber.rb', line 32
# Obtains the topic exchange named by exchange_name from a channel, passing
# exchange_settings through.
#
# .bind_subscriptions! binds every event queue to the object returned here.
# NOTE(review): this goes through .channel, which is not memoized, so each
# call may use a different channel -- confirm that is intended.
#
# @return [Object] the result of channel.topic(exchange_name, exchange_settings)
def self.exchange
channel.topic(exchange_name, exchange_settings)
end
|
.inherited(klass) ⇒ Object
36
37
38
|
# File 'lib/active_pubsub/subscriber.rb', line 36
# Subclass hook: assigns a fresh, empty events hash to every newly created
# subclass, which is where .on stores that subclass's handlers.
#
# super is called first so that inherited hooks defined further up the
# ancestor chain keep running; overriding the hook without it silently
# disables them.
#
# @param klass [Class] the class that was just created by subclassing
# @return [Hash] the empty hash assigned to klass.events
def self.inherited(klass)
  super
  klass.events = {}
end
|
.observes(target_exchange) ⇒ Object
74
75
76
|
# File 'lib/active_pubsub/subscriber.rb', line 74
# Names the exchange this subscriber class listens to.
#
# The value is stored in exchange_name, which .exchange, .queue_for_event
# and .routing_key_for_event all read.
#
# @param target_exchange [Object] exchange name to store
# @return [Object] the assigned target_exchange
def self.observes(target_exchange)
self.exchange_name = target_exchange
end
|
.on(event_name, &block) ⇒ Object
40
41
42
|
# File 'lib/active_pubsub/subscriber.rb', line 40
# Registers the handler for an event.
#
# The block is kept in the events registry under event_name, replacing any
# handler stored under the same key. .bind_subscriptions! later evaluates it
# with instance_exec on a subscriber instance, passing the deserialized
# record as the argument.
#
# @param event_name [Object] key identifying the event
# @param block [Proc] handler to run for each delivery of that event
# @return [Proc, nil] the stored block
def self.on(event_name, &block)
  events.store(event_name, block)
end
|
.print_subscriptions! ⇒ Object
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/active_pubsub/subscriber.rb', line 86
# Logs, at info level, the queue name and routing key of every registered
# event.
#
# The message starts with "Watching: " and then carries one
# "Queue: ... / Routing Key: ..." section per event, in registry order.
# It is assembled from fresh strings rather than by appending to a string
# literal, so it keeps working when string literals are frozen.
#
# @return [Object] whatever ::ActivePubsub.logger.info returns
def self.print_subscriptions!
  sections = events.keys.map do |event_name|
    "Queue: #{queue_for_event(event_name.to_s)} \n" \
      "Routing Key: #{routing_key_for_event(event_name)} \n" \
      "\n"
  end

  ::ActivePubsub.logger.info("Watching: \n#{sections.join}")
end
|
.queue_for_event(event_name) ⇒ Object
78
79
80
|
# File 'lib/active_pubsub/subscriber.rb', line 78
# Builds the queue name for an event.
#
# The name is the service namespace, the exchange name and the event name
# joined with dots; nil segments are dropped, so a subscriber without a
# namespace gets "exchange.event".
#
# @param event_name [Object] event identifier
# @return [String] dot-separated queue name
def self.queue_for_event(event_name)
  segments = [
    local_service_namespace,
    exchange_name,
    event_name
  ]
  segments.compact.join('.')
end
|
.routing_key_for_event(event_name) ⇒ Object
82
83
84
|
# File 'lib/active_pubsub/subscriber.rb', line 82
# Builds the routing key for an event: the exchange name and the event name
# joined with a dot. Unlike .queue_for_event, nil segments are not removed.
#
# @param event_name [Object] event identifier
# @return [String] routing key of the form "exchange.event"
def self.routing_key_for_event(event_name)
  segments = [exchange_name, event_name]
  segments.join('.')
end
|
.started? ⇒ Boolean
97
98
99
|
# File 'lib/active_pubsub/subscriber.rb', line 97
# Tells whether .bind_subscriptions! has already wired up this class.
#
# @return [Object] the raw value of the started flag; truthy once
#   .bind_subscriptions! has completed
def self.started?
  started
end
|