Module: Cloudist

Includes:
EMTimerUtils
Defined in:
lib/cloudist.rb,
lib/cloudist/job.rb,
lib/cloudist_old.rb,
lib/cloudist/queue.rb,
lib/cloudist/utils.rb,
lib/cloudist/errors.rb,
lib/cloudist/worker.rb,
lib/cloudist/message.rb,
lib/cloudist/payload.rb,
lib/cloudist/request.rb,
lib/cloudist/encoding.rb,
lib/cloudist/listener.rb,
lib/em/em_timer_utils.rb,
lib/cloudist/messaging.rb,
lib/cloudist/publisher.rb,
lib/cloudist/application.rb,
lib/cloudist/payload_old.rb,
lib/cloudist/queues/job_queue.rb,
lib/cloudist/queues/basic_queue.rb,
lib/cloudist/queues/reply_queue.rb

Defined Under Namespace

Modules: EMTimerUtils, Encoding, Queues, Utils Classes: Application, BadPayload, EnqueueError, Error, ExpiredMessage, GenericListener, GenericWorker, Job, JobQueue, Listener, Message, Messaging, Payload, Publisher, Queue, ReplyQueue, Request, StaleHeadersError, UnknownReplyTo, Worker

Constant Summary collapse

DEFAULT_TTL =
300
@@workers =
{}

Class Method Summary collapse

Methods included from EMTimerUtils

included, #periodic_timer, #timer

Class Method Details

.add_listener(klass) ⇒ Object

Adds a listener class

Raises:

  • (ArgumentError)


181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/cloudist.rb', line 181

def add_listener(klass)
  raise ArgumentError, "Your listener must extend Cloudist::Listener" unless klass.superclass == Cloudist::Listener
  raise ArgumentError, "Your listener must declare at least one queue to listen to. Use listen_to 'queue.name'" if klass.job_queue_names.nil?
  
  klass.job_queue_names.each do |queue_name|
    klass.subscribe(queue_name)
  end
  
  self.listeners << klass
  
  return self.listeners
end

.closing?Boolean

Returns:

  • (Boolean)


237
238
239
# File 'lib/cloudist_old.rb', line 237

def closing?
  ::AMQP.closing?
end

.connectionObject



75
76
77
# File 'lib/cloudist.rb', line 75

def connection
  AMQP.connection
end

.connection=(conn) ⇒ Object



79
80
81
# File 'lib/cloudist.rb', line 79

def connection=(conn)
  AMQP.connection = conn
end

.default_settingsObject

EM beta



260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/cloudist_old.rb', line 260

def default_settings
  uri = URI.parse(ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/')
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    :port => uri.port || 5672,
    :pass => uri.password,
    :heartbeat => 0,
    :logging => false
  }
rescue Object => e
  raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}"
end

.enqueue(job_queue_name, data = nil) ⇒ Object

Enqueues a job. Takes a queue name and data hash to be sent to the worker. Returns Job instance Use Job#id to reference job later on.

Raises:



198
199
200
201
202
# File 'lib/cloudist.rb', line 198

def enqueue(job_queue_name, data = nil)
  raise EnqueueError, "Incorrect arguments, you must include data when enqueuing job" if data.nil?
  # TODO: Detect if inside loop, if not use bunny sync
  Cloudist::Publisher.enqueue(job_queue_name, data)
end

.extract_cloudist_options!(options) ⇒ Object



70
71
72
73
# File 'lib/cloudist.rb', line 70

def extract_cloudist_options!(options)
  self.worker_prefetch =    options.delete(:worker_prefetch) || 1
  self.listener_prefetch =  options.delete(:listener_prefetch) || 1
end

.handle(*queue_names) ⇒ Object

Registers a worker class to handle a specific queue

Cloudist.handle('make.sandwich', 'eat.sandwich').with(MyWorker)

A standard worker would look like this:

class MyWorker < Cloudist::Worker
  def process
    log.debug(data.inspect)
  end
end

A new instance of this worker will be created everytime a job arrives

Refer to examples.



130
131
132
133
134
135
136
137
138
139
# File 'lib/cloudist.rb', line 130

def handle(*queue_names)
  class << queue_names
    def with(handler)
      self.each do |queue_name|
        Cloudist.register_worker(queue_name.to_s, handler)
      end
    end
  end
  queue_names
end

.handle_error(e) ⇒ Object



240
241
242
243
244
245
# File 'lib/cloudist.rb', line 240

def handle_error(e)
  log.error "#{e.class}: #{e.message}"#, :exception => e
  e.backtrace.each do |line|
    log.error line
  end
end

.job(queue_name) ⇒ Object

Defines a job handler (GenericWorker)

job('make.sandwich') {
  job.started!
  # Work hard
  sleep(5)
  job.finished!
}

Refer to sandwich_worker.rb example



106
107
108
109
110
111
112
113
# File 'lib/cloudist.rb', line 106

def job(queue_name)
  if block_given?
    block = Proc.new
    register_worker(queue_name, &block)
  else
    raise ArgumentError, "You must supply a block as the last argument"
  end
