Class: MultiBackgroundJob::Adapters::Faktory
- Defined in: lib/multi_background_job/adapters/faktory.rb
Overview
This Faktory adapter converts a MultiBackgroundJob::Worker object into a Faktory-readable payload and then pushes the job to the Faktory service.
Instance Attribute Summary
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .coerce_to_worker(payload, **options) ⇒ MultiBackgroundJob::Worker
  Coerces the raw payload into an instance of Worker.
- .push(worker) ⇒ Hash
  Initializes the adapter and pushes the job to the Faktory service.
Instance Method Summary
- #initialize(worker) ⇒ Faktory (constructor)
  A new instance of Faktory.
- #push ⇒ Hash
  Pushes the job to Faktory, scheduling it if the payload has the 'at' key and enqueuing it for immediate execution otherwise.
Constructor Details
#initialize(worker) ⇒ Faktory
Returns a new instance of Faktory.
    # File 'lib/multi_background_job/adapters/faktory.rb', line 10
    def initialize(worker)
      @worker = worker
      @queue = worker.options.fetch(:queue, 'default')

      @payload = worker.payload.merge(
        'jobtype' => worker.worker_class,
        'queue' => @queue,
        'retry' => parse_retry(worker.options[:retry]),
      )
      @payload['created_at'] ||= Time.now.to_f
    end
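The payload merge done by the constructor can be tried in isolation. The following is a minimal sketch, not the adapter itself: StubWorker and build_faktory_payload are hypothetical names, the stub exposes only worker_class, options and payload, and because the private parse_retry helper is not shown on this page, the :retry option is passed through unchanged here (an assumption).

```ruby
# Hypothetical stand-in for MultiBackgroundJob::Worker, exposing only the
# three readers the constructor relies on.
StubWorker = Struct.new(:worker_class, :options, :payload)

# Mirrors the merge performed in #initialize. The real adapter runs the
# :retry option through a private parse_retry helper; this sketch passes the
# value through unchanged (assumption).
def build_faktory_payload(worker, now: Time.now.to_f)
  queue = worker.options.fetch(:queue, 'default')
  payload = worker.payload.merge(
    'jobtype' => worker.worker_class,
    'queue' => queue,
    'retry' => worker.options[:retry]
  )
  # Keep an existing creation time, otherwise stamp the payload.
  payload['created_at'] ||= now
  payload
end

worker = StubWorker.new('MailerWorker', { retry: 3 }, { 'jid' => 'abc123', 'args' => [1] })
payload = build_faktory_payload(worker, now: 1_700_000_000.0)
```

With no :queue option the job falls back to the 'default' queue, and keys already present in the worker payload (such as 'jid' and 'args') are carried over untouched.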
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/multi_background_job/adapters/faktory.rb', line 8
    def queue
      @queue
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/multi_background_job/adapters/faktory.rb', line 8
    def worker
      @worker
    end
Class Method Details
.coerce_to_worker(payload, **options) ⇒ MultiBackgroundJob::Worker
Coerces the raw payload into an instance of Worker
    # File 'lib/multi_background_job/adapters/faktory.rb', line 26
    def self.coerce_to_worker(payload, **options)
      raise(Error, 'invalid payload') unless payload.is_a?(Hash)
      raise(Error, 'invalid payload') unless payload['jobtype'].is_a?(String)

      options[:retry] ||= payload['retry'] if payload.key?('retry')
      options[:queue] ||= payload['queue'] if payload.key?('queue')

      MultiBackgroundJob[payload['jobtype'], **options].tap do |worker|
        worker.with_args(*Array(payload['args'])) if payload.key?('args')
        worker.with_job_jid(payload['jid']) if payload.key?('jid')
        worker.created_at(payload['created_at']) if payload.key?('created_at')
        worker.enqueued_at(payload['enqueued_at']) if payload.key?('enqueued_at')
        worker.at(payload['at']) if payload.key?('at')
        worker.unique(payload['uniq']) if payload.key?('uniq')
      end
    end
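The option handling at the top of coerce_to_worker determines whether the payload or the caller decides the queue and retry settings. The sketch below isolates just that step; merge_queue_and_retry is a hypothetical helper name, and the sample payload values are made up for illustration.

```ruby
# Hypothetical helper isolating the option-merging step of coerce_to_worker:
# payload values only fill in options the caller did not already supply.
def merge_queue_and_retry(payload, **options)
  options[:retry] ||= payload['retry'] if payload.key?('retry')
  options[:queue] ||= payload['queue'] if payload.key?('queue')
  options
end

payload = { 'jobtype' => 'MailerWorker', 'queue' => 'mailers', 'retry' => 5 }

# No explicit options: both values come from the payload.
from_payload = merge_queue_and_retry(payload)

# An explicit queue wins over the payload's queue.
overridden = merge_queue_and_retry(payload, queue: 'critical')
```

Because the merge uses `||=`, an explicit option that is nil or false (for example `retry: false`) is also replaced by the payload value when the payload has that key.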
.push(worker) ⇒ Hash
Initializes the adapter and pushes the job to the Faktory service.
    # File 'lib/multi_background_job/adapters/faktory.rb', line 48
    def self.push(worker)
      new(worker).push
    end
Instance Method Details
#push ⇒ Hash
Pushes the job to Faktory.
* If the job has the 'at' key, it is scheduled.
* Otherwise, it is enqueued for immediate execution.
    # File 'lib/multi_background_job/adapters/faktory.rb', line 58
    def push
      unless Object.const_defined?(:Faktory)
        raise MultiBackgroundJob::Error, <<~ERR
          Faktory client for ruby is not loaded. You must install and require https://github.com/contribsys/faktory_worker_ruby.
        ERR
      end
      @payload['enqueued_at'] ||= Time.now.to_f
      {'created_at' => false, 'enqueued_at' => false, 'at' => true}.each do |field, past_remove|
        # Optimization to enqueue something now that is scheduled to go out now or in the past
        if (time = @payload.delete(field)) &&
            (!past_remove || (past_remove && time > Time.now.to_f))
          @payload[field] = parse_time(time)
        end
      end

      pool = Thread.current[:faktory_via_pool] || ::Faktory.server_pool
      ::Faktory.client_middleware.invoke(@payload, pool) do
        pool.with do |c|
          c.push(@payload)
        end
      end
      @payload
    end
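The timestamp loop in #push is what turns a job whose 'at' time has already passed into an immediate job. The following sketch reproduces that rule without a Faktory connection. normalize_timestamps and parse_time_sketch are hypothetical names; the adapter's private parse_time helper is not shown on this page, so the stand-in simply coerces the value to a Float (an assumption).

```ruby
# Stand-in for the adapter's private parse_time helper, which this page does
# not show; here it only coerces the value to Float (assumption).
def parse_time_sketch(value)
  value.to_f
end

# Mirrors the loop in #push: 'created_at' and 'enqueued_at' are always kept,
# while 'at' is kept only when it lies strictly in the future.
def normalize_timestamps(payload, now: Time.now.to_f)
  result = payload.dup
  { 'created_at' => false, 'enqueued_at' => false, 'at' => true }.each do |field, past_remove|
    time = result.delete(field)
    next unless time
    next if past_remove && time <= now

    result[field] = parse_time_sketch(time)
  end
  result
end

now = 1_700_000_000.0
future = normalize_timestamps({ 'created_at' => now, 'at' => now + 60 }, now: now)
past = normalize_timestamps({ 'created_at' => now, 'at' => now - 60 }, now: now)
```

In this sketch `future` keeps its 'at' key, so the job would be scheduled, while `past` loses it and would be enqueued for immediate execution.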