Class: BossQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/boss_queue/job.rb,
lib/boss_queue/boss_queue.rb

Defined Under Namespace

Classes: Job

Constant Summary collapse

@@environment =
nil

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ BossQueue

Returns a new instance of BossQueue.



6
7
8
9
10
11
# File 'lib/boss_queue/boss_queue.rb', line 6

# Builds a BossQueue from an options hash. Every key is optional:
#
#   :failure_action           - stored in @failure_action when truthy
#   :failure_callback         - stored in @failure_callback when truthy
#   :delete_if_target_missing - stored in @delete_if_target_missing when truthy
#   :queue                    - String or Symbol; stored as "_<queue>" in
#                               @queue_postfix ('' when the key is absent)
def initialize(options={})
  @failure_action = options[:failure_action] if options[:failure_action]
  @failure_callback = options[:failure_callback] if options[:failure_callback]
  @delete_if_target_missing = options[:delete_if_target_missing] if options[:delete_if_target_missing]
  # Interpolate instead of concatenating: '_' + :mail raises TypeError, so a
  # Symbol queue name used to blow up here.
  @queue_postfix = options[:queue] ? "_#{options[:queue]}" : ''
end

Class Method Details

.environment ⇒ Object

:nodoc:



114
115
116
117
118
119
120
121
122
# File 'lib/boss_queue/boss_queue.rb', line 114

# Returns the environment name, memoized in @@environment.
#
# When no value has been assigned yet, it is taken from Rails.env if the
# Rails constant is defined, otherwise from Rack.env if the Rack constant is
# defined. Raises RuntimeError when neither constant is defined.
#
# The checks use defined? rather than Module.const_get because const_get
# raises NameError for a missing constant, which made the Rack branch and the
# explicit error message below unreachable.
#
# NOTE(review): this relies on Rack responding to .env -- confirm that the
# Rack build in use actually provides it.
def self.environment # :nodoc:
  @@environment ||= if defined?(::Rails)
                      ::Rails.env
                    elsif defined?(::Rack)
                      ::Rack.env
                    else
                      raise 'BossQueue requires an environment'
                    end
end

.environment=(env) ⇒ Object



13
14
15
# File 'lib/boss_queue/boss_queue.rb', line 13

# Stores +env+ in the @@environment class variable, replacing any value that
# was there before.
def self.environment=(env)
  @@environment = env
end

.queue_prefix ⇒ Object

:nodoc:



124
125
126
127
128
129
130
131
132
133
# File 'lib/boss_queue/boss_queue.rb', line 124

# Translates the current environment name into a naming prefix:
# '' for 'production', 'dev_' for 'development', and the environment name
# followed by an underscore for anything else.
def self.queue_prefix # :nodoc:
  env = self.environment
  if 'production' == env
    ''
  elsif 'development' == env
    'dev_'
  else
    env + '_'
  end
end

.sqs_queue_url(name) ⇒ Object

:nodoc:



109
110
111
112
# File 'lib/boss_queue/boss_queue.rb', line 109

# Returns the URL for the queue called +name+. Results are kept in the
# @url_mapping hash, so BossQueue.sqs_queues.url_for is only consulted when
# no truthy URL has been stored for that name yet.
def self.sqs_queue_url(name) # :nodoc:
  @url_mapping = {} unless @url_mapping
  known_url = @url_mapping[name]
  return known_url if known_url

  @url_mapping[name] = BossQueue.sqs_queues.url_for(name)
end

.sqs_queues ⇒ Object

:nodoc:



105
106
107
# File 'lib/boss_queue/boss_queue.rb', line 105

# Returns the result of AWS::SQS.new.queues, built on first use and kept in
# @sqs_queues for later calls.
def self.sqs_queues # :nodoc:
  return @sqs_queues if @sqs_queues

  @sqs_queues = AWS::SQS.new.queues
end

Instance Method Details

#create_job(class_or_instance, callback_method, *args) ⇒ Object

:nodoc:



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/boss_queue/boss_queue.rb', line 80

# Builds, saves and returns a BossQueue::Job (from the shard named by
# table_name) describing a call of +callback_method+ with +args+.
#
# class_or_instance - a Class, or an object responding to #id. For a Class
#                     only its name is recorded; otherwise the object's class
#                     name and, when not nil, its id.
# callback_method   - recorded on the job as a String.
# args              - recorded on the job as a JSON array string.
#
# The queue's failure action is always copied to the job; the failure
# callback only when the action is 'callback' and a callback is set; and
# delete_if_target_missing only when it is truthy.
def create_job(class_or_instance, callback_method, *args) # :nodoc:
  record = BossQueue::Job.shard(table_name).new

  target_class, target_id =
    if class_or_instance.is_a?(Class)
      [class_or_instance, nil]
    else
      [class_or_instance.class, class_or_instance.id]
    end

  record.queue_name = queue_name

  action = failure_action
  record.failure_action = action
  if action == 'callback'
    handler = failure_callback
    record.failure_callback = handler.to_s if handler
  end

  delete_flag = delete_if_target_missing
  record.delete_if_target_missing = delete_flag if delete_flag

  record.model_class_name = target_class.to_s
  record.model_id = target_id unless target_id.nil?
  record.callback = callback_method.to_s
  record.args = JSON.generate(args)
  record.save!
  record
