Class: Celerb::TaskPublisher

Inherits:
Object
  • Object
show all
Defined in:
lib/celerb/task_publisher.rb

Class Method Summary collapse

Class Method Details

.connect(opts) ⇒ Object



4
5
6
7
8
# File 'lib/celerb/task_publisher.rb', line 4

def self.connect(opts)
  @@exchange = MQ.direct(opts[:exchange],
    :key => opts[:key], :durable => true)
  @@results = ResultConsumer.new
end

.delay_task(task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/celerb/task_publisher.rb', line 10

def self.delay_task(task_name, task_args=[], task_kwargs={},
               task_id=nil, taskset_id=nil, expires=nil, eta=nil,
               exchange=nil, exchange_type=nil, retries=0)
  task_id ||= TaskPublisher.uniq_id
  publish({
    :task => task_name,
    :id   => task_id,
    :args => task_args,
    :kwargs  => task_kwargs,
    :retries => retries,
    :eta     => eta,
    :expires => expires
  })
  return task_id
end

.publish(body) ⇒ Object



32
33
34
35
36
37
# File 'lib/celerb/task_publisher.rb', line 32

def self.publish(body)
  @@exchange.publish MessagePack.pack(body), {
    :content_type => 'application/x-msgpack',
    :content_encoding => 'binary'
  }
end

.register_result_handler(task_id, expiry, &blk) ⇒ Object



26
27
28
# File 'lib/celerb/task_publisher.rb', line 26

def self.register_result_handler(task_id, expiry, &blk)
  @@results.register(task_id, expiry, &blk)
end

.uniq_idObject



39
40
41
# File 'lib/celerb/task_publisher.rb', line 39

def self.uniq_id
  return UUID.create_v4.to_s
end