Class: BossQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/boss_queue/job.rb,
lib/boss_queue/boss_queue.rb

Defined Under Namespace

Classes: Job

Constant Summary collapse

@@environment =
nil

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ BossQueue

Returns a new instance of BossQueue.



6
7
8
9
# File 'lib/boss_queue/boss_queue.rb', line 6

# Builds a BossQueue from an options hash.
#
# options[:failure_action] - stored in @failure_action only when truthy;
#                            otherwise the instance variable is left unset.
# options[:queue]          - optional queue name (String or Symbol). When
#                            given, @queue_postfix becomes "_<queue>";
#                            otherwise it is the empty string.
def initialize(options={})
  @failure_action = options[:failure_action] if options[:failure_action]

  # Interpolation (rather than String#+) so a Symbol queue name such as
  # :emails does not raise TypeError.
  queue = options[:queue]
  @queue_postfix = queue ? "_#{queue}" : ''
end

Class Method Details

.environment ⇒ Object

:nodoc:



89
90
91
92
93
94
95
96
97
# File 'lib/boss_queue/boss_queue.rb', line 89

# Returns the environment name, memoized in @@environment.
#
# When no environment has been assigned yet it is resolved in this order:
# Rails.env if a top-level Rails constant is defined, then Rack.env if a
# top-level Rack constant is defined.
#
# Raises RuntimeError ('BossQueue requires an environment') when neither
# constant is defined.
def self.environment # :nodoc:
  # const_defined? is used for detection: Module.const_get raises NameError
  # for a missing constant, which made the Rack branch and the explicit
  # error below unreachable whenever Rails was not loaded.
  @@environment ||= if Object.const_defined?(:Rails)
                      Rails.env
                    elsif Object.const_defined?(:Rack)
                      # NOTE(review): Rack.env is kept as written; confirm
                      # the host application actually provides it.
                      Rack.env
                    else
                      raise 'BossQueue requires an environment'
                    end
end

.environment=(env) ⇒ Object



11
12
13
# File 'lib/boss_queue/boss_queue.rb', line 11

# Overrides the environment name by assigning env to @@environment.
#
# env - the value to store; it is not validated or converted here.
def self.environment=(env)
  @@environment = env
end

.queue_prefix ⇒ Object

:nodoc:



99
100
101
102
103
104
105
106
107
108
# File 'lib/boss_queue/boss_queue.rb', line 99

# Returns the prefix derived from the current environment:
#
#   'production'  => ''
#   'development' => 'dev_'
#   anything else => "<environment>_"
#
# The environment is normalized with to_s, so a Symbol assigned through
# environment= behaves like the equivalent String instead of raising
# NoMethodError on Symbol#+.
def self.queue_prefix # :nodoc:
  env = environment.to_s

  case env
  when 'production'
    ''
  when 'development'
    'dev_'
  else
    "#{env}_"
  end
end

Instance Method Details

#create_job(class_or_instance, method_name, *args) ⇒ Object

:nodoc:



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/boss_queue/boss_queue.rb', line 64

# Builds, saves and returns a BossQueue::Job (from the shard named by
# table_name) describing a call to method_name on class_or_instance.
#
# class_or_instance - a Class, or an object responding to #id.
# method_name       - name of the method to run; stored via to_s.
# args              - remaining arguments, stored as a JSON array string.
#
# The job's kind is "ClassName@method" for a Class target and
# "ClassName#method" for an instance target. model_id is assigned only
# when the target yields a non-nil id.
def create_job(class_or_instance, method_name, *args) # :nodoc:
  record = BossQueue::Job.shard(table_name).new

  targets_class = class_or_instance.is_a?(Class)
  owner         = targets_class ? class_or_instance : class_or_instance.class
  owner_name    = owner.to_s
  record_id     = targets_class ? nil : class_or_instance.id
  separator     = targets_class ? '@' : '#'

  record.kind             = "#{owner_name}#{separator}#{method_name}"
  record.queue_name       = queue_name
  record.failure_action   = failure_action
  record.model_class_name = owner_name
  record.model_id         = record_id unless record_id.nil?
  record.job_method       = method_name.to_s
  record.job_arguments    = JSON.generate(args)

  record.save!
  record
end

#create_queue ⇒ Object



35
36
37
# File 'lib/boss_queue/boss_queue.rb', line 35

