Class: Fleck::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/fleck/consumer.rb

Defined Under Namespace

Classes: Request, Response

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_id = nil) ⇒ Consumer

Returns a new instance of Consumer.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fleck/consumer.rb', line 45

# Builds a consumer and brings it online straight away.
#
# Connection settings are copied from +configs+ into instance variables,
# a launch message is logged, and then the connection, channel and
# subscription are set up in that order. A process-exit hook is registered
# last so that +terminate+ runs when the interpreter shuts down.
#
# @param thread_id [Object, nil] identifier stored in +@__thread_id+
def initialize(thread_id = nil)
  @__thread_id  = thread_id
  @__connection = nil

  settings = configs

  @__host       = settings[:host]
  @__port       = settings[:port]
  @__user       = settings[:user] || 'guest'
  # :password takes precedence; :pass is the fallback key.
  @__pass       = settings[:password] || settings[:pass]
  @__vhost      = settings[:vhost] || "/"
  @__queue_name = settings[:queue]

  logger.info "Launching #{self.class.to_s.color(:yellow)} consumer ..."

  connect!
  create_channel!
  subscribe!

  at_exit { terminate }
end

Class Attribute Details

.configs ⇒ Object

Returns the value of attribute configs.



5
6
7
# File 'lib/fleck/consumer.rb', line 5

# Class-level reader for the consumer configuration.
#
# Elsewhere on this page the value is assigned by +init_consumer+ and
# merged into by +configure+.
#
# @return [Object] whatever is currently stored in +@configs+
def configs
  @configs
end

.consumers ⇒ Object

Returns the value of attribute consumers.



5
6
7
# File 'lib/fleck/consumer.rb', line 5

# Class-level reader for the list of consumer instances.
#
# Elsewhere on this page the value is reset to an empty array by
# +init_consumer+ and appended to by the +autostart+ trace callback.
#
# @return [Object] whatever is currently stored in +@consumers+
def consumers
  @consumers
end

.logger ⇒ Object

Returns the value of attribute logger.



5
6
7
# File 'lib/fleck/consumer.rb', line 5

# Class-level reader for the consumer class's logger.
#
# Elsewhere on this page the value is assigned by +init_consumer+ (a clone
# of +Fleck.logger+) and cloned again by the instance-level +#logger+.
#
# @return [Object] whatever is currently stored in +@logger+
def logger
  @logger
end

Class Method Details

.autostart(subclass) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/fleck/consumer.rb', line 30

# Arranges for +subclass+ to be instantiated once its class body has been
# fully evaluated.
#
# A TracePoint listens for +:end+ events; when one fires for +subclass+
# itself the trace switches itself off and builds the consumers. The number
# of instances is +subclass.configs[:concurrency]+ coerced with +to_i+, with
# a floor of one. Each instance receives its zero-based index and is pushed
# onto +subclass.consumers+.
#
# @param subclass [Class] the consumer class to start
# @return [Object] the result of enabling the trace
def self.autostart(subclass)
  tracer = TracePoint.new(:end) do |event|
    # Ignore the end of every other class/module definition.
    next unless event.self == subclass

    # One-shot: stop listening before spinning anything up.
    tracer.disable

    worker_count = [subclass.configs[:concurrency].to_i, 1].max
    worker_count.times { |index| subclass.consumers << subclass.new(index) }
  end

  tracer.enable
end

.configure(opts = {}) ⇒ Object



15
16
17
18
# File 'lib/fleck/consumer.rb', line 15

# Merges +opts+ in place into this class's +configs+ and logs a debug line.
#
# @param opts [Hash] settings to merge over the current ones
# @return [Object] the result of the +logger.debug+ call
def self.configure(opts = {})
  configs.merge!(opts)
  logger.debug("Consumer configurations updated.")
end

.inherited(subclass) ⇒ Object



8
9
10
11
12
13
# File 'lib/fleck/consumer.rb', line 8

# Inheritance hook: runs for every class that subclasses this one.
#
# After delegating to the default hook via +super+, it gives the subclass
# its own logger, configs and consumers list (+init_consumer+), schedules
# the subclass to be instantiated once its class body ends (+autostart+),
# and hands the subclass to +Fleck.register_consumer+ (defined outside this
# file; its effect is not visible here).
#
# @param subclass [Class] the newly created subclass
def self.inherited(subclass)
  super
  init_consumer(subclass)
  autostart(subclass)
  Fleck.register_consumer(subclass)
end

.init_consumer(subclass) ⇒ Object



20
21
22
23
24
25
26
27
28
# File 'lib/fleck/consumer.rb', line 20

# Gives +subclass+ its own class-level state.
#
# The subclass receives a clone of +Fleck.logger+ whose progname is the
# subclass name, a debug line is written through that logger, and then
# +configs+ is set from +Fleck.config.default_options+ and +consumers+ is
# reset to an empty array.
#
# @param subclass [Class] the consumer class being prepared
# @return [Array] the fresh, empty consumers list
def self.init_consumer(subclass)
  subclass.logger = Fleck.logger.clone.tap do |own_logger|
    own_logger.progname = subclass.to_s
  end

  subclass.logger.debug("Setting defaults for #{subclass.to_s.color(:yellow)} consumer")

  subclass.configs   = Fleck.config.default_options
  subclass.consumers = []
end

Instance Method Details

#configs ⇒ Object



86
87
88
# File 'lib/fleck/consumer.rb', line 86

# Instance-level view of the configuration.
#
# The first call copies the reference held by the class (+self.class.configs+)
# into +@configs+; later calls return that cached reference.
#
# @return [Object] the cached class-level configs
def configs
  return @configs if @configs

  @configs = self.class.configs
end

#logger ⇒ Object



78
79
80
81
82
83
84
# File 'lib/fleck/consumer.rb', line 78

# Per-instance logger, created on first use and cached in +@logger+.
#
# It is a clone of the class-level logger. Its progname is the class name,
# followed by "[<thread id>]" only when +configs[:concurrency]+ (coerced
# with +to_i+) is greater than one.
#
# @return [Object] the cached logger clone
def logger
  return @logger if @logger

  @logger = self.class.logger.clone
  worker_tag = configs[:concurrency].to_i > 1 ? "[#{@__thread_id}]" : ""
  @logger.progname = "#{self.class.name}#{worker_tag}"
  @logger
end

#on_message(request, response) ⇒ Object

Raises:

  • (NotImplementedError)


67
68
69
# File 'lib/fleck/consumer.rb', line 67

# Message callback that concrete consumers are expected to override.
#
# This base version never handles anything: it always raises, and the error
# text names the signature an override must provide.
#
# @param request [Object] incoming request (presumably the nested Request
#   class listed for this namespace; confirm against the caller)
# @param response [Object] outgoing response (presumably the nested Response
#   class; confirm against the caller)
# @raise [NotImplementedError] always
def on_message(request, response)
  raise NotImplementedError, "You must implement #on_message(request, response) method"
end

#terminate ⇒ Object



71
72
73
74
75
76
# File 'lib/fleck/consumer.rb', line 71

# Shuts the consumer down by closing its channel, if there is one to close.
#
# Does nothing when no channel has been stored in +@__channel+ or when the
# channel already reports itself closed. Otherwise the channel is closed and
# an info line is logged.
#
# @return [Object, nil] the result of the log call, or nil when nothing was closed
def terminate
  # A missing channel would otherwise raise NoMethodError on nil.
  return if @__channel.nil? || @__channel.closed?

  @__channel.close
  logger.info "Consumer successfully terminated."
end