Class: QueueBus::Adapters::Resque

Inherits:
Base
  • Object
show all
Defined in:
lib/resque_bus/adapter.rb

Defined Under Namespace

Modules: RetryHandlers

Instance Method Summary collapse

Instance Method Details

#enabled!Object



4
5
6
7
8
9
10
11
12
# File 'lib/resque_bus/adapter.rb', line 4

def enabled!
  # know we are using it
  require 'resque'
  require 'resque/scheduler'
  require 'resque-retry'

  QueueBus::Worker.extend(::Resque::Plugins::ExponentialBackoff)
  QueueBus::Worker.extend(::QueueBus::Adapters::Resque::RetryHandlers)
end

#enqueue(queue_name, klass, json) ⇒ Object



18
19
20
# File 'lib/resque_bus/adapter.rb', line 18

def enqueue(queue_name, klass, json)
  ::Resque.enqueue_to(queue_name, klass, json)
end

#enqueue_at(epoch_seconds, queue_name, klass, json) ⇒ Object



22
23
24
# File 'lib/resque_bus/adapter.rb', line 22

def enqueue_at(epoch_seconds, queue_name, klass, json)
  ::Resque.enqueue_at_with_queue(queue_name, epoch_seconds, klass, json)
end

#redis(&block) ⇒ Object



14
15
16
# File 'lib/resque_bus/adapter.rb', line 14

def redis(&block)
  block.call(::Resque.redis)
end

#setup_heartbeat!(queue_name) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resque_bus/adapter.rb', line 26

def setup_heartbeat!(queue_name)
  # turn on the heartbeat
  # should be down after loading scheduler yml if you do that
  # otherwise, anytime
  name     = 'resquebus_heartbeat'
  schedule = { 'class' => '::QueueBus::Worker',
               'args'=>[::QueueBus::Util.encode({'bus_class_proxy' => '::QueueBus::Heartbeat'})],
               'cron'  => '* * * * *',   # every minute
               'queue' => queue_name,
               'description' => 'I publish a heartbeat_minutes event every minute'
             }
  if ::Resque::Scheduler.dynamic
    ::Resque.set_schedule(name, schedule)
  end
  ::Resque.schedule[name] = schedule
end