Class: Isono::NodeModules::JobWorker

Inherits:
Base
  • Object
show all
Includes:
Logger
Defined in:
lib/isono/node_modules/job_worker.rb

Defined Under Namespace

Classes: JobContext

Constant Summary collapse

JOB_CTX_KEY =
:job_worker_ctx

Instance Attribute Summary

Attributes inherited from Base

#node

Instance Method Summary collapse

Methods included from Logger

included, initialize

Methods inherited from Base

#config_section, #initialize, #manifest, #value_object

Constructor Details

This class inherits a constructor from Isono::NodeModules::Base

Instance Method Details

#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object

Run the block/proc. This is simple utility method for start().

Examples:

Run

run {
  puts "message"
}

Use proc{} for setting both run and fail block.

run(proc{
      # do something
    }, proc{
      # do rollback on fail
    })

Parameters:

  • run_cb (Proc, nil) (defaults to: nil)
  • fail_cb (Proc, nil) (defaults to: nil)


96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/isono/node_modules/job_worker.rb', line 96

def run(run_cb=nil, fail_cb=nil, &blk)
  if run_cb.is_a?(Proc)
    start do |job|
      job.run_cb = run_cb
      job.fail_cb = fail_cb if fail_cb.is_a?(Proc)
    end
  elsif blk
    start do |job|
      job.run_cb = blk
    end
  end
end

#start { ... } ⇒ Object

Start a new long term job.

Examples:

Initialize JobContext within the block.

start { |ctx|
  # setup JobContext
  ctx.job_id = 'xxxx'
  ctx.parent_job_id = 'yyyy'
}

Yields:

  • The block to setup JobContext object.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/isono/node_modules/job_worker.rb', line 44

def start(&blk)
  job = JobContext.new()
  blk.call(job)
  @active_jobs[job.job_id] = job
  rpc = RpcChannel.new(node)

  rpc.request('job-collector', 'regist', job.to_hash) { |req|
    req.oneshot = true
  }
  
  @thread_pool.pass {
    begin
      Thread.current[JOB_CTX_KEY]=job
      job.stm.on_start
      rpc.request('job-collector', 'update', job.to_hash) { |req|
        req.oneshot = true
      }
      job.run_cb.call
      job.stm.on_done
    rescue Exception => e
      job.stm.on_fail(e)
      if job.fail_cb
        job.fail_cb.arity == 1 ? job.fail_cb.call(e) : job.fail_cb.call
      end
    ensure
      Thread.current[JOB_CTX_KEY]=nil
      EventMachine.schedule {
        rpc.request('job-collector', 'update', job.to_hash) { |req|
          req.oneshot = true
        }
        @active_jobs.delete(job.job_id)
      }
    end
  }
  job
end