Class: Fleck::Consumer
- Inherits:
-
Object
show all
- Defined in:
- lib/fleck/consumer.rb
Defined Under Namespace
Classes: Request, Response
Class Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(thread_id = nil) ⇒ Consumer
Returns a new instance of Consumer.
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/fleck/consumer.rb', line 45
def initialize(thread_id = nil)
@__thread_id = thread_id
@__connection = nil
@__host = configs[:host]
@__port = configs[:port]
@__user = configs[:user] || 'guest'
@__pass = configs[:password] || configs[:pass]
@__vhost = configs[:vhost] || "/"
@__queue_name = configs[:queue]
logger.info "Launching #{self.class.to_s.color(:yellow)} consumer ..."
connect!
create_channel!
subscribe!
at_exit do
terminate
end
end
|
Class Attribute Details
.configs ⇒ Object
Returns the value of attribute configs.
5
6
7
|
# File 'lib/fleck/consumer.rb', line 5
def configs
@configs
end
|
.consumers ⇒ Object
Returns the value of attribute consumers.
5
6
7
|
# File 'lib/fleck/consumer.rb', line 5
def consumers
@consumers
end
|
.logger ⇒ Object
Returns the value of attribute logger.
5
6
7
|
# File 'lib/fleck/consumer.rb', line 5
def logger
@logger
end
|
Class Method Details
.autostart(subclass) ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/fleck/consumer.rb', line 30
def self.autostart(subclass)
trace = TracePoint.new(:end) do |tp|
if tp.self == subclass
trace.disable
[subclass.configs[:concurrency].to_i, 1].max.times do |i|
subclass.consumers << subclass.new(i)
end
end
end
trace.enable
end
|
15
16
17
18
|
# File 'lib/fleck/consumer.rb', line 15
def self.configure(opts = {})
self.configs.merge!(opts)
logger.debug "Consumer configurations updated."
end
|
.inherited(subclass) ⇒ Object
8
9
10
11
12
13
|
# File 'lib/fleck/consumer.rb', line 8
def self.inherited(subclass)
super
init_consumer(subclass)
autostart(subclass)
Fleck.register_consumer(subclass)
end
|
.init_consumer(subclass) ⇒ Object
20
21
22
23
24
25
26
27
28
|
# File 'lib/fleck/consumer.rb', line 20
def self.init_consumer(subclass)
subclass.logger = Fleck.logger.clone
subclass.logger.progname = subclass.to_s
subclass.logger.debug "Setting defaults for #{subclass.to_s.color(:yellow)} consumer"
subclass.configs = Fleck.config.default_options
subclass.consumers = []
end
|
Instance Method Details
#configs ⇒ Object
86
87
88
|
# File 'lib/fleck/consumer.rb', line 86
def configs
@configs ||= self.class.configs
end
|
#logger ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/fleck/consumer.rb', line 78
def logger
return @logger if @logger
@logger = self.class.logger.clone
@logger.progname = "#{self.class.name}" + (configs[:concurrency].to_i <= 1 ? "" : "[#{@__thread_id}]")
@logger
end
|
#on_message(request, response) ⇒ Object
67
68
69
|
# File 'lib/fleck/consumer.rb', line 67
def on_message(request, response)
raise NotImplementedError.new("You must implement #on_message(delivery_info, metadata, payload) method")
end
|
#terminate ⇒ Object
71
72
73
74
75
76
|
# File 'lib/fleck/consumer.rb', line 71
def terminate
unless @__channel.closed?
@__channel.close
logger.info "Consumer successfully terminated."
end
end
|