Module: Cloudist
- Includes:
- EMTimerUtils
- Defined in:
- lib/cloudist.rb,
lib/cloudist/job.rb,
lib/cloudist_old.rb,
lib/cloudist/queue.rb,
lib/cloudist/utils.rb,
lib/cloudist/errors.rb,
lib/cloudist/worker.rb,
lib/cloudist/message.rb,
lib/cloudist/payload.rb,
lib/cloudist/request.rb,
lib/cloudist/encoding.rb,
lib/cloudist/listener.rb,
lib/em/em_timer_utils.rb,
lib/cloudist/messaging.rb,
lib/cloudist/publisher.rb,
lib/cloudist/application.rb,
lib/cloudist/payload_old.rb,
lib/cloudist/queues/job_queue.rb,
lib/cloudist/queues/basic_queue.rb,
lib/cloudist/queues/reply_queue.rb
Defined Under Namespace
Modules: EMTimerUtils, Encoding, Queues, Utils Classes: Application, BadPayload, EnqueueError, Error, ExpiredMessage, GenericListener, GenericWorker, Job, JobQueue, Listener, Message, Messaging, Payload, Publisher, Queue, ReplyQueue, Request, StaleHeadersError, UnknownReplyTo, Worker
Constant Summary collapse
- DEFAULT_TTL =
300- @@workers =
{}
Class Method Summary collapse
-
.add_listener(klass) ⇒ Object
Adds a listener class.
- .closing? ⇒ Boolean
- .connection ⇒ Object
- .connection=(conn) ⇒ Object
-
.default_settings ⇒ Object
EM beta.
-
.enqueue(job_queue_name, data = nil) ⇒ Object
Enqueues a job.
- .extract_cloudist_options!(options) ⇒ Object
-
.handle(*queue_names) ⇒ Object
Registers a worker class to handle a specific queue.
- .handle_error(e) ⇒ Object
-
.job(queue_name) ⇒ Object
Defines a job handler (GenericWorker).
-
.listen(*queue_names, &block) ⇒ Object
Accepts either a queue name or a job instance returned from enqueue.
- .log ⇒ Object
- .log=(log) ⇒ Object
- .register_worker(queue_name, klass = nil, &block) ⇒ Object
- .remove_workers ⇒ Object
-
.reply(queue_name, job_id, data, options = {}) ⇒ Object
Send a reply synchronously This uses bunny instead of AMQP and as such can be run outside of EventMachine and the Cloudist start loop.
- .settings ⇒ Object
- .settings=(settings_hash) ⇒ Object
- .signal_trap! ⇒ Object (also: install_signal_trap)
-
.start(options = {}, &block) ⇒ Object
Start the Cloudist loop.
-
.stop_safely ⇒ Object
(also: stop)
Call this at anytime inside the loop to exit the app.
- .version ⇒ Object
-
.worker(&block) ⇒ Object
Define a worker.
- .workers ⇒ Object
Methods included from EMTimerUtils
included, #periodic_timer, #timer
Class Method Details
.add_listener(klass) ⇒ Object
Adds a listener class
181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/cloudist.rb', line 181 def add_listener(klass) raise ArgumentError, "Your listener must extend Cloudist::Listener" unless klass.superclass == Cloudist::Listener raise ArgumentError, "Your listener must declare at least one queue to listen to. Use listen_to 'queue.name'" if klass.job_queue_names.nil? klass.job_queue_names.each do |queue_name| klass.subscribe(queue_name) end self.listeners << klass return self.listeners end |
.closing? ⇒ Boolean
237 238 239 |
# File 'lib/cloudist_old.rb', line 237 def closing? ::AMQP.closing? end |
.connection ⇒ Object
75 76 77 |
# File 'lib/cloudist.rb', line 75 def connection AMQP.connection end |
.connection=(conn) ⇒ Object
79 80 81 |
# File 'lib/cloudist.rb', line 79 def connection=(conn) AMQP.connection = conn end |
.default_settings ⇒ Object
EM beta
260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/cloudist_old.rb', line 260 def default_settings uri = URI.parse(ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/') { :vhost => uri.path, :host => uri.host, :user => uri.user, :port => uri.port || 5672, :pass => uri.password, :heartbeat => 0, :logging => false } rescue Object => e raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}" end |
.enqueue(job_queue_name, data = nil) ⇒ Object
Enqueues a job. Takes a queue name and data hash to be sent to the worker. Returns Job instance Use Job#id to reference job later on.
198 199 200 201 202 |
# File 'lib/cloudist.rb', line 198 def enqueue(job_queue_name, data = nil) raise EnqueueError, "Incorrect arguments, you must include data when enqueuing job" if data.nil? # TODO: Detect if inside loop, if not use bunny sync Cloudist::Publisher.enqueue(job_queue_name, data) end |
.extract_cloudist_options!(options) ⇒ Object
70 71 72 73 |
# File 'lib/cloudist.rb', line 70 def () self.worker_prefetch = .delete(:worker_prefetch) || 1 self.listener_prefetch = .delete(:listener_prefetch) || 1 end |
.handle(*queue_names) ⇒ Object
Registers a worker class to handle a specific queue
Cloudist.handle('make.sandwich', 'eat.sandwich').with(MyWorker)
A standard worker would look like this:
class MyWorker < Cloudist::Worker
def process
log.debug(data.inspect)
end
end
A new instance of this worker will be created everytime a job arrives
Refer to examples.
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/cloudist.rb', line 130 def handle(*queue_names) class << queue_names def with(handler) self.each do |queue_name| Cloudist.register_worker(queue_name.to_s, handler) end end end queue_names end |
.handle_error(e) ⇒ Object
240 241 242 243 244 245 |
# File 'lib/cloudist.rb', line 240 def handle_error(e) log.error "#{e.class}: #{e.message}"#, :exception => e e.backtrace.each do |line| log.error line end end |
.job(queue_name) ⇒ Object
Defines a job handler (GenericWorker)
job('make.sandwich') {
job.started!
# Work hard
sleep(5)
job.finished!
}
Refer to sandwich_worker.rb example
106 107 108 109 110 111 112 113 |
# File 'lib/cloudist.rb', line 106 def job(queue_name) if block_given? block = Proc.new register_worker(queue_name, &block) else raise ArgumentError, "You must supply a block as the last argument" end end |
.listen(*queue_names, &block) ⇒ Object
Accepts either a queue name or a job instance returned from enqueue. This method operates in two modes, when given a queue name, it will return all responses regardless of job id so you can use the job id to lookup a database record to update etc. When given a job instance it will only return messages from that job.
DEPRECATED
176 177 178 |
# File 'lib/cloudist.rb', line 176 def listen(*queue_names, &block) raise NotImplementedError, "This DSL method has been removed. Please use add_listener" end |
.log ⇒ Object
279 280 281 |
# File 'lib/cloudist.rb', line 279 def log @@log ||= Logger.new($stdout) end |
.log=(log) ⇒ Object
283 284 285 |
# File 'lib/cloudist.rb', line 283 def log=(log) @@log = log end |
.register_worker(queue_name, klass = nil, &block) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/cloudist.rb', line 141 def register_worker(queue_name, klass = nil, &block) job_queue = JobQueue.new(queue_name) job_queue.subscribe do |request| j = Job.new(request.payload.dup) begin if block_given? worker_instance = GenericWorker.new(j, job_queue.q) worker_instance.process(&block) elsif klass worker_instance = klass.new(j, job_queue.q) worker_instance.process else raise RuntimeError, "Failed to register worker, I need either a handler class or block." end rescue Exception => e j.handle_error(e) ensure finished = Time.now.utc.to_f log.debug("Finished Job in #{finished - request.start} seconds") j.reply({:runtime => (finished - request.start)}, {:message_type => 'runtime'}) j.cleanup end end ((self.workers[queue_name.to_s] ||= []) << job_queue).uniq! end |
.remove_workers ⇒ Object
289 290 291 292 293 |
# File 'lib/cloudist.rb', line 289 def remove_workers self.workers.keys.each do |worker| self.workers.delete(worker) end end |
.reply(queue_name, job_id, data, options = {}) ⇒ Object
Send a reply synchronously This uses bunny instead of AMQP and as such can be run outside of EventMachine and the Cloudist start loop.
Usage: Cloudist.reply(‘make.sandwich’, => 12345)
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/cloudist_old.rb', line 208 def reply(queue_name, job_id, data, = {}) headers = { :message_id => job_id, :message_type => "reply", # :event => 'working', :message_type => 'reply' }.update() payload = Cloudist::Payload.new(data, headers) queue = Cloudist::SyncReplyQueue.new(queue_name) queue.setup queue.publish_to_q(payload) end |
.settings ⇒ Object
266 267 268 |
# File 'lib/cloudist.rb', line 266 def settings @@settings ||= default_settings end |
.settings=(settings_hash) ⇒ Object
270 271 272 |
# File 'lib/cloudist.rb', line 270 def settings=(settings_hash) @@settings = default_settings.update(settings_hash) end |
.signal_trap! ⇒ Object Also known as: install_signal_trap
274 275 276 277 |
# File 'lib/cloudist.rb', line 274 def signal_trap! ::Signal.trap('INT') { Cloudist.stop } ::Signal.trap('TERM'){ Cloudist.stop } end |
.start(options = {}, &block) ⇒ Object
Start the Cloudist loop
Cloudist.start {
# Do stuff in here
}
Options
-
:user => ‘name’
-
:pass => ‘secret’
-
:host => ‘localhost’
-
:port => 5672
-
:vhost => /
-
:heartbeat => 5
-
:logging => false
Refer to default config below for how to set these as defaults
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/cloudist.rb', line 57 def start( = {}, &block) if .is_a?(Hash) () config = settings.update() AMQP.start(config) do self.instance_eval(&block) if block_given? end else # self.connection = options_or_connection self.instance_eval(&block) if block_given? end end |
.stop_safely ⇒ Object Also known as: stop
Call this at anytime inside the loop to exit the app.
227 228 229 230 231 232 233 234 235 236 |
# File 'lib/cloudist.rb', line 227 def stop_safely if EM.reactor_running? ::EM.add_timer(0.2) { ::AMQP.stop { ::EM.stop puts "\n" } } end end |
.version ⇒ Object
247 248 249 |
# File 'lib/cloudist.rb', line 247 def version @@version ||= File.read(File.dirname(__FILE__) + '/../VERSION').strip end |
.worker(&block) ⇒ Object
Define a worker. Must be called inside start loop
worker {
job('make.sandwich') {}
}
REMOVED
91 92 93 |
# File 'lib/cloudist.rb', line 91 def worker(&block) raise NotImplementedError, "This DSL format has been removed. Please use job('make.sandwich') {} instead." end |
.workers ⇒ Object
290 291 292 |
# File 'lib/cloudist_old.rb', line 290 def workers @@workers end |