Class: ActiveJobK8s::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job_k8s/scheduler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • opts (Hash{ kubeclient_context: [Kubeclient::Config::Context] })


11
12
13
14
15
16
# File 'lib/active_job_k8s/scheduler.rb', line 11

# Builds a scheduler bound to the given Kubeclient context.
#
# @param opts [Hash] keyword options
# @option opts [Kubeclient::Config::Context] :kubeclient_context context kept on
#   the instance in @kubeclient_context
# @raise [RuntimeError] when :kubeclient_context is absent or nil
def initialize(**opts)
  context = opts[:kubeclient_context]
  raise "No KubeClientContext given" if context.nil?

  @kubeclient_context = context
end

Instance Attribute Details

#kubeclient_context ⇒ Object (readonly)

Returns the value of attribute kubeclient_context.



8
9
10
# File 'lib/active_job_k8s/scheduler.rb', line 8

# Reader for the @kubeclient_context instance variable.
#
# @return [Object, nil] the stored value, or nil when it has never been assigned
def kubeclient_context
  instance_variable_get(:@kubeclient_context)
end

Class Method Details

.execute_job ⇒ Object



46
47
48
# File 'lib/active_job_k8s/scheduler.rb', line 46

# Executes the job described by the SERIALIZED_JOB environment variable.
#
# The variable must hold a JSON document; the parsed result is handed to
# ActiveJob::Base.execute.
#
# @return [Object] whatever ActiveJob::Base.execute returns
# @raise [KeyError] when SERIALIZED_JOB is not set
# @raise [JSON::ParserError] when SERIALIZED_JOB does not contain valid JSON
def self.execute_job
  # ENV.fetch fails with an error naming the missing variable, instead of the
  # opaque TypeError raised by JSON.parse(nil).
  serialized_job = ENV.fetch('SERIALIZED_JOB')
  ActiveJob::Base.execute(JSON.parse(serialized_job))
end

Instance Method Details

#create_job(job) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/active_job_k8s/scheduler.rb', line 18

# Builds a Kubernetes Job resource from the job's manifest and submits it.
#
# The manifest is copied into a Kubeclient::Resource, its metadata is tagged
# with the job id and queue name, and every container receives the serialized
# job in a SERIALIZED_JOB environment variable.
#
# @param job [#serialize, #manifest, #job_id, #queue_name] the job to schedule
# @return [Object] whatever client.create_job returns
#   (client is defined outside this excerpt)
def create_job(job)
  serialized_job = JSON.dump(job.serialize)

  kube_job = Kubeclient::Resource.new(job.manifest)

  # FIXME: set kube_job.spec.suspend to complete support for delayed jobs
  # Suffix the manifest name with the job id so each enqueued job gets its own resource name.
  kube_job.metadata.name = "#{kube_job.metadata.name}-#{job.job_id}"
  kube_job.metadata.job_id = job.job_id
  kube_job.metadata.queue_name = job.queue_name
  # A namespace declared in the manifest wins; otherwise use the context's namespace.
  kube_job.metadata.namespace ||= kubeclient_context.namespace

  kube_job.spec.template.spec.containers.each do |container|
    container.env ||= []
    container.env.push({
                         'name' => 'SERIALIZED_JOB',
                         'value' => serialized_job
                       })

    # Containers without an explicit command fall back to the gem's runner task.
    if container.command.blank?
      container.command = ["rails"]
      container.args = ["active_job_k8s:run_job"]
    end
  end

  # Seconds after the job finishes before Kubernetes deletes it.
  kube_job.spec.ttlSecondsAfterFinished = 300

  client.create_job(kube_job)
end