Class: Beetle::Subscriber

Inherits:
Base
  • Object
Defined in:
lib/beetle/subscriber.rb

Overview

Manages subscriptions and message processing on the receiver side of things.

Instance Attribute Summary

Attributes inherited from Base

#options, #server, #servers

Instance Method Summary

Methods included from Logging

#logger

Constructor Details

#initialize(client, options = {}) ⇒ Subscriber

Creates a new subscriber instance.



# File 'lib/beetle/subscriber.rb', line 13

def initialize(client, options = {}) #:nodoc:
  super
  @servers.concat @client.additional_subscription_servers
  @handlers = {}
  @connections = {}
  @channels = {}
  @subscriptions = {}
  @listened_queues = []
  @channels_closed = false
  @tracing = false
end

Instance Attribute Details

#tracing ⇒ Object

Returns the value of attribute tracing.



# File 'lib/beetle/subscriber.rb', line 7

def tracing
  @tracing
end

Instance Method Details

#listen_queues(queues) ⇒ Object

Called by the client to subscribe to a list of queues. This method does the following:

  • creates all exchanges which have been registered for the given queues

  • creates and binds each of the listed queues

  • subscribes the handlers for all these queues

Yields before entering the EventMachine loop (if a block was given).



# File 'lib/beetle/subscriber.rb', line 33

def listen_queues(queues) #:nodoc:
  @listened_queues = queues
  @exchanges_for_queues = exchanges_for_queues(queues)
  EM.run do
    each_server_sorted_randomly do
      connect_server connection_settings
    end
    yield if block_given?
  end
end
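
The order of steps in listen_queues can be illustrated with a small stand-in. This is only a sketch under stated assumptions: ListenSketch and its log are hypothetical, Array#shuffle is assumed to approximate each_server_sorted_randomly, and the body runs synchronously instead of inside EM.run, so no AMQP connection or reactor is involved.

```ruby
# Hypothetical stand-in that records the order of steps.
# It is not the real Beetle::Subscriber and opens no connections.
class ListenSketch
  attr_reader :log

  def initialize(servers)
    @servers = servers
    @log = []
  end

  def listen_queues(queues)
    @listened_queues = queues
    # Assumption: visiting servers in shuffled order approximates
    # each_server_sorted_randomly in the real class.
    @servers.shuffle.each do |server|
      @log << [:connect, server]
    end
    # As in the original method, the caller's block runs after the
    # connection attempts have been issued.
    yield if block_given?
  end
end

sketch = ListenSketch.new(%w[rabbit1:5672 rabbit2:5672])
sketch.listen_queues(%w[orders]) { sketch.log << [:block_called] }
```

Whatever order the servers are visited in, the caller's block is the last step recorded.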

#pause_listening(queues) ⇒ Object

Pauses the subscription for each of the given queues on every server. Queues without a subscription are skipped.



# File 'lib/beetle/subscriber.rb', line 44

def pause_listening(queues)
  each_server do
    queues.each { |name| pause(name) if has_subscription?(name) }
  end
end

#register_handler(queues, opts = {}, handler = nil, &block) ⇒ Object

Registers a handler for the given queues (see Client#register_handler).



# File 'lib/beetle/subscriber.rb', line 73

def register_handler(queues, opts={}, handler=nil, &block) #:nodoc:
  Array(queues).each do |queue|
    @handlers[queue] = [opts.symbolize_keys, handler || block]
  end
end
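
The bookkeeping done by register_handler can be tried out in isolation. The following is a minimal sketch, not the library's implementation: HandlerRegistry is a hypothetical class, and Hash#transform_keys(&:to_sym) is assumed to be an adequate substitute for ActiveSupport's symbolize_keys.

```ruby
# Hypothetical stand-in for the handler table kept by the subscriber.
class HandlerRegistry
  attr_reader :handlers

  def initialize
    @handlers = {}
  end

  # Stores one [options, callable] pair per queue name.
  def register_handler(queues, opts = {}, handler = nil, &block)
    Array(queues).each do |queue|
      # Assumption: transform_keys(&:to_sym) stands in for symbolize_keys.
      @handlers[queue] = [opts.transform_keys(&:to_sym), handler || block]
    end
  end
end

registry = HandlerRegistry.new

# A block registered for two queues at once, with string option keys.
registry.register_handler(%w[orders invoices], { "retries" => 3 }) { |msg| msg.upcase }

# An explicit callable takes precedence over a block; registering again
# for the same queue replaces the earlier entry.
registry.register_handler("orders", {}, ->(msg) { msg.reverse })
```

Because the table is keyed by queue name, each queue has at most one handler at any time.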

#resume_listening(queues) ⇒ Object

Resumes the subscription for each of the given queues on every server. Queues without a subscription are skipped.



# File 'lib/beetle/subscriber.rb', line 50

def resume_listening(queues)
  each_server do
    queues.each { |name| resume(name) if has_subscription?(name) }
  end
end
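
Both pause_listening and resume_listening only act on queues that have a subscription. The guard can be sketched as follows. SubscriptionToggle is hypothetical, it ignores the per-server iteration done by each_server, and it tracks paused queues in a plain array, which is an assumption about the effect of pause and resume and not taken from the library.

```ruby
# Hypothetical stand-in: tracks which queues are subscribed and which are paused.
class SubscriptionToggle
  attr_reader :paused

  def initialize(subscribed)
    @subscribed = subscribed
    @paused = []
  end

  def has_subscription?(name)
    @subscribed.include?(name)
  end

  # Queues without a subscription are silently skipped.
  def pause_listening(queues)
    queues.each { |name| @paused |= [name] if has_subscription?(name) }
  end

  def resume_listening(queues)
    queues.each { |name| @paused.delete(name) if has_subscription?(name) }
  end
end

toggle = SubscriptionToggle.new(%w[orders invoices])
toggle.pause_listening(%w[orders unknown])
paused_after_pause = toggle.paused.dup
toggle.resume_listening(%w[orders])
```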

#stop! ⇒ Object

Closes all AMQP connections and stops the EventMachine loop. Note that the shutdown process is asynchronous. Must not be called while a message handler is running; typically one would use EM.add_timer(0) { stop! } to ensure this.



# File 'lib/beetle/subscriber.rb', line 59

def stop! #:nodoc:
  if EM.reactor_running?
    EM.add_timer(0) do
      close_all_channels
      close_all_connections
    end
  else
    # try to clean up as much as possible under the circumstances by closing all connections;
    # this should at least close the sockets
    close_connections_with_reactor_not_running
  end
end
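
Why a zero-delay timer keeps the shutdown out of a running handler can be shown with a toy reactor. This is a sketch only: TinyReactor is hypothetical and models just one property, namely that a callback queued with add_timer runs on a later pass of the loop and never inside the callback that queued it. It does not use EventMachine.

```ruby
# Hypothetical miniature reactor: callbacks queued with add_timer run on a
# later pass of the loop, never inside the callback that queued them.
class TinyReactor
  attr_reader :events

  def initialize
    @queue = []
    @events = []
  end

  # The delay is ignored in this sketch; callbacks run in queue order.
  def add_timer(_delay, &callback)
    @queue << callback
  end

  def run
    @queue.shift.call until @queue.empty?
  end
end

reactor = TinyReactor.new

# A message handler that requests shutdown while it is still running.
handler = lambda do
  reactor.events << :handler_started
  reactor.add_timer(0) { reactor.events << :stopped }
  reactor.events << :handler_finished
end

reactor.add_timer(0, &handler)
reactor.run
```

The recorded events show the handler finishing before the deferred shutdown step runs.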

#tracing? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/beetle/subscriber.rb', line 8

def tracing?
  @tracing
end