Class: Pebbles::River::River

Inherits:
Object
  • Object
show all
Defined in:
lib/pebbles/river/river.rb

Direct Known Subclasses

Pebblebed::River

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ River

Returns a new instance of River.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/pebbles/river/river.rb', line 13

# Builds a River for one deployment environment. Nothing is connected here;
# the instance only records its settings.
#
# @param options [Hash, String] settings hash. A bare String is still
#   accepted and treated as the environment name (legacy call style).
# @option options [String] :environment environment name; falls back to
#   ENV['RACK_ENV'] and then to 'development'.
# @option options [Object] :prefetch value later passed to the channel's
#   prefetch call by #connect (skipped when nil or false).
def initialize(options = {})
  options = {environment: options} if options.is_a?(String)  # Backwards compatibility

  # Keep a private, frozen copy so the caller's string is neither frozen nor
  # shared with this instance.
  @environment = (options[:environment] || ENV['RACK_ENV'] || 'development').dup.freeze

  # Every environment except production gets its own suffixed exchange. The
  # name is built in one expression instead of appending to a string literal,
  # because appending raises FrozenError when frozen string literals are on.
  @exchange_name =
    if @environment == 'production'
      'pebblebed.river'
    else
      "pebblebed.river.#{@environment}"
    end

  @last_connect_attempt = nil

  @prefetch = options[:prefetch]
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



9
10
11
# File 'lib/pebbles/river/river.rb', line 9

# Channel created by #connect; nil until then and again after #disconnect.
def channel; @channel; end

#environment ⇒ Object (readonly)

Returns the value of attribute environment.



6
7
8
# File 'lib/pebbles/river/river.rb', line 6

# Environment name fixed by the constructor (stored as a frozen copy).
def environment; @environment; end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



7
8
9
# File 'lib/pebbles/river/river.rb', line 7

# Exchange obtained from the channel by #connect; cleared by #disconnect.
def exchange; @exchange; end

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



11
12
13
# File 'lib/pebbles/river/river.rb', line 11

# Exchange name computed by the constructor: 'pebblebed.river', with an
# environment suffix appended outside production.
def exchange_name; @exchange_name; end

#prefetch ⇒ Object (readonly)

Returns the value of attribute prefetch.



10
11
12
# File 'lib/pebbles/river/river.rb', line 10

# The :prefetch option given to the constructor (nil when none was given).
def prefetch; @prefetch; end

#session ⇒ Object (readonly)

Returns the value of attribute session.



8
9
10
# File 'lib/pebbles/river/river.rb', line 8

# Bunny session opened by #connect; reset to nil by #disconnect.
def session; @session; end

Instance Method Details

#connect ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/pebbles/river/river.rb', line 30

# Makes sure a session, a channel and an exchange are all present. If any one
# of them is missing, everything is torn down and rebuilt from scratch;
# otherwise this is a no-op.
def connect
  return if @session && @channel && @exchange

  # Clear out whatever partial state is left before starting over.
  disconnect

  rabbit_options = ::Pebbles::River.rabbitmq_options
  @session = Bunny::Session.new(rabbit_options)
  @session.start

  @channel = @session.create_channel
  if @prefetch
    @channel.prefetch(@prefetch)
  end

  # Hand the channel its own copy of the shared options constant.
  exchange_options = EXCHANGE_OPTIONS.dup
  @exchange = @channel.exchange(@exchange_name, exchange_options)
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/pebbles/river/river.rb', line 26

# Reports whether a session exists and says it is connected.
#
# @return [Boolean] strictly true or false; false when there is no session.
def connected?
  # Coerce so that "no session yet" yields false rather than leaking nil (or
  # whatever truthy object the session happens to return).
  !!(@session && @session.connected?)
end

#disconnect ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pebbles/river/river.rb', line 44

# Shuts down the channel and then the session, when present, and forgets the
# exchange. Bunny errors raised while closing are deliberately swallowed;
# any other exception propagates and leaves the remaining state untouched.
def disconnect
  # Order matters: the channel is closed before the session that owns it.
  [[:@channel, :close], [:@session, :stop]].each do |ivar, shutdown|
    resource = instance_variable_get(ivar)
    next unless resource

    begin
      resource.public_send(shutdown)
    rescue Bunny::Exception
      # Ignore
    end
    instance_variable_set(ivar, nil)
  end
  @exchange = nil
end

#publish(options = {}) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/pebbles/river/river.rb', line 64

# Publishes one event on the river exchange, connecting first if needed.
#
# The whole options hash is serialised as the JSON payload. :persistent
# (default true) is passed along as a publish option, and the routing key comes
# from Routing.routing_key_for applied to the :event and :uid entries.
#
# @param options [Hash] event data, including :event and :uid.
# @return [Object] whatever the exchange's publish call returns.
def publish(options = {})
  connect

  # Note: Using self.exchange so it can be stubbed in tests
  target = self.exchange
  payload = options.to_json
  persistent = options.fetch(:persistent, true)
  routing_key = Routing.routing_key_for(options.slice(:event, :uid))

  target.publish(payload, persistent: persistent, key: routing_key)
end

#queue(options = {}) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/pebbles/river/river.rb', line 73

# Declares a durable queue on the current channel and binds it to the river
# exchange, connecting first if needed.
#
# @param options [Hash]
# @option options [String] :name queue name (required).
# @option options [Object] :ttl sent as the 'x-message-ttl' queue argument.
# @option options [String] :dead_letter_routing_key when given, sets
#   'x-dead-letter-exchange' to this river's exchange name and
#   'x-dead-letter-routing-key' to this value.
# @option options [String] :routing_key an extra binding, used as given.
# @option options [Object] :event filter passed to Routing for the default bindings.
# @option options [Object] :path filter passed to Routing for the default bindings.
# @option options [Object] :klass filter passed to Routing (as :class) for the
#   default bindings.
# @option options [Boolean] :no_default_routing_keys skip the default bindings.
# @return [Object] the queue object returned by the channel.
# @raise [ArgumentError] when :name is missing; unknown keys are rejected by
#   assert_valid_keys.
def queue(options = {})
  options.assert_valid_keys(:name, :ttl, :event, :path, :klass,
    :dead_letter_routing_key, :routing_key, :no_default_routing_keys)

  raise ArgumentError, 'Queue must be named' unless options[:name]

  queue_args = {}
  if (ttl = options[:ttl])
    queue_args['x-message-ttl'] = ttl
  end
  if (dead_letter_routing_key = options[:dead_letter_routing_key])
    queue_args['x-dead-letter-exchange'] = @exchange_name
    queue_args['x-dead-letter-routing-key'] = dead_letter_routing_key
  end
  queue_opts = {durable: true, arguments: queue_args}

  connect
  declared_queue = @channel.queue(options[:name], queue_opts)
  if (routing_key = options[:routing_key])
    declared_queue.bind(exchange.name, key: routing_key)
  end
  unless options[:no_default_routing_keys]
    # Callers pass :klass (the only spelling the whitelist above admits), while
    # Routing has always been handed :class from here. Translate the key so the
    # caller's class filter is no longer silently dropped.
    # NOTE(review): assumes Routing.binding_routing_keys_for reads :class — confirm.
    routing_options = options.slice(:event, :path)
    routing_options[:class] = options[:klass] if options.key?(:klass)

    Routing.binding_routing_keys_for(routing_options).each do |key|
      declared_queue.bind(exchange.name, key: key)
    end
  end
  declared_queue
end