Class: JobDispatch::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/job_dispatch/client.rb,
lib/job_dispatch/client/proxy.rb,
lib/job_dispatch/client/proxy_error.rb,
lib/job_dispatch/client/synchronous_proxy.rb

Overview

This is a simple class for making synchronous calls to the Job Queue dispatcher.

Defined Under Namespace

Classes: Proxy, ProxyError, SynchronousProxy

Instance Method Summary collapse

Constructor Details

#initialize(connect_address = nil) ⇒ Client

Returns a new instance of Client.



9
10
11
12
# File 'lib/job_dispatch/client.rb', line 9

# Creates the client's ZMQ::REQ socket from JobDispatch.context and connects it.
#
# @param connect_address [String, nil] address to connect to; when omitted,
#   JobDispatch.config.broker[:connect] is used instead
def initialize(connect_address=nil)
  request_socket = JobDispatch.context.socket(ZMQ::REQ)
  @socket = request_socket
  # The configured broker address is only looked up when no address was given.
  endpoint = connect_address || JobDispatch.config.broker[:connect]
  @socket.connect(endpoint)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, **kwargs) ⇒ Object



23
24
25
26
27
# File 'lib/job_dispatch/client.rb', line 23

# Forwards any otherwise-undefined message to the dispatcher as a command.
#
# Keyword arguments become top-level fields of the request; positional
# arguments are sent under the :parameters key (replacing any :parameters
# keyword that was passed).
#
# @param method [Symbol] name of the missing method, used as the command
# @return [Object] whatever send_request returns for the command
def method_missing(method, *args, **kwargs)
  request_fields = kwargs.merge(parameters: args)
  send_request(method, request_fields)
end

Instance Method Details

#close ⇒ Object



37
38
39
40
41
42
# File 'lib/job_dispatch/client.rb', line 37

# Closes the client's socket, if there is one, and forgets it.
# Calling it again afterwards does nothing.
#
# @return [nil]
def close
  return unless @socket

  @socket.close
  @socket = nil
end

#enqueue(job_attrs = {}) ⇒ Object

Enqueue a job to be processed, described by the passed job attributes.

Required attributes:

target: The target object that will execute the job, typically a class.
method: The message to be sent to the target.

Optional:

parameters: an array of parameters to be passed to the method.
timeout: number of seconds after which the job is considered timed out and failed.


52
53
54
# File 'lib/job_dispatch/client.rb', line 52

# Sends an 'enqueue' command to the dispatcher with the job attributes
# wrapped under the :job key.
#
# @param job_attrs [Hash] attributes describing the job
# @return [Object] the result of send_request
def enqueue(job_attrs={})
  request = { job: job_attrs }
  send_request('enqueue', request)
end

#fetch(job_id) ⇒ Object

Fetch the complete details for the job identified by job_id.



67
68
69
# File 'lib/job_dispatch/client.rb', line 67

# Sends a 'fetch' command for the given job id and passes the dispatcher's
# response through job_or_raise.
#
# @param job_id [Object] identifier of the job to fetch
# @return [Object] the result of job_or_raise for the response
def fetch(job_id)
  response = send_request('fetch', { job_id: job_id })
  job_or_raise(response)
end

#last(queue = nil) ⇒ Object

Ask the dispatcher what was the last job enqueued on the given queue (or the default queue).



62
63
64
# File 'lib/job_dispatch/client.rb', line 62

# Sends a 'last' command for the given queue and passes the dispatcher's
# response through job_or_raise.
#
# @param queue [String, nil] queue name; 'default' is used when omitted
# @return [Object] the result of job_or_raise for the response
def last(queue=nil)
  queue_name = queue || 'default'
  response = send_request('last', { queue: queue_name })
  job_or_raise(response)
end

#notify(job_id) ⇒ Object

send a message to the dispatcher requesting to be notified when the job completes (or fails).



57
58
59
# File 'lib/job_dispatch/client.rb', line 57

# Sends a 'notify' command to the dispatcher for the given job id.
#
# @param job_id [Object] identifier of the job to be notified about
# @return [Object] the result of send_request
def notify(job_id)
  request = { job_id: job_id }
  send_request('notify', request)
end

#proxy_for(target, options = {}) ⇒ Object



29
30
31
# File 'lib/job_dispatch/client.rb', line 29

# Builds a Proxy for +target+ that is bound to this client.
#
# @param target [Object] passed through to Proxy.new
# @param options [Hash] passed through unchanged to Proxy.new
# @return [Proxy]
def proxy_for(target, options={})
  client = self
  Proxy.new(client, target, options)
end

#send_request(command, options = {}) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/job_dispatch/client.rb', line 14

# Sends +command+ to the dispatcher over the client's socket and returns the
# decoded reply.
#
# The request is the +options+ hash with a :command entry added, serialized
# as JSON. The caller's hash is copied rather than modified, so frozen or
# reused hashes are safe to pass.
#
# @param command [String, Symbol] name of the dispatcher command
# @param options [Hash] additional fields to send along with the command
# @return [Object] the parsed JSON reply; Hash replies are converted with
#   with_indifferent_access, anything else is returned as parsed
def send_request(command, options={})
  # merge builds a new hash, leaving the caller's options untouched.
  request = options.merge(command: command)
  @socket.send(JSON.dump(request))
  reply = JSON.parse(@socket.recv)
  reply.is_a?(Hash) ? reply.with_indifferent_access : reply
end

#synchronous_proxy_for(target, options = {}) ⇒ Object



33
34
35
# File 'lib/job_dispatch/client.rb', line 33

# Builds a SynchronousProxy for +target+ that is bound to this client.
#
# @param target [Object] passed through to SynchronousProxy.new
# @param options [Hash] passed through unchanged to SynchronousProxy.new
# @return [SynchronousProxy]
def synchronous_proxy_for(target, options={})
  client = self
  SynchronousProxy.new(client, target, options)
end