end

.listen(*queue_names, &block) ⇒ Object

Accepts either a queue name or a job instance returned from enqueue. This method operates in two modes, when given a queue name, it will return all responses regardless of job id so you can use the job id to lookup a database record to update etc. When given a job instance it will only return messages from that job.

DEPRECATED

Raises:

  • (NotImplementedError)


176
177
178
# File 'lib/cloudist.rb', line 176

def listen(*queue_names, &block)
  raise NotImplementedError, "This DSL method has been removed. Please use add_listener"
end

.logObject



279
280
281
# File 'lib/cloudist.rb', line 279

def log
  @@log ||= Logger.new($stdout)
end

.log=(log) ⇒ Object



283
284
285
# File 'lib/cloudist.rb', line 283

def log=(log)
  @@log = log
end

.register_worker(queue_name, klass = nil, &block) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/cloudist.rb', line 141

def register_worker(queue_name, klass = nil, &block)
  job_queue = JobQueue.new(queue_name)
  job_queue.subscribe do |request|
    j = Job.new(request.payload.dup)
    begin
      if block_given?
        worker_instance = GenericWorker.new(j, job_queue.q)
        worker_instance.process(&block)
      elsif klass
        worker_instance = klass.new(j, job_queue.q)
        worker_instance.process
      else
        raise RuntimeError, "Failed to register worker, I need either a handler class or block."
      end
    rescue Exception => e
      j.handle_error(e)
    ensure
      finished = Time.now.utc.to_f
      log.debug("Finished Job in #{finished - request.start} seconds")
      j.reply({:runtime => (finished - request.start)}, {:message_type => 'runtime'})
      j.cleanup
    end
  end
  
  ((self.workers[queue_name.to_s] ||= []) << job_queue).uniq!
end

.remove_workersObject



289
290
291
292
293
# File 'lib/cloudist.rb', line 289

def remove_workers
  self.workers.keys.each do |worker|
    self.workers.delete(worker)
  end
end

.reply(queue_name, job_id, data, options = {}) ⇒ Object

Send a reply synchronously This uses bunny instead of AMQP and as such can be run outside of EventMachine and the Cloudist start loop.

Usage: Cloudist.reply(‘make.sandwich’, => 12345)



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/cloudist_old.rb', line 208

def reply(queue_name, job_id, data, options = {})
  headers = {
    :message_id => job_id,
    :message_type => "reply",
    # :event => 'working',
    :message_type => 'reply'
  }.update(options)

  payload = Cloudist::Payload.new(data, headers)

  queue = Cloudist::SyncReplyQueue.new(queue_name)

  queue.setup
  queue.publish_to_q(payload)
end

.settingsObject



266
267
268
# File 'lib/cloudist.rb', line 266

def settings
  @@settings ||= default_settings
end

.settings=(settings_hash) ⇒ Object



270
271
272
# File 'lib/cloudist.rb', line 270

def settings=(settings_hash)
  @@settings = default_settings.update(settings_hash)
end

.signal_trap!Object Also known as: install_signal_trap



274
275
276
277
# File 'lib/cloudist.rb', line 274

def signal_trap!
  ::Signal.trap('INT') { Cloudist.stop }
  ::Signal.trap('TERM'){ Cloudist.stop }
end

.start(options = {}, &block) ⇒ Object

Start the Cloudist loop

Cloudist.start {
  # Do stuff in here
}

Options

  • :user => ‘name’

  • :pass => ‘secret’

  • :host => ‘localhost’

  • :port => 5672

  • :vhost => /

  • :heartbeat => 5

  • :logging => false

Refer to default config below for how to set these as defaults



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/cloudist.rb', line 57

def start(options_or_connection = {}, &block)
  if options_or_connection.is_a?(Hash)
    extract_cloudist_options!(options_or_connection)
    config = settings.update(options_or_connection)
    AMQP.start(config) do
      self.instance_eval(&block) if block_given?
    end
  else
    # self.connection = options_or_connection
    self.instance_eval(&block) if block_given?
  end
end

.stop_safelyObject Also known as: stop

Call this at anytime inside the loop to exit the app.



227
228
229
230
231
232
233
234
235
236
# File 'lib/cloudist.rb', line 227

def stop_safely
  if EM.reactor_running?
    ::EM.add_timer(0.2) { 
      ::AMQP.stop { 
        ::EM.stop
        puts "\n"
      }
    }
  end
end

.versionObject



247
248
249
# File 'lib/cloudist.rb', line 247

def version
  @@version ||= File.read(File.dirname(__FILE__) + '/../VERSION').strip
end

.worker(&block) ⇒ Object

Define a worker. Must be called inside start loop

worker {
  job('make.sandwich') {}
}

REMOVED

Raises:

  • (NotImplementedError)


91
92
93
# File 'lib/cloudist.rb', line 91

def worker(&block)
  raise NotImplementedError, "This DSL format has been removed. Please use job('make.sandwich') {} instead."
end

.workersObject



290
291
292
# File 'lib/cloudist_old.rb', line 290

def workers
  @@workers
end