Class: Rjob::Context
- Inherits:
-
Object
- Object
- Rjob::Context
- Defined in:
- lib/rjob/context.rb
Instance Attribute Summary collapse
-
#bucket_count ⇒ Object
readonly
Returns the value of attribute bucket_count.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#job_wrapper_proc ⇒ Object
readonly
Returns the value of attribute job_wrapper_proc.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#recurring_jobs ⇒ Object
readonly
Returns the value of attribute recurring_jobs.
-
#script_runner ⇒ Object
readonly
Returns the value of attribute script_runner.
Class Method Summary collapse
-
.configure {|config| ... } ⇒ Object
Available options: see the .configure method details below.
- .instance ⇒ Object
- .set_instance(instance) ⇒ Object
Instance Method Summary collapse
- #create_redis_connection ⇒ Object
- #demodularize_class(name) ⇒ Object
- #enqueue_job(job_class, args) ⇒ Object
- #enqueue_job_with_redis(job_class, args, r) ⇒ Object
- #fetch_worker_class(class_name:) ⇒ Object
-
#initialize(config) ⇒ Context
constructor
A new instance of Context.
- #redis(&block) ⇒ Object
- #schedule_job_at(timestamp, job_class, args) ⇒ Object
Constructor Details
#initialize(config) ⇒ Context
Returns a new instance of Context.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/rjob/context.rb', line 38
#
# Builds a context from a configuration hash.
#
# A shallow copy (+dup+) of +config+ is kept as +@config+. The following keys
# are read here:
#   :redis_pool_size  - stored as @pool_size, defaults to 10
#   :bucket_count     - defaults to 32
#   :prefix           - defaults to 'rjob'
#   :logger           - nil when absent
#   :job_wrapper_proc - nil when absent
#   :recurring_jobs   - optional list of definitions, each turned into a
#                       recurring job via Rjob::RecurringJob.from_definition
#
# After the attributes are set, the connection pool and Redis scripts are
# prepared through #initialize_connection_pool and #load_redis_scripts
# (both defined outside this snippet).
#
# @param config [Hash] configuration options
def initialize(config)
  @config = config.dup

  # Values with defaults.
  @pool_size = @config.fetch(:redis_pool_size, 10)
  @bucket_count = config.fetch(:bucket_count, 32)
  @prefix = config.fetch(:prefix, 'rjob')

  # Optional collaborators; nil when not configured.
  @logger = config[:logger]
  @job_wrapper_proc = config[:job_wrapper_proc]

  @script_runner = Rjob::Scripts::ScriptRunner.new

  @recurring_jobs = nil
  if config.key?(:recurring_jobs)
    # Loaded lazily: only needed when recurring jobs are configured.
    require "rjob/recurring"

    @recurring_jobs = config[:recurring_jobs].map { |definition| Rjob::RecurringJob.from_definition(self, definition) }
  end

  initialize_connection_pool
  load_redis_scripts
end
Instance Attribute Details
#bucket_count ⇒ Object (readonly)
Returns the value of attribute bucket_count.
6 7 8 |
# File 'lib/rjob/context.rb', line 6
#
# Read-only accessor for the bucket count held in +@bucket_count+.
#
# @return [Object] the current value of @bucket_count
def bucket_count
  @bucket_count
end
#config ⇒ Object (readonly)
Returns the value of attribute config.
4 5 6 |
# File 'lib/rjob/context.rb', line 4
#
# Read-only accessor for the configuration held in +@config+.
#
# @return [Object] the current value of @config
def config
  @config
end
#job_wrapper_proc ⇒ Object (readonly)
Returns the value of attribute job_wrapper_proc.
8 9 10 |
# File 'lib/rjob/context.rb', line 8
#
# Read-only accessor for the job wrapper held in +@job_wrapper_proc+.
#
# @return [Object] the current value of @job_wrapper_proc
def job_wrapper_proc
  @job_wrapper_proc
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
7 8 9 |
# File 'lib/rjob/context.rb', line 7
#
# Read-only accessor for the logger held in +@logger+.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
5 6 7 |
# File 'lib/rjob/context.rb', line 5
#
# Read-only accessor for the key prefix held in +@prefix+.
#
# @return [Object] the current value of @prefix
def prefix
  @prefix
end
#recurring_jobs ⇒ Object (readonly)
Returns the value of attribute recurring_jobs.
10 11 12 |
# File 'lib/rjob/context.rb', line 10
#
# Read-only accessor for the recurring jobs held in +@recurring_jobs+.
#
# @return [Object] the current value of @recurring_jobs
def recurring_jobs
  @recurring_jobs
end
#script_runner ⇒ Object (readonly)
Returns the value of attribute script_runner.
9 10 11 |
# File 'lib/rjob/context.rb', line 9
#
# Read-only accessor for the script runner held in +@script_runner+.
#
# @return [Object] the current value of @script_runner
def script_runner
  @script_runner
