Class: ActivePubsub::Publisher
- Inherits:
-
Object
- Object
- ActivePubsub::Publisher
show all
- Includes:
- Settings, Celluloid
- Defined in:
- lib/active_pubsub/publisher.rb
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Settings
#exchange_settings, #queue_settings, #subscribe_settings
Constructor Details
33
34
35
|
# File 'lib/active_pubsub/publisher.rb', line 33
# Builds a publisher by invoking the +connection+ reader once.
#
# NOTE(review): presumably this is meant to set up the broker connection
# eagerly when the publisher is created; the attribute listing only shows
# +connection+ returning @connection, so confirm against the full source.
def initialize
connection
end
|
Class Attribute Details
.publishable_model_count ⇒ Object
Returns the value of attribute publishable_model_count.
12
13
14
|
# File 'lib/active_pubsub/publisher.rb', line 12
# Class-level reader (listed under "Class Attribute Details") for the
# publishable model counter.
#
# @return [Object] the current value of @publishable_model_count
def publishable_model_count
@publishable_model_count
end
|
.started ⇒ Object
Returns the value of attribute started.
11
12
13
|
# File 'lib/active_pubsub/publisher.rb', line 11
# Class-level reader (listed under "Class Attribute Details") for the
# started flag that +.start+ sets to true.
#
# @return [Object] the current value of @started
def started
@started
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
6
7
8
|
# File 'lib/active_pubsub/publisher.rb', line 6
# Instance-level reader for the connection this publisher works with.
#
# @return [Object] the current value of @connection
def connection
@connection
end
|
Class Method Details
.increment_publishable_model_count! ⇒ Object
18
19
20
|
# File 'lib/active_pubsub/publisher.rb', line 18
# Adds one to the class-level publishable model counter.
#
# A counter that has never been assigned (nil) is treated as zero, so the
# first call yields 1 instead of raising NoMethodError on nil.
#
# @return [Integer] the counter value after incrementing
def self.increment_publishable_model_count!
  self.publishable_model_count = (publishable_model_count || 0) + 1
end
|
.start ⇒ Object
22
23
24
25
26
|
# File 'lib/active_pubsub/publisher.rb', line 22
# Starts the publisher: calls +supervise_as+ with the name :rabbit_publisher
# and then flips the class-level +started+ flag to true.
#
# +supervise_as+ is not defined in this snippet (presumably supplied by the
# included Celluloid module — verify).
#
# @return [true] the value assigned to +started+
def self.start
  supervise_as(:rabbit_publisher)
  self.started = true
end
|
.started? ⇒ Boolean
28
29
30
|
# File 'lib/active_pubsub/publisher.rb', line 28
# Reports whether +.start+ has been called.
#
# Always answers with a strict boolean: an unset (nil) +started+ flag is
# reported as false rather than leaking nil to callers.
#
# @return [Boolean] true once the started flag is truthy, false otherwise
def self.started?
  started ? true : false
end
|
Instance Method Details
#channel ⇒ Object
46
47
48
|
# File 'lib/active_pubsub/publisher.rb', line 46
# Returns the channel obtained from the current connection.
#
# The result is not cached here: every call invokes +connection.channel+
# again.
# NOTE(review): if +connection+ is a Bunny session, +Session#channel+ opens a
# new channel on each call — confirm whether memoizing is wanted.
#
# @return [Object] whatever +connection.channel+ returns
def channel
connection.channel
end
|
#clear_connections! ⇒ Object
41
42
43
44
|
# File 'lib/active_pubsub/publisher.rb', line 41
# Closes the channel and then the underlying connection.
#
# If obtaining or closing the channel raises, the connection is still closed
# before the error is re-raised, so a channel failure cannot leave the
# connection open.
#
# @return [Object] whatever +connection.close+ returns
# @raise [StandardError] any error raised while closing the channel or the
#   connection
def clear_connections!
  begin
    channel.close
  rescue StandardError
    # Do not leak the connection just because the channel failed to close.
    connection.close
    raise
  end

  connection.close
end
|
#exchanges ⇒ Object
50
51
52
|
# File 'lib/active_pubsub/publisher.rb', line 50
# Registry of exchanges held by this publisher, created as an empty Hash on
# first access and reused afterwards.
#
# @return [Hash] the memoized @exchanges hash
def exchanges
  @exchanges = {} unless @exchanges
  @exchanges
end
|
#options_for_publish(event) ⇒ Object
54
55
56
57
58
59
|
# File 'lib/active_pubsub/publisher.rb', line 54
# Builds the options hash passed along with a published event.
#
# @param event [#routing_key] the event about to be published
# @return [Hash] +:routing_key+ taken from the event and +:persistent+ taken
#   from +::ActivePubsub.config.durable+
def options_for_publish(event)
  key = event.routing_key
  durable = ::ActivePubsub.config.durable

  { routing_key: key, persistent: durable }
end
|
#publish_event(event) ⇒ Object
61
62
63
64
65
66
67
68
69
|
# File 'lib/active_pubsub/publisher.rb', line 61
# Publishes an event to its exchange, unless publishing is disabled.
#
# The work runs inside +ActiveRecord::Base.connection_pool.with_connection+.
# The event is serialized with +serialize_event+ and sent with the options
# from +options_for_publish+.
#
# If the event's exchange has not been registered yet, it is registered on
# demand through +register_exchange+ instead of failing with NoMethodError
# on nil.
#
# @param event [#id, #exchange, #routing_key] the event to publish
# @return [nil, Object] nil when the publisher is disabled, otherwise the
#   result of the +with_connection+ block
def publish_event(event)
  return if ::ActivePubsub.publisher_disabled?

  ::ActiveRecord::Base.connection_pool.with_connection do
    ::ActivePubsub.logger.info("Publishing event: #{event.id} to #{event.routing_key}")

    # Fall back to declaring the exchange when nothing is registered for it.
    exchange = exchanges[event.exchange] || register_exchange(event.exchange)
    exchange.publish(serialize_event(event), options_for_publish(event))
  end
end
|
#register_exchange(exchange_name) ⇒ Object
75
76
77
|
# File 'lib/active_pubsub/publisher.rb', line 75
# Returns the exchange stored under +exchange_name+, creating it first when
# none is stored yet.
#
# A new exchange is obtained from +channel.topic+ using +exchange_settings+
# and kept in the +exchanges+ registry for later calls.
#
# @param exchange_name [Object] key and name of the exchange
# @return [Object] the stored or newly created exchange
def register_exchange(exchange_name)
  known = exchanges[exchange_name]
  return known if known

  exchanges[exchange_name] = channel.topic(exchange_name, exchange_settings)
end
|
#serialize_event(event) ⇒ Object
71
72
73
|
# File 'lib/active_pubsub/publisher.rb', line 71
# Serializes an event into the payload sent to the exchange, using Ruby's
# Marshal format.
#
# NOTE(review): consumers presumably have to +Marshal.load+ this payload,
# which is unsafe on untrusted input and ties both sides to compatible Ruby
# class definitions — confirm this is acceptable for the deployment.
#
# @param event [Object] the event to serialize
# @return [String] the marshaled bytes
def serialize_event(event)
::Marshal.dump(event)
end
|