Class: Pigato::Worker

Inherits:
Base
  • Object
show all
Defined in:
lib/pigato/worker.rb

Constant Summary collapse

HEARTBEAT_LIVENESS =

3-5 is reasonable

3

Instance Method Summary collapse

Methods inherited from Base

#get_iid, #get_mtx, #get_proc_id, #get_socket, #get_thread_id, #init, #sock_close, #sock_create

Constructor Details

#initialize(broker, service, conf = {}) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/pigato/worker.rb', line 7

# Builds a worker for +service+ on +broker+ and makes sure the process-wide
# heartbeat thread is running.
#
# @param broker  the broker endpoint; also handed to the Pigato::Client that
#   the heartbeat thread creates
# @param service the service name announced in #start
# @param conf [Hash] overrides for the defaults below
#   (:autostart => false, :timeout => 2500, :reconnect => 2500)
def initialize(broker, service, conf = {})
  @broker = broker
  @service = service

  @conf = {
    :autostart => false,
    :timeout => 2500,
    :reconnect => 2500
  }
  @conf.merge!(conf)

  # Start with an already-expired heartbeat deadline so the first #recv
  # sends a heartbeat immediately.
  @heartbeat_at = Time.now - 60
  @liveness = 0
  @reply_to = nil
  @reply_rid = nil
  @reply_service = nil

  init

  start if @conf[:autostart]

  # synchronize (instead of bare lock/unlock) guarantees the shared lock is
  # released even if something raises while it is held.
  @@mtx.synchronize do
    if @@global_thread.nil?
      # Flag is set under the lock so only the first worker spawns the thread.
      @@global_thread = true
      Thread.new do
        client = Pigato::Client.new(broker, { :autostart => true })
        loop do
          @@mtx.synchronize do
            begin
              if Time.now > @@global_heartbeat_at
                # One heartbeat per registered socket id.
                @@sockets_ids.each do |_iid, sid|
                  request = [Pigato::C_CLIENT, Pigato::W_HEARTBEAT, "worker", sid]
                  msg = ZMQ::Message.new
                  request.reverse_each { |part| msg.push(ZMQ::Frame(part)) }
                  client.send msg
                end
                @@global_heartbeat_at = Time.now + 2.5
              end
            rescue => e
              # Best effort: report the error and keep the heartbeat loop alive.
              puts e
            end
          end
          sleep 2.5
        end
      end
    end
  end
end

Instance Method Details

#recv ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/pigato/worker.rb', line 64

# Receives one message from the worker socket and returns the decoded
# request payload, if the message was a request.
#
# Resets the @reply_* state, calls #start when this instance has no socket
# and :autostart is set, then reads with a receive timeout of
# @conf[:timeout]. A non-empty message refreshes the liveness counter; no
# message (or an empty one) decrements it, and once it is exhausted the
# worker sleeps for @conf[:reconnect] milliseconds and restarts. A worker
# heartbeat is sent whenever the heartbeat deadline has passed.
#
# @return [Object, nil] the Oj-decoded payload of a W_REQUEST message; nil
#   when there is no socket, no request arrived, or the header is invalid
def recv
  val = nil

  @reply_rid = nil
  @reply_to = nil
  @reply_service = nil

  iid = get_iid

  start if @@sockets[iid].nil? && @conf[:autostart]

  socket = get_socket
  return nil if socket.nil?

  socket.rcvtimeo = @conf[:timeout]

  msg = socket.recv_message

  # 0 is truthy in Ruby, so the frame count has to be compared explicitly;
  # an empty message must not reach the msg.pop.data calls below.
  if msg && msg.size > 0
    @liveness = HEARTBEAT_LIVENESS

    header = msg.pop.data
    if header != Pigato::W_WORKER
      puts "E: Header is not Pigato::WORKER"
      return nil
    end

    command = msg.pop.data

    case command
    when Pigato::W_REQUEST
      @reply_to = msg.pop.data
      @reply_service = msg.pop.data
      msg.pop # empty
      @reply_rid = msg.pop.data
      val = Oj.load(msg.pop.data)
    when Pigato::W_HEARTBEAT
      # Nothing to do: liveness was already refreshed above.
    when Pigato::W_DISCONNECT
      start
    end
  else
    @liveness -= 1
    # <= rather than == so a counter that is already at or below zero
    # still triggers a reconnect instead of counting down forever.
    if @liveness <= 0
      sleep 0.001 * @conf[:reconnect]
      start
    end
  end

  if Time.now > @heartbeat_at
    send(Pigato::W_HEARTBEAT, ['', Oj.dump({ 'concurrency' => 1 })])
    @heartbeat_at = Time.now + 0.001 * (@conf[:timeout] * 1.5)
  end

  val
end

#reply(reply) ⇒ Object



59
60
61
62
# File 'lib/pigato/worker.rb', line 59

# Sends +reply+ back using the routing state stored in @reply_to and
# @reply_rid.
#
# Frames passed to #send, in order: @reply_to, an empty frame, @reply_rid,
# the literal '0', and the Oj-serialized payload.
def reply(reply)
  payload = Oj.dump(reply)
  frames = [@reply_to, '', @reply_rid, '0', payload]
  send(Pigato::W_REPLY, frames)
end

#send(command, data = nil) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/pigato/worker.rb', line 134

# Sends a worker command on this worker's socket.
#
# The outgoing message is Pigato::W_WORKER, then +command+, then the data
# frames. +data+ may be nil (no extra frames), an Array of frames, or a
# single value that is treated as one frame.
def send(command, data = nil)
  parts =
    if data.nil?
      []
    elsif data.is_a?(Array)
      data
    else
      [data]
    end

  socket = get_socket

  frames = [Pigato::W_WORKER, command, *parts]
  msg = ZMQ::Message.new
  # Frames are pushed in reverse order, last frame first.
  frames.reverse_each { |frame| msg.push(ZMQ::Frame(frame)) }
  socket.send_message(msg)
end

#start ⇒ Object



121
122
123
124
125
126
127
# File 'lib/pigato/worker.rb', line 121

# (Re)starts the worker: stops first, creates a socket, announces the
# service with a W_READY command, defers to the superclass #start, and
# resets the liveness counter to HEARTBEAT_LIVENESS.
def start 
  # Calling stop first makes start usable as a reconnect.
  stop
  sock_create
  send Pigato::W_READY, @service
  super
  @liveness = HEARTBEAT_LIVENESS
end

#stop ⇒ Object



129
130
131
132
# File 'lib/pigato/worker.rb', line 129

# Stops the worker: closes the socket via sock_close, then defers to the
# superclass #stop.
def stop
  sock_close
  super
end