Class: Pebbles::River::River
- Inherits:
-
Object
- Object
- Pebbles::River::River
- Defined in:
- lib/pebbles/river/river.rb
Direct Known Subclasses
Instance Attribute Summary
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#environment ⇒ Object
readonly
Returns the value of attribute environment.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#exchange_name ⇒ Object
readonly
Returns the value of attribute exchange_name.
-
#prefetch ⇒ Object
readonly
Returns the value of attribute prefetch.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
Instance Method Summary
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
-
#initialize(options = {}) ⇒ River
constructor
A new instance of River.
- #publish(options = {}) ⇒ Object
- #queue(options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ River
Returns a new instance of River.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/pebbles/river/river.rb', line 13

# Records the environment, the exchange name derived from it, and the
# prefetch setting. No connection is opened here.
#
# @param options [Hash, String] settings hash; a bare String is accepted for
#   backwards compatibility and is treated as the environment name.
# @option options [String] :environment falls back to ENV['RACK_ENV'], then
#   to 'development'.
# @option options [Object] :prefetch stored as-is; when truthy it is passed to
#   the channel's prefetch during #connect.
def initialize(options = {})
  options = { environment: options } if options.is_a?(String) # Backwards compatibility

  # Copy before freezing so the caller's string (or the ENV value) is untouched.
  @environment = (options[:environment] || ENV['RACK_ENV'] || 'development').dup.freeze

  # Every environment except production gets its own suffixed exchange name.
  # Built by interpolation rather than by appending to a string literal, so it
  # keeps working when string literals are frozen.
  suffix = @environment == 'production' ? '' : ".#{@environment}"
  @exchange_name = "pebblebed.river#{suffix}"

  @last_connect_attempt = nil
  @prefetch = options[:prefetch]
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
9 10 11 |
# File 'lib/pebbles/river/river.rb', line 9

# Read-only accessor for the channel created by #connect.
#
# @return [Object, nil] the current channel; nil before #connect has run and
#   after #disconnect.
def channel
  @channel
end
#environment ⇒ Object (readonly)
Returns the value of attribute environment.
6 7 8 |
# File 'lib/pebbles/river/river.rb', line 6

# Read-only accessor for the environment name chosen in the constructor.
#
# @return [String] the frozen environment name.
def environment
  @environment
end
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
7 8 9 |
# File 'lib/pebbles/river/river.rb', line 7

# Read-only accessor for the exchange obtained from the channel in #connect.
#
# @return [Object, nil] the current exchange; nil before #connect has run and
#   after #disconnect.
def exchange
  @exchange
end
#exchange_name ⇒ Object (readonly)
Returns the value of attribute exchange_name.
11 12 13 |
# File 'lib/pebbles/river/river.rb', line 11

# Read-only accessor for the exchange name computed in the constructor.
#
# @return [String] 'pebblebed.river', with '.<environment>' appended for any
#   environment other than production.
def exchange_name
  @exchange_name
end
#prefetch ⇒ Object (readonly)
Returns the value of attribute prefetch.
10 11 12 |
# File 'lib/pebbles/river/river.rb', line 10

# Read-only accessor for the :prefetch option given to the constructor.
#
# @return [Object, nil] the stored prefetch value; nil when none was given.
def prefetch
  @prefetch
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
8 9 10 |
# File 'lib/pebbles/river/river.rb', line 8

# Read-only accessor for the session created by #connect.
#
# @return [Object, nil] the current session; nil before #connect has run and
#   after #disconnect.
def session
  @session
end
Instance Method Details
#connect ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/pebbles/river/river.rb', line 30

# Opens the session, channel and exchange unless all three are already set.
#
# When anything is missing, #disconnect runs first so that leftovers from an
# earlier attempt are closed before new ones are created.
#
# NOTE(review): the method name following `::Pebbles::River.` was lost when
# this listing was extracted; `rabbitmq_options` is presumed — confirm against
# lib/pebbles/river.rb.
#
# @return [Object, nil] the exchange when a connection was made, nil when
#   session, channel and exchange were all already present.
def connect
  return if @session && @channel && @exchange

  disconnect

  @session = Bunny::Session.new(::Pebbles::River.rabbitmq_options)
  @session.start

  @channel = @session.create_channel
  @channel.prefetch(@prefetch) if @prefetch

  # Hand the exchange its own copy of the shared options constant.
  @exchange = @channel.exchange(@exchange_name, EXCHANGE_OPTIONS.dup)
