Class: JobDispatch::Worker::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/job_dispatch/worker/socket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connect_address, item_klass) ⇒ Socket

Returns a new instance of Socket.



13
14
15
16
17
# File 'lib/job_dispatch/worker/socket.rb', line 13

def initialize(connect_address, item_klass)
  @socket = JobDispatch.context.socket(ZMQ::REQ)
  @socket.connect(connect_address)
  @item_class = item_klass
end

Instance Attribute Details

#item_classObject (readonly)

Returns the value of attribute item_class.



11
12
13
# File 'lib/job_dispatch/worker/socket.rb', line 11

def item_class
  @item_class
end

#socketObject (readonly)

Returns the value of attribute socket.



10
11
12
# File 'lib/job_dispatch/worker/socket.rb', line 10

def socket
  @socket
end

Instance Method Details

#ask_for_work(queue) ⇒ Object



23
24
25
# File 'lib/job_dispatch/worker/socket.rb', line 23

def ask_for_work(queue)
  @socket.send(JSON.dump({command: 'ready', queue: queue, worker_name: identity}))
end

#closeObject



31
32
33
# File 'lib/job_dispatch/worker/socket.rb', line 31

def close
  @socket.close
end

#identityObject



35
36
37
38
39
40
41
42
# File 'lib/job_dispatch/worker/socket.rb', line 35

def identity
  @identity ||= begin
    hostname = ::Socket.gethostname
    process = Process.pid
    thread = Thread.current.object_id.to_s(16)
    ['ruby', hostname, process, thread].join(':')
  end
end

#poll_itemObject



19
20
21
# File 'lib/job_dispatch/worker/socket.rb', line 19

def poll_item
  @poll_item ||= ZMQ::Pollitem(@socket, ZMQ::POLLIN)
end

#read_itemJobDispatch::Item

read an incoming message. The thread will block if there is no readable message.

Returns:

  • (JobDispatch::Item)

    the item to be processed (or nil if there isn’t a valid job)



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/job_dispatch/worker/socket.rb', line 47

def read_item
  json = @socket.recv
  begin
    params = JSON.parse(json)
    case params["command"]
      when "job"
        item = item_class.new params["target"], params["method"], *params["parameters"]
      when "idle"
        item = item_class.new "JobDispatch", "idle"
      when "quit"
        puts "It's quittin' time!"
        Process.exit(0)
      else
        item = item_class.new "JobDispatch", "unknown_command", params
    end
    item.job_id = params["job_id"]
  rescue StandardError => e
    JobDispatch.logger.error "Failed to read message from worker socket: #{e}"
    nil
  end
  item
end

#send_goodbye(queue) ⇒ Object



27
28
29
# File 'lib/job_dispatch/worker/socket.rb', line 27

def send_goodbye(queue)
  @socket.send(JSON.dump({command: 'goodbye', worker_name: identity}))
end

#send_response(job_id, status, result) ⇒ Object

after execution, send the response.



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/job_dispatch/worker/socket.rb', line 71

def send_response(job_id, status, result)
  JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"
  response = {
      command: 'completed',
      ready: true,
      job_id: job_id,
      result: result,
      status: status
  }
  @socket.send(JSON.dump(response))
end

#send_touch(job_id, timeout = nil) ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/job_dispatch/worker/socket.rb', line 83

def send_touch(job_id, timeout=nil)
  hash = {
      command: 'touch',
      job_id: job_id
  }
  hash[:timeout] = timeout if timeout
  @socket.send(JSON.dump(hash))
  json = @socket.recv # wait for acknowledgement... this could be done via pub/sub to be asynchronous.
  JSON.parse(json) rescue {:error => "Failed to decode JSON from dispatcher: #{json}"}
end