Module: RabbitJobs
- Extended by:
- RabbitJobs
- Included in:
- RabbitJobs
- Defined in:
- lib/rabbit_jobs.rb,
lib/rabbit_jobs/util.rb,
lib/rabbit_jobs/worker.rb,
lib/rabbit_jobs/helpers.rb,
lib/rabbit_jobs/version.rb,
lib/rabbit_jobs/publisher.rb,
lib/rabbit_jobs/scheduler.rb,
lib/rabbit_jobs/amqp_helper.rb,
lib/rabbit_jobs/error_mailer.rb,
lib/rabbit_jobs/configuration.rb
Defined Under Namespace
Modules: Helpers, Job, Publisher
Classes: AmqpHelper, Configuration, ErrorMailer, Scheduler, Util, Worker
Constant Summary
collapse
- VERSION =
"0.6.0"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#logger ⇒ Object
115
116
117
118
119
120
121
122
123
|
# File 'lib/rabbit_jobs.rb', line 115
def logger
unless @logger
@logger = Logger.new($stdout)
@logger.level = Logger::INFO
@logger.formatter = nil
@logger.progname = 'rj'
end
@logger
end
|
Instance Method Details
#_run_after_fork_callbacks ⇒ Object
131
132
133
134
135
136
|
# File 'lib/rabbit_jobs.rb', line 131
def _run_after_fork_callbacks
@_after_fork_callbacks ||= []
@_after_fork_callbacks.each { |callback|
callback.call
}
end
|
#after_fork(&block) ⇒ Object
125
126
127
128
129
|
# File 'lib/rabbit_jobs.rb', line 125
def after_fork(&block)
raise unless block_given?
@_after_fork_callbacks ||= []
@_after_fork_callbacks << block
end
|
#before_process_message(&block) ⇒ Object
138
139
140
141
142
|
# File 'lib/rabbit_jobs.rb', line 138
def before_process_message(&block)
raise unless block_given?
@before_process_message_callbacks ||= []
@before_process_message_callbacks << block
end
|
#config ⇒ Object
14
15
16
|
# File 'lib/rabbit_jobs/configuration.rb', line 14
def config
@@configuration ||= load_config
end
|
9
10
11
12
|
# File 'lib/rabbit_jobs/configuration.rb', line 9
def configure
@@configuration ||= Configuration.new
yield @@configuration if block_given?
end
|
#direct_publish_to(routing_key, payload, ex = {}, &block) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/rabbit_jobs.rb', line 83
def direct_publish_to(routing_key, payload, ex = {}, &block)
if RJ.running?
RJ::Publisher.direct_publish_to(routing_key, payload, ex, &block)
else
RJ.run {
RJ::Publisher.direct_publish_to(routing_key, payload, ex) {
RJ.stop {
yield if block_given?
}
}
}
end
end
|
#load_config(config_file = nil) ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/rabbit_jobs/configuration.rb', line 18
def load_config(config_file = nil)
@@configuration ||= nil
config_file ||= defined?(Rails) && Rails.respond_to?(:root) && Rails.root.join('config/rabbit_jobs.yml')
if config_file
if File.exists?(config_file)
@@configuration ||= Configuration.new
@@configuration.load_file(config_file)
end
end
unless @@configuration
self.configure do |c|
c.prefix 'rabbit_jobs'
end
end
@@configuration
end
|
#publish(klass, *params, &block) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/rabbit_jobs.rb', line 55
def publish(klass, *params, &block)
if RJ.running?
RJ::Publisher.publish(klass, *params, &block)
else
RJ.run {
RJ::Publisher.publish(klass, *params) {
RJ.stop {
yield if block_given?
}
}
}
end
end
|
#publish_to(routing_key, klass, *params, &block) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/rabbit_jobs.rb', line 69
def publish_to(routing_key, klass, *params, &block)
if RJ.running?
RJ::Publisher.publish_to(routing_key, klass, *params, &block)
else
RJ.run {
RJ::Publisher.publish_to(routing_key, klass, *params) {
RJ.stop {
yield if block_given?
}
}
}
end
end
|
#purge_queue(*routing_keys, &block) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/rabbit_jobs.rb', line 97
def purge_queue(*routing_keys, &block)
if RJ.running?
RJ::Publisher.purge_queue(*routing_keys, &block)
else
messages_count = 0
RJ.run {
RJ::Publisher.purge_queue(*routing_keys) { |count|
messages_count = count
RJ.stop {
yield(count) if block_given?
}
}
}
messages_count
end
end
|
#run_before_process_message_callbacks ⇒ Object
144
145
146
147
148
149
150
|
# File 'lib/rabbit_jobs.rb', line 144
def run_before_process_message_callbacks
@before_process_message_callbacks ||= []
@before_process_message_callbacks.each { |callback|
return false unless callback.call
}
return true
end
|
#running? ⇒ Boolean
51
52
53
|
# File 'lib/rabbit_jobs.rb', line 51
def running?
EM.reactor_running?
end
|
#start ⇒ Object
Also known as:
run
22
23
24
25
26
27
28
29
|
# File 'lib/rabbit_jobs.rb', line 22
def start
raise unless block_given?
raise if EM.reactor_running?
EM.run {
yield
}
end
|
#stop ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/rabbit_jobs.rb', line 33
def stop
RJ::Publisher.cleanup
if AMQP.connection
AMQP.connection.disconnect {
AMQP.connection = nil
AMQP.channel = nil
EM.stop {
yield if block_given?
}
}
else
EM.stop {
yield if block_given?
}
end
end
|