Class: MQueue::Queue
- Inherits:
-
Object
show all
- Defined in:
- lib/m_queue/queue.rb
Defined Under Namespace
Classes: MQueueError, NoServersLeft
Constant Summary
collapse
- LOG_DIR =
File.join(MQUEUE_ROOT, 'log')
- @@queues =
[]
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.inherited(subclass) ⇒ Object
10
11
12
|
# File 'lib/m_queue/queue.rb', line 10
# Hook that Ruby invokes on the parent class each time a subclass is defined.
# Appends the new subclass to the shared @@queues registry.
#
# @param subclass [Class] the class that was just defined
# @return [Array] the registry, after the subclass has been appended
def inherited(subclass)
  super # let any other inherited hooks further up the ancestor chain run
  @@queues << subclass
end
|
.poll ⇒ Object
47
48
49
50
51
52
53
54
55
|
# File 'lib/m_queue/queue.rb', line 47
# Reads this queue's entry (keyed by queue_name) from the server object that
# send_to_server yields. send_to_server is called with +true+; what that flag
# selects is decided by send_to_server itself.
#
# Any StandardError raised by the lookup is printed with puts and swallowed,
# so the block then evaluates to nil.
#
# @return [Object] whatever send_to_server returns (presumably the block's
#   value - confirm against send_to_server)
def poll
  send_to_server(true) do |server|
    begin
      server[queue_name]
    rescue => error
      puts error
    end
  end
end
|
.publish(msg) ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/m_queue/queue.rb', line 35
# Publishes a message to this queue.
#
# When RAILS_ENV is defined and equals 'development' or 'test', nothing is
# queued: a fresh instance handles the message immediately through
# on_message, and that call's result is returned.
#
# Otherwise the message is serialized with to_yaml and stored under
# queue_name on the server yielded by send_to_server.
#
# @param msg [Object] the message; must respond to to_yaml when queued
# @return [Object] on_message's result in development/test, otherwise msg
def publish(msg)
  handle_inline = defined?(RAILS_ENV) &&
                  (RAILS_ENV == 'development' || RAILS_ENV == 'test')
  return self.new.on_message(msg) if handle_inline

  send_to_server do |server|
    server[queue_name] = msg.to_yaml
  end
  msg
end
|
.queue_name ⇒ Object
57
58
59
|
# File 'lib/m_queue/queue.rb', line 57
# Key under which this queue's messages are stored and read; it is simply
# the receiver's own name.
#
# @return [String, nil] the value of +name+ on the receiver
def queue_name
  name
end
|
.queues ⇒ Object
14
15
16
|
# File 'lib/m_queue/queue.rb', line 14
# Returns the shared registry of queue classes: the @@queues array that
# starts out empty and that the inherited hook appends each subclass to.
#
# @return [Array] the registry itself (not a copy)
def queues
@@queues
end
|
.reload! ⇒ Object
69
70
71
72
73
|
# File 'lib/m_queue/queue.rb', line 69
# Calls reload! on every configured server that responds to it; servers
# without a reload! method are skipped.
#
# @return [Array] the servers collection (the result of each)
def reload!
  servers.each { |server|
    next unless server.respond_to?('reload!')
    server.reload!
  }
end
|
.run ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/m_queue/queue.rb', line 18
# Worker loop: polls this queue forever and hands every message it receives
# to a fresh instance's process method. When poll returns nothing (nil or
# false) the loop sleeps for 2 seconds before polling again.
#
# NOTE(review): this method used to keep a counter that grew on every idle
# poll, but the counter was never passed to sleep, so the idle delay has
# always been a fixed 2 seconds. That fixed delay is kept here; confirm
# whether a growing back-off was actually intended.
#
# @return [void] does not return in normal operation
def run
  loop do
    msg = poll
    if msg
      self.new.process(msg)
    else
      sleep 2
    end
  end
end
|
.servers ⇒ Object
65
66
67
|
# File 'lib/m_queue/queue.rb', line 65
# Returns the shared server list held in @@servers, initializing it to an
# empty array the first time it is read (or whenever it is nil/false).
#
# @return [Object] the current value of @@servers
def servers
@@servers ||= []
end
|
.servers=(srvs) ⇒ Object
61
62
63
|
# File 'lib/m_queue/queue.rb', line 61
# Replaces the shared server list held in @@servers.
#
# @param srvs [Object] the new server collection; stored as given, without
#   copying or validation
def servers=(srvs)
@@servers = srvs
end
|
Instance Method Details
#logger ⇒ Object
113
114
115
116
117
118
|
# File 'lib/m_queue/queue.rb', line 113
# Lazily builds and memoizes this instance's logger. On first use it makes
# sure LOG_DIR exists and opens a Logger on "<queue_name>.log" inside it,
# where queue_name comes from the instance's class.
#
# @return [Logger] the same logger object on every call
def logger
  @logger ||= begin
    FileUtils.mkdir_p(LOG_DIR)
    log_path = File.join(LOG_DIR, self.class.queue_name + '.log')
    Logger.new(log_path)
  end
end
|
#on_message(msg) ⇒ Object
120
121
122
|
# File 'lib/m_queue/queue.rb', line 120
# Handler for a single decoded message. This base implementation always
# raises; its error text says the method must be implemented.
#
# @param msg [Object] the message to handle (unused here)
# @raise [RuntimeError] always
def on_message(msg)
  raise RuntimeError, 'You must implement on_message.'
end
|
#process(msg) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/m_queue/queue.rb', line 101
# Handles one raw message: calls reload!, decodes the YAML payload and
# passes the decoded object to on_message.
#
# A StandardError raised while decoding or handling is not propagated; its
# message, class and backtrace are printed with puts instead. An error
# raised by reload! itself is outside the rescue and does propagate.
#
# NOTE(review): YAML.load deserializes whatever was read from the queue. If
# queue contents can come from an untrusted party, consider YAML.safe_load.
#
# @param msg [String] YAML text
# @return [Object, nil] on_message's result, or nil after a rescued error
def process(msg)
  reload!
  begin
    payload = YAML.load(msg)
    on_message(payload)
  rescue => e
    trace = (e.backtrace || []).join("\n")
    puts "\n#{ e.message } - (#{ e.class })\n" << trace
  end
end
|
#reload! ⇒ Object
124
125
126
|
# File 'lib/m_queue/queue.rb', line 124
# Per-message housekeeping: when ActiveRecord is loaded, asks
# ActiveRecord::Base to verify its active connections. Does nothing when
# ActiveRecord is not defined.
#
# @return [Object, nil] the result of verify_active_connections!, or nil
#   when ActiveRecord is absent
def reload!
  return unless defined?(ActiveRecord)

  ActiveRecord::Base.verify_active_connections!
end
|