Class: ActivePubsub::Publisher

Inherits:
Object
  • Object
show all
Includes:
Settings, Celluloid
Defined in:
lib/active_pubsub/publisher.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Settings

#exchange_settings, #queue_settings, #subscribe_settings

Constructor Details

#initialize ⇒ Publisher

Instance Methods ###



33
34
35
# File 'lib/active_pubsub/publisher.rb', line 33

# Sets up a new publisher. The only work performed here is a single call
# to +connection+.
#
# NOTE(review): presumably this is meant to touch the broker connection as
# soon as the publisher is created -- confirm against the real +connection+
# implementation.
def initialize
  connection
end

Class Attribute Details

.publishable_model_count ⇒ Object

Returns the value of attribute publishable_model_count.



12
13
14
# File 'lib/active_pubsub/publisher.rb', line 12

# Reader for the +publishable_model_count+ attribute.
#
# @return [Object, nil] the value held in +@publishable_model_count+
#   (+nil+ if it has never been assigned)
def publishable_model_count
  @publishable_model_count
end

.started ⇒ Object

Returns the value of attribute started.



11
12
13
# File 'lib/active_pubsub/publisher.rb', line 11

# Reader for the +started+ attribute.
#
# @return [Object, nil] the value held in +@started+ (+nil+ if it has
#   never been assigned)
def started
  @started
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



6
7
8
# File 'lib/active_pubsub/publisher.rb', line 6

# Reader for the +connection+ attribute.
#
# @return [Object, nil] the value held in +@connection+ (+nil+ if it has
#   never been assigned)
def connection
  @connection
end

Class Method Details

.increment_publishable_model_count! ⇒ Object



18
19
20
# File 'lib/active_pubsub/publisher.rb', line 18

# Adds one to the class-level +publishable_model_count+ and stores the
# result back through the attribute writer.
#
# @return [Object] the value that was written (the previous count plus one)
def self.increment_publishable_model_count!
  self.publishable_model_count = publishable_model_count + 1
end

.start ⇒ Object



22
23
24
25
26
# File 'lib/active_pubsub/publisher.rb', line 22

# Registers the publisher under the +:rabbit_publisher+ name via
# +supervise_as+ and then flags the class as started.
#
# @return [true] the value assigned to +started+
def self.start
  supervise_as(:rabbit_publisher)
  self.started = true
end

.started? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/active_pubsub/publisher.rb', line 28

# Reports whether the publisher has been started.
#
# @return [Boolean, nil] the raw value of the class-level +started+
#   attribute; no coercion is applied, so an unset flag is returned as-is
def self.started?
  started
end

Instance Method Details

#channel ⇒ Object



46
47
48
# File 'lib/active_pubsub/publisher.rb', line 46

# Asks the current connection for a channel.
#
# NOTE(review): whether this returns a shared channel or opens a new one
# on every call depends on the connection object -- confirm.
#
# @return [Object] whatever +connection.channel+ returns
def channel
  connection.channel
end

#clear_connections! ⇒ Object



41
42
43
44
# File 'lib/active_pubsub/publisher.rb', line 41

# Closes the channel and then the connection.
#
# The connection is closed from an +ensure+ clause so that an error raised
# while closing the channel cannot leave the connection open. The channel
# error still propagates to the caller afterwards, unless +connection.close+
# itself raises, in which case that later error is the one raised.
#
# @return [Object] the result of +connection.close+
def clear_connections!
  begin
    channel.close
  ensure
    # Captured here so the method keeps returning connection.close's value.
    close_result = connection.close
  end

  close_result
end

#exchanges ⇒ Object



50
51
52
# File 'lib/active_pubsub/publisher.rb', line 50

# Lazily created registry of exchanges, keyed by exchange name.
#
# @return [Hash] the same hash instance on every call once created
def exchanges
  return @exchanges if @exchanges

  @exchanges = {}
end

#options_for_publish(event) ⇒ Object



54
55
56
57
58
59
# File 'lib/active_pubsub/publisher.rb', line 54

# Builds the options hash that accompanies an event when it is published.
#
# @param event [#routing_key] the event about to be published
# @return [Hash] +:routing_key+ taken from the event and +:persistent+
#   taken from +::ActivePubsub.config.durable+
def options_for_publish(event)
  routing_key = event.routing_key
  durable = ::ActivePubsub.config.durable

  { routing_key: routing_key, persistent: durable }
end

#publish_event(event) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/active_pubsub/publisher.rb', line 61

# Publishes an event to the exchange named by +event.exchange+.
#
# Does nothing when +::ActivePubsub.publisher_disabled?+ is true. Otherwise
# the work runs inside +ActiveRecord::Base.connection_pool.with_connection+:
# the event is logged, serialized with +serialize_event+ and published with
# the options from +options_for_publish+.
#
# The exchange is resolved through +register_exchange+ rather than a bare
# +exchanges[...]+ lookup. +register_exchange+ memoizes, so an already
# registered exchange is reused unchanged, while an exchange that has not
# been registered on this publisher is declared on demand instead of
# failing with a NoMethodError on +nil+.
#
# @param event [#id, #routing_key, #exchange] the event to publish
# @return [Object, nil] +nil+ when publishing is disabled, otherwise the
#   result of the +with_connection+ block
def publish_event(event)
  return if ::ActivePubsub.publisher_disabled?

  ::ActiveRecord::Base.connection_pool.with_connection do
    ::ActivePubsub.logger.info("Publishing event: #{event.id} to #{event.routing_key}")

    exchange = register_exchange(event.exchange)
    exchange.publish(serialize_event(event), options_for_publish(event))
  end
end

#register_exchange(exchange_name) ⇒ Object



75
76
77
# File 'lib/active_pubsub/publisher.rb', line 75

# Looks up the exchange stored under +exchange_name+, creating and caching
# it on first use via +channel.topic+ with +exchange_settings+.
#
# @param exchange_name [Object] key (and name) of the exchange
# @return [Object] the cached exchange, or the newly created one
def register_exchange(exchange_name)
  registry = exchanges
  cached = registry[exchange_name]
  return cached if cached

  registry[exchange_name] = channel.topic(exchange_name, exchange_settings)
end

#serialize_event(event) ⇒ Object



71
72
73
# File 'lib/active_pubsub/publisher.rb', line 71

# Converts an event into the byte string that is sent to the exchange,
# using Ruby's built-in Marshal format.
#
# NOTE(review): consumers presumably have to +Marshal.load+ this payload;
# that is only safe when the message source is trusted -- confirm.
#
# @param event [Object] any object Marshal can dump
# @return [String] the marshalled representation of +event+
def serialize_event(event)
  ::Marshal.dump(event)
end