Class: Isono::NodeModules::JobWorker

Inherits:
Base
  • Object
show all
Includes:
Logger
Defined in:
lib/isono/node_modules/job_worker.rb

Defined Under Namespace

Classes: JobContext

Constant Summary collapse

JOB_CTX_KEY =
:job_worker_ctx

Instance Attribute Summary

Attributes inherited from Base

#node

Instance Method Summary collapse

Methods included from Logger

included, initialize

Methods inherited from Base

#config_section, #manifest, #value_object

Constructor Details

#initialize(node, thread_pool = nil) ⇒ JobWorker

Returns a new instance of JobWorker.

Raises:

  • (ArgumentError)


34
35
36
37
38
# File 'lib/isono/node_modules/job_worker.rb', line 34

def initialize(node, thread_pool=nil)
  super(node)
  raise ArgumentError unless thread_pool.nil? || thread_pool.is_a?(ThreadPool)
  @thread_pool = thread_pool || @default_thread_pool
end

Instance Method Details

#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object

Run the block/proc. This is simple utility method for start().

Examples:

Run

run {
  puts "message"
}

Use proc{} for setting both run and fail block.

run(proc{
      # do something
    }, proc{
      # do rollback on fail
    })

Parameters:

  • run_cb (Proc, nil) (defaults to: nil)
  • fail_cb (Proc, nil) (defaults to: nil)


107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/isono/node_modules/job_worker.rb', line 107

def run(run_cb=nil, fail_cb=nil, &blk)
  if run_cb.is_a?(Proc)
    start do |job|
      job.run_cb = run_cb
      job.fail_cb = fail_cb if fail_cb.is_a?(Proc)
    end
  elsif blk
    start do |job|
      job.run_cb = blk
    end
  end
end

#start { ... } ⇒ Object

Start a new long term job.

Examples:

Initialize JobContext within the block.

start { |ctx|
  # setup JobContext
  ctx.job_id = 'xxxx'
  ctx.parent_job_id = 'yyyy'
}

Yields:

  • The block to setup JobContext object.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/isono/node_modules/job_worker.rb', line 50

def start(&blk)
  job = JobContext.new()
  blk.call(job)
  @active_jobs[job.job_id] = job
  rpc = RpcChannel.new(node)

  rpc.request('job-collector', 'record', job.to_hash) { |req|
    req.oneshot = true
  }
  
  @thread_pool.pass {
    begin
      Thread.current[JOB_CTX_KEY]=job
      job.process_event(:on_start)
      rpc.request('job-collector', 'record', job.to_hash) { |req|
        req.oneshot = true
      }
      job.run_cb.call
      job.process_event(:on_done)
    rescue Exception => e
      job.process_event(:on_fail, e)
      if job.fail_cb
        begin
          job.fail_cb.arity == 1 ? job.fail_cb.call(e) : job.fail_cb.call
        rescue Exception => e
          logger.error("Failed to call the failure call back #{job.fail_cb}: #{job_id}: #{e}")
          logger.error(e)
        end
      end
    ensure
      Thread.current[JOB_CTX_KEY]=nil
      EventMachine.schedule {
        rpc.request('job-collector', 'record', job.to_hash) { |req|
          req.oneshot = true
        }
        @active_jobs.delete(job.job_id)
      }
    end
  }
  job
end