Class: JobDispatch::Client
- Inherits: Object
  - Object
  - JobDispatch::Client
- Defined in:
  lib/job_dispatch/client.rb,
  lib/job_dispatch/client/proxy.rb,
  lib/job_dispatch/client/proxy_error.rb,
  lib/job_dispatch/client/synchronous_proxy.rb
Overview
This is a simple class for making synchronous calls to the Job Queue dispatcher.
Defined Under Namespace
Classes: Proxy, ProxyError, SynchronousProxy
Instance Method Summary
- #close ⇒ Object
- #enqueue(job_attrs = {}) ⇒ Object
  Enqueue a job to be processed, described by the passed job attributes.
- #fetch(job_id) ⇒ Object
  Fetch the complete details for the job with the given job_id.
- #initialize(connect_address = nil) ⇒ Client (constructor)
  A new instance of Client.
- #last(queue = nil) ⇒ Object
  Ask the dispatcher what the last job enqueued on the given queue (or the default queue) was.
- #method_missing(method, *args, **kwargs) ⇒ Object
- #notify(job_id) ⇒ Object
  Send a message to the dispatcher requesting to be notified when the job completes (or fails).
- #proxy_for(target, options = {}) ⇒ Object
- #send_request(command, options = {}) ⇒ Object
- #synchronous_proxy_for(target, options = {}) ⇒ Object
Constructor Details
#initialize(connect_address = nil) ⇒ Client
Returns a new instance of Client.
# File 'lib/job_dispatch/client.rb', line 9
def initialize(connect_address=nil)
  @socket = JobDispatch.context.socket(ZMQ::REQ)
  @socket.connect(connect_address || JobDispatch.config.broker[:connect])
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method: any undefined method call is forwarded to the dispatcher as a command via send_request.
#method_missing(method, *args, **kwargs) ⇒ Object
# File 'lib/job_dispatch/client.rb', line 23
def method_missing(method, *args, **kwargs)
  payload = kwargs
  payload[:parameters] = args
  send_request(method, payload)
end
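To make the payload layout concrete, here is a minimal sketch of the same method_missing pattern. SketchClient and the status command are hypothetical names used only for illustration; the stand-in records requests in memory instead of sending them over a ZeroMQ socket, so it shows the shape of the request rather than the real client's behaviour.

```ruby
require 'json'

# Hypothetical stand-in for JobDispatch::Client: it records requests instead
# of sending them over a ZeroMQ socket.
class SketchClient
  attr_reader :requests

  def initialize
    @requests = []
  end

  # Same shape as the documented method_missing: keyword arguments become the
  # payload and positional arguments are stored under :parameters.
  def method_missing(method, *args, **kwargs)
    payload = kwargs
    payload[:parameters] = args
    send_request(method, payload)
  end

  # Declared so respond_to? agrees with method_missing (an addition that the
  # documented class does not show).
  def respond_to_missing?(_method, _include_private = false)
    true
  end

  # Records the request rather than performing a socket round trip.
  def send_request(command, options = {})
    options[:command] = command
    @requests << options
    options
  end
end

client = SketchClient.new
# "status" is a made-up command name, not a documented dispatcher command.
request = client.status('job-1', queue: 'default')
puts JSON.dump(request)
```

In this sketch the positional argument ends up under :parameters, the keyword argument stays a top-level key, and the method name becomes :command.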
Instance Method Details
#close ⇒ Object
# File 'lib/job_dispatch/client.rb', line 37
def close
  if @socket
    @socket.close
    @socket = nil
  end
end
#enqueue(job_attrs = {}) ⇒ Object
Enqueue a job to be processed, described by the passed job attributes.
Required attributes:
target: the target object that will execute the job, typically a class.
method: the message to be sent to the target.
Optional attributes:
parameters: an array of parameters to be passed to the method.
timeout: the number of seconds after which the job is considered timed out and failed.
# File 'lib/job_dispatch/client.rb', line 52
def enqueue(job_attrs={})
  send_request('enqueue', {job: job_attrs})
end
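As a rough illustration of the attributes listed above, the sketch below builds a job_attrs hash and serialises the request the way enqueue and send_request appear to. The target and method values are made up, and passing the target as a string is an assumption made here because the request is JSON-encoded; the page does not say how a class target is represented.

```ruby
require 'json'

# Job attributes using the documented keys. The values are hypothetical.
job_attrs = {
  target: 'ReportMailer',       # hypothetical class name, passed as a string
  method: 'deliver_weekly',     # hypothetical message sent to the target
  parameters: [42, 'weekly'],   # optional: positional arguments for the method
  timeout: 30                   # optional: seconds before the job is failed
}

# Client#enqueue wraps the attributes under :job, and send_request adds the
# :command key before serialising the request to JSON.
request = { job: job_attrs }
request[:command] = 'enqueue'
wire = JSON.dump(request)
puts wire
```

With a real client the equivalent call would be client.enqueue(job_attrs); the sketch only shows what would be written to the socket.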
#fetch(job_id) ⇒ Object
Fetch the complete details for the job with the given job_id.
# File 'lib/job_dispatch/client.rb', line 67
def fetch(job_id)
  job_or_raise send_request('fetch', {job_id: job_id})
end
#last(queue = nil) ⇒ Object
Ask the dispatcher what the last job enqueued on the given queue (or the default queue) was.
# File 'lib/job_dispatch/client.rb', line 62
def last(queue=nil)
  job_or_raise send_request('last', {queue: queue||'default'})
end
#notify(job_id) ⇒ Object
Send a message to the dispatcher requesting to be notified when the job completes (or fails).
# File 'lib/job_dispatch/client.rb', line 57
def notify(job_id)
  send_request('notify', {job_id: job_id})
end
#proxy_for(target, options = {}) ⇒ Object
# File 'lib/job_dispatch/client.rb', line 29
def proxy_for(target, options={})
  Proxy.new(self, target, options)
end
#send_request(command, options = {}) ⇒ Object
# File 'lib/job_dispatch/client.rb', line 14
def send_request(command, options={})
  options[:command] = command
  @socket.send(JSON.dump(options))
  json = @socket.recv
  #puts "Received: #{json}"
  response = JSON.parse(json)
  response.is_a?(Hash) ? response.with_indifferent_access : response
end
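The request/reply round trip in send_request can be sketched without a dispatcher by substituting a fake socket. FakeSocket and the reply contents below are assumptions for illustration, not the dispatcher's real response format, and the sketch omits with_indifferent_access (an ActiveSupport extension), so reply keys remain strings.

```ruby
require 'json'

# Hypothetical stand-in for the ZeroMQ REQ socket: it remembers what was sent
# and replies with a canned JSON string.
class FakeSocket
  attr_reader :sent

  def initialize(reply)
    @reply = reply
    @sent = []
  end

  # Stores the outgoing message instead of writing it to the network.
  def send(message)
    @sent << message
  end

  # Returns the canned reply, as a REQ socket would after each send.
  def recv
    @reply
  end
end

# Mirrors the documented send_request, but takes the socket as an argument
# and skips with_indifferent_access.
def send_request(socket, command, options = {})
  options[:command] = command
  socket.send(JSON.dump(options))
  JSON.parse(socket.recv)
end

# The reply body is invented for this sketch.
socket = FakeSocket.new('{"status":"success","job_id":"abc123"}')
response = send_request(socket, 'notify', job_id: 'abc123')
puts response['status']
```

Because a REQ socket alternates strictly between send and receive, each call blocks until the dispatcher replies, which is why the class is described as synchronous.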
#synchronous_proxy_for(target, options = {}) ⇒ Object
# File 'lib/job_dispatch/client.rb', line 33
def synchronous_proxy_for(target, options={})
  SynchronousProxy.new(self, target, options)
end