Class: MultiBackgroundJob::Adapters::Faktory

Inherits:
Adapter
  • Object
show all
Defined in:
lib/multi_background_job/adapters/faktory.rb

Overview

This is a Faktory adapter that converts MultiBackgroundJob::Worker object into a faktory readable format and then push the jobs into the service.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker) ⇒ Faktory

Returns a new instance of Faktory.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/multi_background_job/adapters/faktory.rb', line 10

def initialize(worker)
  @worker = worker
  @queue = worker.options.fetch(:queue, 'default')

  @payload = worker.payload.merge(
    'jobtype' => worker.worker_class,
    'queue'   => @queue,
    'retry'   => parse_retry(worker.options[:retry]),
  )
  @payload['created_at'] ||= Time.now.to_f
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/multi_background_job/adapters/faktory.rb', line 8

def queue
  @queue
end

#workerObject (readonly)

Returns the value of attribute worker.



8
9
10
# File 'lib/multi_background_job/adapters/faktory.rb', line 8

def worker
  @worker
end

Class Method Details

.coerce_to_worker(payload, **options) ⇒ MultiBackgroundJob::Worker

Coerces the raw payload into an instance of Worker

Parameters:

  • payload (Hash)

    The job as json from redis

Returns:

Raises:



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/multi_background_job/adapters/faktory.rb', line 26

def self.coerce_to_worker(payload, **options)
  raise(Error, 'invalid payload') unless payload.is_a?(Hash)
  raise(Error, 'invalid payload') unless payload['jobtype'].is_a?(String)

  options[:retry] ||= payload['retry'] if payload.key?('retry')
  options[:queue] ||= payload['queue'] if payload.key?('queue')

  MultiBackgroundJob[payload['jobtype'], **options].tap do |worker|
    worker.with_args(*Array(payload['args'])) if payload.key?('args')
    worker.with_job_jid(payload['jid']) if payload.key?('jid')
    worker.created_at(payload['created_at']) if payload.key?('created_at')
    worker.enqueued_at(payload['enqueued_at']) if payload.key?('enqueued_at')
    worker.at(payload['at']) if payload.key?('at')
    worker.unique(payload['uniq']) if payload.key?('uniq')
  end
end

.push(worker) ⇒ Hash

Initializes adapter and push job into the faktory service

Parameters:

Returns:

  • (Hash)

    Job payload

See Also:



48
49
50
# File 'lib/multi_background_job/adapters/faktory.rb', line 48

def self.push(worker)
  new(worker).push
end

Instance Method Details

#pushHash

Push job to Faktory

* If job has the 'at' key. Then schedule it
* Otherwise enqueue for immediate execution

Returns:

  • (Hash)

    Payload that was sent to server

Raises:



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/multi_background_job/adapters/faktory.rb', line 58

def push
  unless Object.const_defined?(:Faktory)
    raise MultiBackgroundJob::Error, "    Faktory client for ruby is not loaded. You must install and require https://github.com/contribsys/faktory_worker_ruby.\n    ERR\n  end\n  @payload['enqueued_at'] ||= Time.now.to_f\n  {'created_at' => false, 'enqueued_at' => false, 'at' => true}.each do |field, past_remove|\n    # Optimization to enqueue something now that is scheduled to go out now or in the past\n    if (time = @payload.delete(field)) &&\n        (!past_remove || (past_remove && time > Time.now.to_f))\n      @payload[field] = parse_time(time)\n    end\n  end\n\n  pool = Thread.current[:faktory_via_pool] || ::Faktory.server_pool\n  ::Faktory.client_middleware.invoke(@payload, pool) do\n    pool.with do |c|\n      c.push(@payload)\n    end\n  end\n  @payload\nend\n"