end
Class Method Details
.configure {|config| ... } ⇒ Object
Available options:
:redis - (passed to Redis.new) :max_threads - parallelism :bucket_count - defaults to 32 :redis_pool_size - redis connection pool size. Defaults to 10 :prefix - defaults to “rjob” :job_wrapper_proc - defaults to none :logger - duck-typed Logger, defaults to nil
31 32 33 34 35 36 |
# File 'lib/rjob/context.rb', line 31
#
# Creates the shared context and registers it through .set_instance.
#
# An empty hash is yielded to the caller, who fills in the options; the
# populated hash is then handed to .new.
#
# @yield [config] the options hash to populate
# @yieldparam config [Hash] starts out empty
# @return [Object] whatever .set_instance returns
# @raise [RuntimeError] if an instance has already been registered
def self.configure
  raise "Already configured!: #{@instance}" if @instance

  options = {}
  yield options
  set_instance(new(options))
end
.instance ⇒ Object
12 13 14 15 |
# File 'lib/rjob/context.rb', line 12
#
# Returns the registered context.
#
# @return [Object] the value stored in @instance
# @raise [RuntimeError] when no instance has been registered yet
def self.instance
  @instance || raise("Rjob is not configured. Please call Rjob.configure first")
end
.set_instance(instance) ⇒ Object
17 18 19 |
# File 'lib/rjob/context.rb', line 17
#
# Registers +instance+ as the shared context, replacing any previous one.
#
# @param instance [Object] the context to register
# @return [Object] the same +instance+
def self.set_instance(instance)
  @instance = instance
end
Instance Method Details
#create_redis_connection ⇒ Object
94 95 96 97 |
# File 'lib/rjob/context.rb', line 94
#
# Opens a new Redis connection using the +:redis+ entry of the configuration.
#
# @return [Redis] a fresh connection built by Redis.new
def create_redis_connection
  Redis.new(@config[:redis])
end
#demodularize_class(name) ⇒ Object
86 87 88 89 90 91 92 |
# File 'lib/rjob/context.rb', line 86
#
# Resolves a "::"-separated constant name to the constant itself.
#
# The lookup starts at Kernel and walks one segment at a time, calling
# +const_get+ on the result of the previous step.
#
# @param name [String] constant path such as "Foo::Bar"
# @return [Object] the resolved constant (Kernel when +name+ is empty)
# @raise [NameError] when a segment cannot be resolved
def demodularize_class(name)
  name.split('::').reduce(Kernel) { |scope, segment| scope.const_get(segment) }
end
#enqueue_job(job_class, args) ⇒ Object
65 66 67 |
# File 'lib/rjob/context.rb', line 65
#
# Enqueues a job using a connection obtained from #redis.
#
# The connection yielded by #redis is passed, together with +job_class+ and
# +args+, to #enqueue_job_with_redis.
#
# @param job_class [Object] job class (or name) to enqueue
# @param args [Object] arguments for the job
# @return [Object] whatever #redis returns for the block
def enqueue_job(job_class, args)
  redis { |connection| enqueue_job_with_redis(job_class, args, connection) }
end
#enqueue_job_with_redis(job_class, args, r) ⇒ Object
69 70 71 72 |
# File 'lib/rjob/context.rb', line 69
#
# Enqueues a job over an already checked-out connection.
#
# The job is serialized with MessagePack as +[job_class.to_s, args]+ and
# handed to the script runner's +:enqueue_job+ script along with the prefix
# and bucket count.
#
# @param job_class [Object] job class (or name); converted with +to_s+
# @param args [Object] arguments for the job
# @param r [Object] connection passed through to the script runner
# @return [Object] the result of the script runner's +exec+
def enqueue_job_with_redis(job_class, args, r)
  payload = MessagePack.pack([job_class.to_s, args])
  script_args = [@prefix, @bucket_count, payload]
  @script_runner.exec(r, :enqueue_job, [], script_args)
end
#fetch_worker_class(class_name:) ⇒ Object
82 83 84 |
# File 'lib/rjob/context.rb', line 82
#
# Looks up the worker class for a given class name by delegating to
# #demodularize_class.
#
# @param class_name [String] constant path of the worker class
# @return [Object] the result of #demodularize_class
def fetch_worker_class(class_name:)
  demodularize_class(class_name)
end
#redis(&block) ⇒ Object
61 62 63 |
# File 'lib/rjob/context.rb', line 61
#
# Runs the given block through the pool's +with+ method, forwarding the
# block unchanged.
#
# @yield forwarded to +@pool.with+
# @return [Object] whatever +@pool.with+ returns
def redis(&block)
  @pool.with(&block)
end
#schedule_job_at(timestamp, job_class, args) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/rjob/context.rb', line 74
#
# Schedules a job to run at the given timestamp.
#
# The job is serialized with MessagePack as +[job_class.to_s, args]+ and
# passed to the script runner's +:schedule_job_at+ script together with the
# stringified timestamp, the prefix and the bucket count.
#
# @param timestamp [Object] when the job should run; sent as +timestamp.to_s+
# @param job_class [Object] job class (or name); converted with +to_s+
# @param args [Object] arguments for the job
# @return [Object] whatever #redis returns for the block
def schedule_job_at(timestamp, job_class, args)
  job_data = MessagePack.pack([job_class.to_s, args])

  redis do |r|
    @script_runner.exec(r, :schedule_job_at, [], [timestamp.to_s, job_data, @prefix, @bucket_count])
  end
end