# Creates the SQS queue named by queue_name through a new
# AWS::SQS::QueueCollection and returns whatever that create call returns.
def create_queue
  queues = AWS::SQS::QueueCollection.new
  # 5 * 60 = 300; presumably seconds (five minutes) -- confirm against the
  # SQS documentation for :default_visibility_timeout.
  queues.create(queue_name, :default_visibility_timeout => 5 * 60)
end

#create_table(read_capacity = 1, write_capacity = 1, options = {}) ⇒ Object



28
29
30
31
32
33
# File 'lib/boss_queue/boss_queue.rb', line 28

# Creates the DynamoDB table named by table_name with a string hash key
# called :id, and returns the result of the tables.create call.
#
# read_capacity  - passed through as the read capacity (default 1).
# write_capacity - passed through as the write capacity (default 1).
# options        - accepted for interface compatibility; this method does
#                  not currently read it.
def create_table(read_capacity=1, write_capacity=1, options={})
  key_schema = { :hash_key => { :id => :string } }
  tables = AWS::DynamoDB.new.tables
  tables.create(table_name, read_capacity, write_capacity, key_schema)
end

#enqueue(class_or_instance, method_name, *args) ⇒ Object



54
55
56
57
# File 'lib/boss_queue/boss_queue.rb', line 54

# Creates a job for the given target, method and arguments via create_job,
# then calls enqueue on it. Returns the result of that enqueue call.
def enqueue(class_or_instance, method_name, *args)
  create_job(class_or_instance, method_name, *args).enqueue
end

#enqueue_with_delay(delay, class_or_instance, method_name, *args) ⇒ Object



59
60
61
62
# File 'lib/boss_queue/boss_queue.rb', line 59

# Creates a job for the given target, method and arguments via create_job,
# then calls enqueue_with_delay(delay) on it. Returns the result of that
# call.
#
# delay - forwarded unchanged to the job's enqueue_with_delay.
def enqueue_with_delay(delay, class_or_instance, method_name, *args)
  create_job(class_or_instance, method_name, *args).enqueue_with_delay(delay)
end

#failure_action ⇒ Object



15
16
17
# File 'lib/boss_queue/boss_queue.rb', line 15

# Returns the failure action for this queue. When @failure_action is unset
# (or otherwise falsy) it is set to 'retry' first, so later calls return
# the same stored value.
def failure_action
  @failure_action = 'retry' unless @failure_action
  @failure_action
end

#queue_name ⇒ Object



23
24
25
# File 'lib/boss_queue/boss_queue.rb', line 23

# Returns the queue name: BossQueue.queue_prefix, then the literal
# 'boss_queue', then @queue_postfix.
def queue_name
  prefix  = BossQueue.queue_prefix
  postfix = @queue_postfix
  "#{prefix}boss_queue#{postfix}"
end

#sqs_queue ⇒ Object

:nodoc:



85
86
87
# File 'lib/boss_queue/boss_queue.rb', line 85

# Returns the SQS queue object for queue_name, memoized in @sqs_queue.
#
# The queue URL is looked up with url_for(queue_name) and then used to
# index the queues collection. A single AWS::SQS instance serves both
# steps; previously two separate instances were built per lookup.
def sqs_queue # :nodoc:
  @sqs_queue ||= begin
    queues = AWS::SQS.new.queues
    queues[queues.url_for(queue_name)]
  end
end

#table_name ⇒ Object



19
20
21
# File 'lib/boss_queue/boss_queue.rb', line 19

# Returns the jobs table name: BossQueue.queue_prefix followed by the
# literal 'boss_queue_jobs'.
def table_name
  prefix = BossQueue.queue_prefix
  "#{prefix}boss_queue_jobs"
end

#work ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/boss_queue/boss_queue.rb', line 39

# Receives at most one message from sqs_queue and runs the job it names.
#
# The message body is used as the job id: the job is loaded from the shard
# named by table_name, given a reference to sqs_queue, and asked to work.
#
# Returns true if a message was yielded by receive_message, false
# otherwise.
def work
  received = false

  # Per the AWS Ruby SDK docs, a message yielded to this block is deleted
  # afterwards as long as the block exits normally -
  # http://docs.aws.amazon.com/AWSRubySDK/latest/frames.html
  sqs_queue.receive_message do |message|
    received = true
    begin
      record = BossQueue::Job.shard(table_name).find(message.body)
      record.sqs_queue = sqs_queue
      record.work
    rescue AWS::Record::RecordNotFound
      # No job record for this id: ignored on purpose, so the block still
      # exits normally.
    end
  end

  received
end