end
#connected? ⇒ Boolean
26 27 28 |
# File 'lib/pebbles/river/river.rb', line 26

# Reports whether a session exists and says it is connected.
#
# @return [Boolean] false when there is no session (rather than nil, so the
#   result matches the documented Boolean type); otherwise whatever the
#   session's own connected? returns.
def connected?
  @session ? @session.connected? : false
end
#disconnect ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/pebbles/river/river.rb', line 44

# Closes the channel, stops the session and clears channel, session and
# exchange. Safe to call when nothing is connected.
#
# Bunny::Exception raised while closing or stopping is deliberately ignored;
# other exception types propagate.
#
# @return [nil]
def disconnect
  if @channel
    begin
      @channel.close
    rescue Bunny::Exception
      # Ignore
    end
    @channel = nil
  end

  if @session
    begin
      @session.stop
    rescue Bunny::Exception
      # Ignore
    end
    @session = nil
  end

  @exchange = nil
end
#publish(options = {}) ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/pebbles/river/river.rb', line 64

# Publishes a message to the exchange, connecting first if needed.
#
# The whole options hash is serialized with to_json and used as the payload.
# The routing key is computed by Routing.routing_key_for from the :event and
# :uid entries.
#
# @param options [Hash] the message; the entries below are also read directly.
# @option options [Object] :event passed to Routing.routing_key_for.
# @option options [Object] :uid passed to Routing.routing_key_for.
# @option options [Boolean] :persistent (true) forwarded to the exchange's
#   publish call.
# @return [Object] whatever the exchange's publish returns.
def publish(options = {})
  connect

  # Note: Using self.exchange so it can be stubbed in tests
  self.exchange.publish(options.to_json,
    persistent: options.fetch(:persistent, true),
    key: Routing.routing_key_for(options.slice(:event, :uid)))
end
#queue(options = {}) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/pebbles/river/river.rb', line 73

# Declares a durable queue on the channel and binds it to the exchange.
#
# @param options [Hash] only the keys listed below are accepted; anything else
#   is rejected by assert_valid_keys.
# @option options [Object] :name queue name (required).
# @option options [Object] :ttl sets the 'x-message-ttl' queue argument.
# @option options [Object] :dead_letter_routing_key sets
#   'x-dead-letter-routing-key' and points 'x-dead-letter-exchange' at this
#   river's own exchange name.
# @option options [Object] :routing_key an extra explicit binding key.
# @option options [Object] :event passed to Routing.binding_routing_keys_for.
# @option options [Object] :klass passed to Routing.binding_routing_keys_for
#   under the key :class.
# @option options [Object] :path passed to Routing.binding_routing_keys_for.
# @option options [Boolean] :no_default_routing_keys when truthy, skips the
#   bindings derived from :event/:klass/:path.
# @return [Object] the queue returned by the channel.
# @raise [ArgumentError] when :name is missing.
def queue(options = {})
  options.assert_valid_keys(:name, :ttl, :event, :path, :klass,
    :dead_letter_routing_key, :routing_key, :no_default_routing_keys)

  raise ArgumentError, 'Queue must be named' unless options[:name]

  queue_args = {}
  if (ttl = options[:ttl])
    queue_args['x-message-ttl'] = ttl
  end
  if (dead_letter_routing_key = options[:dead_letter_routing_key])
    queue_args['x-dead-letter-exchange'] = @exchange_name
    queue_args['x-dead-letter-routing-key'] = dead_letter_routing_key
  end

  queue_opts = { durable: true, arguments: queue_args }

  connect
  declared = @channel.queue(options[:name], queue_opts)

  if (routing_key = options[:routing_key])
    declared.bind(exchange.name, key: routing_key)
  end

  unless options[:no_default_routing_keys]
    # The accepted option is :klass, while the routing helper was being handed
    # a slice keyed on :class — a key the validation above never lets through,
    # so the class filter was silently dropped. Translate it explicitly.
    # NOTE(review): assumes Routing.binding_routing_keys_for reads :class, as
    # the original slice implies — confirm against routing.rb.
    routing_options = options.slice(:event, :path)
    routing_options[:class] = options[:klass] if options.key?(:klass)

    Routing.binding_routing_keys_for(routing_options).each do |key|
      declared.bind(exchange.name, key: key)
    end
  end

  declared
end