end

#create_queue ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/boss_queue/boss_queue.rb', line 45

# Creates the SQS queue named by queue_name through
# AWS::SQS::QueueCollection#create and returns whatever that call returns.
def create_queue
  queue_settings = {
    :visibility_timeout => 180,
    # Messages only carry a job id, so a small size limit is enough.
    :maximum_message_size => 1024,
    # At least one second of delay: there is no point picking a message up
    # before the saved job data is likely to be available, since the worker
    # would only waste time waiting for it.
    :delay_seconds => 1,
    # 1209600 -- presumably seconds, i.e. 14 days; confirm against the SQS docs.
    :message_retention_period => 14 * 24 * 60 * 60
  }

  AWS::SQS::QueueCollection.new.create(queue_name, queue_settings)
end

#create_table(read_capacity = 1, write_capacity = 1, options = {}) ⇒ Object



38
39
40
41
42
43
# File 'lib/boss_queue/boss_queue.rb', line 38

# Creates the DynamoDB table named by table_name with a string hash key
# called :id, passing the given read and write capacities (both default to 1)
# to AWS::DynamoDB's table collection. Returns whatever that create call
# returns.
#
# NOTE(review): +options+ is accepted but never read here -- confirm whether
# it was meant to be forwarded.
def create_table(read_capacity=1, write_capacity=1, options={})
  tables = AWS::DynamoDB.new.tables
  tables.create(table_name, read_capacity, write_capacity, { :hash_key => { :id => :string } })
end

#delete_if_target_missing ⇒ Object



25
26
27
# File 'lib/boss_queue/boss_queue.rb', line 25

# Returns the value held in @delete_if_target_missing (nil when it was never
# assigned).
def delete_if_target_missing
  @delete_if_target_missing
end

#enqueue(class_or_instance, callback_method, *args) ⇒ Object



70
71
72
73
# File 'lib/boss_queue/boss_queue.rb', line 70

# Creates a job for the given target, callback and arguments via create_job,
# then calls #enqueue on it and returns that call's result.
def enqueue(class_or_instance, callback_method, *args)
  create_job(class_or_instance, callback_method, *args).enqueue
end

#enqueue_with_delay(delay, class_or_instance, callback_method, *args) ⇒ Object



75
76
77
78
# File 'lib/boss_queue/boss_queue.rb', line 75

# Creates a job for the given target, callback and arguments via create_job,
# then calls #enqueue_with_delay(delay) on it and returns that call's result.
def enqueue_with_delay(delay, class_or_instance, callback_method, *args)
  create_job(class_or_instance, callback_method, *args).enqueue_with_delay(delay)
end

#failure_action ⇒ Object



17
18
19
# File 'lib/boss_queue/boss_queue.rb', line 17

# Returns the failure action held in @failure_action, first setting it to
# 'retry' when no truthy value is stored.
def failure_action
  @failure_action = 'retry' unless @failure_action
  @failure_action
end

#failure_callback ⇒ Object



21
22
23
# File 'lib/boss_queue/boss_queue.rb', line 21

# Returns the value held in @failure_callback (nil when it was never
# assigned).
def failure_callback
  @failure_callback
end

#queue_name ⇒ Object



33
34
35
# File 'lib/boss_queue/boss_queue.rb', line 33

# Returns the queue name: BossQueue.queue_prefix, then the literal
# 'boss_queue', then the value of @queue_postfix.
def queue_name
  prefix = BossQueue.queue_prefix
  suffix = @queue_postfix
  "#{prefix}boss_queue#{suffix}"
end

#sqs_queue ⇒ Object

:nodoc:



101
102
103
# File 'lib/boss_queue/boss_queue.rb', line 101

# Returns the entry of BossQueue.sqs_queues found under the URL that
# BossQueue.sqs_queue_url gives for queue_name. The result is kept in
# @sqs_queue and reused while it is truthy.
def sqs_queue # :nodoc:
  return @sqs_queue if @sqs_queue

  queues = BossQueue.sqs_queues
  url = BossQueue.sqs_queue_url(queue_name)
  @sqs_queue = queues[url]
end

#table_name ⇒ Object



29
30
31
# File 'lib/boss_queue/boss_queue.rb', line 29

# Returns the jobs table name: BossQueue.queue_prefix followed by the literal
# 'boss_queue_jobs'.
def table_name
  prefix = BossQueue.queue_prefix
  "#{prefix}boss_queue_jobs"
end

#work ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/boss_queue/boss_queue.rb', line 55

# Asks sqs_queue for a message and, if one is yielded, loads the job whose id
# is the message body (consistent read, from the shard named by table_name),
# hands it the queue and runs it.
#
# Returns true when a message was yielded, false otherwise.
def work
  received = false

  sqs_queue.receive_message do |message|
    received = true
    # Per the AWS SDK docs, a message yielded to a block is deleted once the
    # block exits normally -
    # http://docs.aws.amazon.com/AWSRubySDK/latest/frames.html
    begin
      job_key = message.body
      record = BossQueue::Job.find_by_id(job_key, :shard => table_name, :consistent_read => true)
      record.sqs_queue = sqs_queue
      record.work
    rescue AWS::Record::RecordNotFound
      # Deliberately ignored: the block still exits normally in this case.
    end
  end

  received
end