Class: Celerb::TaskPublisher
- Inherits:
-
Object
- Object
- Celerb::TaskPublisher
- Defined in:
- lib/celerb/task_publisher.rb
Class Method Summary
- .connect(opts) ⇒ Object
- .delay_task(task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object
- .publish(body) ⇒ Object
- .register_result_handler(task_id, expiry, &blk) ⇒ Object
- .uniq_id ⇒ Object
Class Method Details
.connect(opts) ⇒ Object
4 5 6 7 8 |
# File 'lib/celerb/task_publisher.rb', line 4
#
# Sets up the publisher's shared state: a durable direct exchange (stored in
# @@exchange) and a fresh ResultConsumer (stored in @@results).
#
# opts - Hash read for two keys:
#        :exchange - passed as the first argument to MQ.direct
#        :key      - passed as the :key option to MQ.direct
#
# Returns the newly created ResultConsumer (value of the last assignment).
def self.connect(opts)
  exchange_name = opts[:exchange]
  routing_key   = opts[:key]

  @@exchange = MQ.direct(exchange_name, :key => routing_key, :durable => true)
  @@results  = ResultConsumer.new
end
.delay_task(task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/celerb/task_publisher.rb', line 10
#
# Builds a task message and hands it to publish.
#
# task_name   - value sent under the :task key
# task_args   - value sent under the :args key (default: [])
# task_kwargs - value sent under the :kwargs key (default: {})
# task_id     - value sent under the :id key; when nil/false a new one is
#               obtained from TaskPublisher.uniq_id
# expires     - value sent under the :expires key
# eta         - value sent under the :eta key
# retries     - value sent under the :retries key (default: 0)
#
# NOTE(review): taskset_id, exchange and exchange_type are accepted but are
# not referenced anywhere in this method body.
#
# Returns the task id that was used for the message.
def self.delay_task(task_name, task_args=[], task_kwargs={},
                    task_id=nil, taskset_id=nil, expires=nil, eta=nil,
                    exchange=nil, exchange_type=nil, retries=0)
  task_id ||= TaskPublisher.uniq_id

  message = {
    :task    => task_name,
    :id      => task_id,
    :args    => task_args,
    :kwargs  => task_kwargs,
    :retries => retries,
    :eta     => eta,
    :expires => expires
  }
  publish(message)

  task_id
end
.publish(body) ⇒ Object
32 33 34 35 36 37 |
# File 'lib/celerb/task_publisher.rb', line 32
#
# Packs body with MessagePack.pack and publishes the result on the exchange
# held in @@exchange, tagging it with msgpack content-type/encoding options.
#
# body - object to serialize and send
#
# Returns whatever @@exchange.publish returns.
def self.publish(body)
  options = {
    :content_type     => 'application/x-msgpack',
    :content_encoding => 'binary'
  }

  @@exchange.publish(MessagePack.pack(body), options)
end
.register_result_handler(task_id, expiry, &blk) ⇒ Object
26 27 28 |
# File 'lib/celerb/task_publisher.rb', line 26
#
# Forwards a result-handler registration to the object held in @@results.
#
# task_id - passed through as the first argument to @@results.register
# expiry  - passed through as the second argument to @@results.register
# blk     - block passed through to @@results.register
#
# Returns whatever @@results.register returns.
def self.register_result_handler(task_id, expiry, &blk)
  consumer = @@results
  consumer.register(task_id, expiry, &blk)
end
.uniq_id ⇒ Object
39 40 41 |
# File 'lib/celerb/task_publisher.rb', line 39
#
# Produces a new identifier by calling UUID.create_v4 and converting the
# result with to_s.
#
# Returns the result of to_s on the created UUID (presumably a String).
def self.uniq_id
  uuid = UUID.create_v4
  uuid.to_s
end