Class: Isono::NodeModules::JobChannel

Inherits:
Base
  • Object
show all
Defined in:
lib/isono/node_modules/job_channel.rb

Instance Attribute Summary

Attributes inherited from Base

#node

Instance Method Summary collapse

Methods inherited from Base

#config_section, #initialize, #manifest, #value_object

Constructor Details

This class inherits a constructor from Isono::NodeModules::Base

Instance Method Details

#cancel(job_id) ⇒ Object



73
74
# File 'lib/isono/node_modules/job_channel.rb', line 73

def cancel(job_id)
end

#register_endpoint(endpoint, app, opts = {}) ⇒ Object

Raises:

  • (ArgumentError)


76
77
78
79
80
81
82
83
84
# File 'lib/isono/node_modules/job_channel.rb', line 76

def register_endpoint(endpoint, app, opts={})
  raise ArgumentError unless endpoint.is_a?(String)
  raise ArgumentError unless app.respond_to?(:call)
  opts = {
    :concurrency=>config_section.concurrency,
    :thread_pool => nil,
  }.merge(opts)
  rpc.register_endpoint("job.#{endpoint}", Rack::Job.new(app, JobWorker.new(@node, opts[:thread_pool])), {:prefetch=>opts[:concurrency]})
end

#run(endpoint, command, *args) {|modify| ... } ⇒ Object

Send a new job request and wait until the job finished. The difference to submit() is that this method will stop the thread until the called job completed.

Examples:

wait while the job running.

run('endpoint1', 'command1', 1, 2, 3) #=> result of 'endpoint1/command1'.

receive progress messages from long running job.

run('endpoint1', 'command1', 1, 2, 3) do |req|
   req.on_progress { |r|
     puts r #=> show progress message
   }
end

Parameters:

  • endpoint (String)

    endpoint name created by JobChannel#register_endpoint()

  • command (String)

    command name in the endpoint.

  • *args (any)

    arguments to run the job.

Yield Parameters:

  • modify (RpcChannel::RequestContext)

    RPC request before sending. The method will stop the current thread if it does not exist.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/isono/node_modules/job_channel.rb', line 51

def run(endpoint, command, *args, &blk)
  cur_job_ctx = Thread.current[JobWorker::JOB_CTX_KEY]
  req = rpc.request("job.#{endpoint}", command, *args) { |req|
    req.timeout_sec = config_section.timeout_sec
    req.request[:job_id] = Util.gen_id
    req.request[:session_id] = req.request[:job_id]
    req.request[:job_name] = command
   
    # A job is working on this current thread if cur_job_ctx is
    # not nil. Let the new job know the current job ID
    # as its parent job ID.
    if cur_job_ctx
      req.request[:parent_job_id] = cur_job_ctx.job_id
      req.request[:session_id] = cur_job_ctx.session_id
    end

    blk ? blk.call(req) : req.synchronize
  }

  blk ? req : req.wait
end

#submit(endpoint, command, *args) {|modify| ... } ⇒ Rack::Request, String

Send a new job request to the endpoint.

Examples:

call job endpoint ‘endpoint1’.

puts  submit('endpoint1', 'command1', 1, 2, 3) # => show Job ID

Parameters:

  • endpoint (String)

    endpoint name created by JobChannel#register_endpoint()

  • command (String)

    command name in the endpoint.

  • *args (any)

    arguments to run the job.

Yield Parameters:

  • modify (RpcChannel::RequestContext)

    RPC request before sending. The method will stop the current thread if it does not exist.

Returns:



25
26
27
28
29
30
31
# File 'lib/isono/node_modules/job_channel.rb', line 25

def submit(endpoint, command, *args, &blk)
  req = run(endpoint, command, *args) { |req|
    blk.call(req) if blk
  }

  req.request[:job_id]
end