Class: BossQueue
- Inherits:
-
Object
- Object
- BossQueue
- Defined in:
- lib/boss_queue/job.rb,
lib/boss_queue/boss_queue.rb
Defined Under Namespace
Classes: Job
Constant Summary collapse
- @@environment = nil
Class Method Summary collapse
-
.environment ⇒ Object
:nodoc:.
- .environment=(env) ⇒ Object
-
.queue_prefix ⇒ Object
:nodoc:.
-
.sqs_queue_url(name) ⇒ Object
:nodoc:.
-
.sqs_queues ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#create_job(class_or_instance, callback_method, *args) ⇒ Object
:nodoc:.
- #create_queue ⇒ Object
- #create_table(read_capacity = 1, write_capacity = 1, options = {}) ⇒ Object
- #delete_if_target_missing ⇒ Object
- #enqueue(class_or_instance, callback_method, *args) ⇒ Object
- #enqueue_with_delay(delay, class_or_instance, callback_method, *args) ⇒ Object
- #failure_action ⇒ Object
- #failure_callback ⇒ Object
-
#initialize(options = {}) ⇒ BossQueue
constructor
A new instance of BossQueue.
- #queue_name ⇒ Object
-
#sqs_queue ⇒ Object
:nodoc:.
- #table_name ⇒ Object
- #work ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ BossQueue
Returns a new instance of BossQueue.
6 7 8 9 10 11 |
# File 'lib/boss_queue/boss_queue.rb', line 6 def initialize(options={}) @failure_action = options[:failure_action] if options[:failure_action] @failure_callback = options[:failure_callback] if options[:failure_callback] @delete_if_target_missing = options[:delete_if_target_missing] if options[:delete_if_target_missing] @queue_postfix = options[:queue] ? '_' + options[:queue] : '' end |
Class Method Details
.environment ⇒ Object
:nodoc:
114 115 116 117 118 119 120 121 122 |
# File 'lib/boss_queue/boss_queue.rb', line 114 def self.environment # :nodoc: @@environment ||= if Module.const_get('Rails') Rails.env elsif Module.const_get('Rack') Rack.env else raise 'BossQueue requires an environment' end end |
.environment=(env) ⇒ Object
13 14 15 |
# File 'lib/boss_queue/boss_queue.rb', line 13 def self.environment=(env) @@environment = env end |
.queue_prefix ⇒ Object
:nodoc:
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/boss_queue/boss_queue.rb', line 124 def self.queue_prefix # :nodoc: case self.environment when 'production' '' when 'development' 'dev_' else environment + '_' end end |
.sqs_queue_url(name) ⇒ Object
:nodoc:
109 110 111 112 |
# File 'lib/boss_queue/boss_queue.rb', line 109 def self.sqs_queue_url(name) # :nodoc: @url_mapping ||= {} @url_mapping[name] ||= BossQueue.sqs_queues.url_for(name) end |
.sqs_queues ⇒ Object
:nodoc:
105 106 107 |
# File 'lib/boss_queue/boss_queue.rb', line 105 def self.sqs_queues # :nodoc: @sqs_queues ||= AWS::SQS.new.queues end |
Instance Method Details
#create_job(class_or_instance, callback_method, *args) ⇒ Object
:nodoc:
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/boss_queue/boss_queue.rb', line 80 def create_job(class_or_instance, callback_method, *args) # :nodoc: job = BossQueue::Job.shard(table_name).new if class_or_instance.is_a?(Class) class_name = class_or_instance.to_s instance_id = nil else class_name = class_or_instance.class.to_s instance_id = class_or_instance.id end job.queue_name = queue_name job.failure_action = failure_action job.failure_callback = failure_callback.to_s if failure_action == 'callback' && failure_callback job.delete_if_target_missing = delete_if_target_missing if delete_if_target_missing job.model_class_name = class_name job.model_id = instance_id unless instance_id.nil? job.callback = callback_method.to_s job.args = JSON.generate(args) job.save! job end |
#create_queue ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'lib/boss_queue/boss_queue.rb', line 45 def create_queue # small message size because we are only sending id # minimum 1 second delay so that we don't even try to pick it up until it is likely that the # message is ready (or else we waste time just waiting for the saved data to become available) AWS::SQS::QueueCollection.new.create(queue_name, :visibility_timeout => 180, :maximum_message_size => 1024, :delay_seconds => 1, :message_retention_period => 1209600) end |
#create_table(read_capacity = 1, write_capacity = 1, options = {}) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/boss_queue/boss_queue.rb', line 38 def create_table(read_capacity=1, write_capacity=1, options={}) create_opts = {} create_opts[:hash_key] = { :id => :string } AWS::DynamoDB.new.tables.create(table_name, read_capacity, write_capacity, create_opts) end |
#delete_if_target_missing ⇒ Object
25 26 27 |
# File 'lib/boss_queue/boss_queue.rb', line 25 def delete_if_target_missing @delete_if_target_missing end |
#enqueue(class_or_instance, callback_method, *args) ⇒ Object
70 71 72 73 |
# File 'lib/boss_queue/boss_queue.rb', line 70 def enqueue(class_or_instance, callback_method, *args) job = create_job(class_or_instance, callback_method, *args) job.enqueue end |
#enqueue_with_delay(delay, class_or_instance, callback_method, *args) ⇒ Object
75 76 77 78 |
# File 'lib/boss_queue/boss_queue.rb', line 75 def enqueue_with_delay(delay, class_or_instance, callback_method, *args) job = create_job(class_or_instance, callback_method, *args) job.enqueue_with_delay(delay) end |
#failure_action ⇒ Object
17 18 19 |
# File 'lib/boss_queue/boss_queue.rb', line 17 def failure_action @failure_action ||= 'retry' end |
#failure_callback ⇒ Object
21 22 23 |
# File 'lib/boss_queue/boss_queue.rb', line 21 def failure_callback @failure_callback end |
#queue_name ⇒ Object
33 34 35 |
# File 'lib/boss_queue/boss_queue.rb', line 33 def queue_name "#{BossQueue.queue_prefix}boss_queue#{@queue_postfix}" end |
#sqs_queue ⇒ Object
:nodoc:
101 102 103 |
# File 'lib/boss_queue/boss_queue.rb', line 101 def sqs_queue # :nodoc: @sqs_queue ||= BossQueue.sqs_queues[BossQueue.sqs_queue_url(queue_name)] end |
#table_name ⇒ Object
29 30 31 |
# File 'lib/boss_queue/boss_queue.rb', line 29 def table_name "#{BossQueue.queue_prefix}boss_queue_jobs" end |
#work ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/boss_queue/boss_queue.rb', line 55 def work job_dequeued = false sqs_queue.receive_message do |job_id| job_dequeued = true # When a block is given, each message is yielded to the block and then deleted as long as the block exits normally - http://docs.aws.amazon.com/AWSRubySDK/latest/frames.html begin job = BossQueue::Job.find_by_id(job_id.body, :shard => table_name, :consistent_read => true) job.sqs_queue = sqs_queue job.work rescue AWS::Record::RecordNotFound end end job_dequeued end |