Class: JobDispatch::Worker::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/job_dispatch/worker/socket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connect_address, item_klass) ⇒ Socket

Returns a new instance of Socket.



14
15
16
17
18
19
20
# File 'lib/job_dispatch/worker/socket.rb', line 14

# Creates and connects the two sockets used by this worker and records the
# item class.
#
# Both sockets are obtained from JobDispatch.context and connected to the same
# address: a ZMQ::REQ socket (kept in @socket) and a ZMQ::DEALER socket (kept
# in @touch_socket).
#
# @param connect_address [Object] address handed to #connect on both sockets
#   (presumably a ZMQ endpoint string — confirm against callers)
# @param item_klass [Class] stored as @item_class; read_item instantiates it
#   with .new
def initialize(connect_address, item_klass)
  @socket = JobDispatch.context.socket(ZMQ::REQ)
  @socket.connect(connect_address)
  @touch_socket = JobDispatch.context.socket(ZMQ::DEALER)
  @touch_socket.connect(connect_address)
  @item_class = item_klass
end

Instance Attribute Details

#item_class ⇒ Object (readonly)

Returns the value of attribute item_class.



12
13
14
# File 'lib/job_dispatch/worker/socket.rb', line 12

# Reader for the item class.
#
# @return [Object] the value of @item_class (set from the constructor's
#   item_klass argument)
def item_class
  @item_class
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



10
11
12
# File 'lib/job_dispatch/worker/socket.rb', line 10

# Reader for the ZMQ::REQ socket created in the constructor.
#
# @return [Object, nil] the value of @socket; nil once #close has run
def socket
  @socket
end

#touch_socket ⇒ Object (readonly)

Returns the value of attribute touch_socket.



11
12
13
# File 'lib/job_dispatch/worker/socket.rb', line 11

# Reader for the ZMQ::DEALER socket created in the constructor.
#
# @return [Object, nil] the value of @touch_socket; nil once #close has run
def touch_socket
  @touch_socket
end

Instance Method Details

#ask_for_work(queue) ⇒ Object



26
27
28
# File 'lib/job_dispatch/worker/socket.rb', line 26

# Sends a JSON-encoded 'ready' command on @socket, naming the queue and this
# worker's identity.
#
# @param queue [Object] value sent under the "queue" key
# @return [Object] whatever @socket.send returns
def ask_for_work(queue)
  request = { command: 'ready', queue: queue, worker_name: identity }
  @socket.send(JSON.dump(request))
end

#close ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/job_dispatch/worker/socket.rb', line 34

# Closes both sockets (when present) and clears the references to them.
#
# Closing is best effort: a StandardError raised by either #close call is
# discarded and the instance variable is still reset to nil, so calling this
# method again is a no-op.
#
# @return [nil]
def close
  if @socket
    begin
      @socket.close
    rescue StandardError
      # deliberately ignored: nothing useful can be done if close fails
    end
    @socket = nil
  end

  if @touch_socket
    begin
      @touch_socket.close
    rescue StandardError
      # deliberately ignored: nothing useful can be done if close fails
    end
    @touch_socket = nil
  end
end

#drain_touch_socket ⇒ Object

Drains any messages that may have been received on the touch socket.



82
83
84
85
86
87
# File 'lib/job_dispatch/worker/socket.rb', line 82

# Discards whatever is waiting on the touch socket.
#
# Calls recv_nonblock on @touch_socket repeatedly, throwing the result away,
# and stops at the first nil.
#
# @return [nil]
def drain_touch_socket
  loop { break if @touch_socket.recv_nonblock.nil? }
end

#identity ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/job_dispatch/worker/socket.rb', line 45

# Name this worker reports in its messages.
#
# Built once and cached in @identity, in the form
# "ruby:<hostname>:<pid>:<hex object_id of the thread that first called this>".
#
# @return [String]
def identity
  return @identity if @identity

  parts = ['ruby', ::Socket.gethostname, Process.pid, Thread.current.object_id.to_s(16)]
  @identity = parts.join(':')
end

#poll_item ⇒ Object



22
23
24
# File 'lib/job_dispatch/worker/socket.rb', line 22

# Poll item for @socket registered with ZMQ::POLLIN.
#
# The object is built on first use via ZMQ::Pollitem and cached in @poll_item.
#
# @return [Object] the cached result of ZMQ::Pollitem(@socket, ZMQ::POLLIN)
def poll_item
  return @poll_item if @poll_item

  @poll_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN)
end

#read_item ⇒ JobDispatch::Item

Reads an incoming message. The thread will block if there is no readable message.

Returns:

  • (JobDispatch::Item)

    the item to be processed (or nil if there isn’t a valid job)



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/job_dispatch/worker/socket.rb', line 57

# Receives one message from @socket and turns it into an item.
#
# The touch socket is drained first, then the message is parsed as JSON and
# dispatched on its "command" value:
#   "job"  -> item_class.new(target, method, *parameters)
#   "idle" -> item_class.new("JobDispatch", "idle")
#   "quit" -> prints a notice and exits the process with status 0
#   other  -> item_class.new("JobDispatch", "unknown_command", params)
# The message's "job_id" is then assigned to the item.
#
# Any StandardError raised along the way is logged through
# JobDispatch.logger.error and not re-raised.
#
# @return [Object, nil] the item built from the message; nil when the failure
#   happened before an item was built
def read_item
  item = nil
  begin
    drain_touch_socket
    params = JSON.parse(@socket.recv)
    item =
      case params["command"]
      when "job"
        item_class.new(params["target"], params["method"], *params["parameters"])
      when "idle"
        item_class.new("JobDispatch", "idle")
      when "quit"
        puts "It's quittin' time!"
        Process.exit(0)
      else
        item_class.new("JobDispatch", "unknown_command", params)
      end
    item.job_id = params["job_id"]
  rescue StandardError => e
    JobDispatch.logger.error "Failed to read message from worker socket: #{e}"
  end
  item
end

#send_goodbye(queue) ⇒ Object



30
31
32
# File 'lib/job_dispatch/worker/socket.rb', line 30

# Sends a JSON-encoded 'goodbye' command carrying this worker's identity on
# @socket.
#
# @param queue [Object] accepted but not included in the message
# @return [Object] whatever @socket.send returns
def send_goodbye(queue)
  farewell = { command: 'goodbye', worker_name: identity }
  @socket.send(JSON.dump(farewell))
end

#send_response(job_id, status, result) ⇒ Object

After execution, sends the response.



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/job_dispatch/worker/socket.rb', line 90

# Reports the outcome of a job: logs a summary line, then sends a
# JSON-encoded 'completed' command (with ready: true) on @socket.
#
# @param job_id [Object] sent under "job_id"
# @param status [Object] sent under "status"
# @param result [Object] sent under "result"; must be JSON-serialisable
# @return [Object] whatever @socket.send returns
def send_response(job_id, status, result)
  JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"
  payload = JSON.dump({ command: 'completed', ready: true, job_id: job_id, result: result, status: status })
  @socket.send(payload)
end

#send_touch(job_id, timeout = nil) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/job_dispatch/worker/socket.rb', line 102

# Sends a JSON-encoded 'touch' command for a job on @touch_socket.
#
# @param job_id [Object] sent under "job_id"
# @param timeout [Object, nil] when truthy, sent under "timeout"; otherwise
#   the key is left out of the message
# @return [Object] whatever @touch_socket.send returns
def send_touch(job_id, timeout=nil)
  message = { command: 'touch', job_id: job_id }
  message = message.merge(timeout: timeout) if timeout
  @touch_socket.send(JSON.dump(message))
end