Class: ZMQPoll::ZMQPoll

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/zmq_qpoll.rb

Instance Method Summary collapse

Constructor Details

#initialize(context, logger = nil) ⇒ ZMQPoll

Returns a new instance of ZMQPoll.



31
32
33
34
35
36
37
38
39
# File 'lib/log-courier/zmq_qpoll.rb', line 31

# Sets up an empty poller wrapper around a ZMQ context.
#
# @param context the ZMQ context; stored as given
# @param logger  optional logger; stored as given (nil when omitted)
def initialize(context, logger = nil)
  @logger = logger
  @context = context
  @poller = ZMQ::Poller.new
  @sockets = []
  # Keyed by sender socket (create_socket_to_socket stores
  # `@socket_to_socket[state[:sender]] = state` and close_socket_to_socket
  # looks entries up by socket), so this must be a Hash: indexing an Array
  # with a socket object raises TypeError.
  @socket_to_socket = {}
  @handlers = {}
  @queues = {}
end

Instance Method Details

#close_socket_to_socket(socket) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/log-courier/zmq_qpoll.rb', line 126

# Closes the socket-to-socket pairing registered under +socket+.
#
# Does nothing when +socket+ has no entry in @socket_to_socket. Otherwise the
# entry is removed and its state is handed to _close_socket_to_socket.
#
# @param socket the key the pairing was stored under (the sender socket
#   returned by create_socket_to_socket)
# @return [nil]
def close_socket_to_socket(socket)
  if @socket_to_socket.include?(socket)
    pairing = @socket_to_socket[socket]
    @socket_to_socket.delete(socket)
    _close_socket_to_socket(pairing)
  end
  nil
end

#create_socket_to_socket(socket) ⇒ Object



120
121
122
123
124
# File 'lib/log-courier/zmq_qpoll.rb', line 120

# Creates a socket-to-socket pairing for +socket+ and remembers it.
#
# The state built by _create_socket_to_socket is stored in @socket_to_socket
# under its :sender entry, which is also what gets returned.
#
# @param socket passed through to _create_socket_to_socket
# @return the :sender entry of the newly created state
def create_socket_to_socket(socket)
  pairing = _create_socket_to_socket(socket)
  sender = pairing[:sender]
  @socket_to_socket[sender] = pairing
  sender
end

#deregister_queue(queue) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/log-courier/zmq_qpoll.rb', line 104

# Stops the worker thread attached to +queue+ and releases its resources.
#
# Does nothing when +queue+ is not registered in @queues.
#
# @param queue the queue previously given to register_queue_to_socket
# @return [nil]
def deregister_queue(queue)
  return unless @queues.key?(queue)

  entry = @queues[queue]

  # Two stop signals are raised together under the mutex:
  # - a nil item, so an idle worker wakes up and exits
  # - the shutdown flag, so a worker that is mid-send and times out can exit too
  entry[:mutex].synchronize do
    queue.push(nil)
    entry[:shutdown] = true
  end

  # Wait for the worker to finish before tearing down its socket pairing.
  entry[:thread].join

  _close_socket_to_socket(entry[:state])
  @queues.delete(queue)
  nil
end

#deregister_socket(socket) ⇒ Object



69
70
71
72
73
74
# File 'lib/log-courier/zmq_qpoll.rb', line 69

# Removes +socket+ from the underlying poller.
#
# Sockets that have an entry in @handlers are left registered.
# NOTE(review): presumably @handlers tracks internally managed sockets that
# callers must not remove — confirm against where @handlers is populated.
#
# @param socket the socket to stop polling
# @return [nil]
def deregister_socket(socket)
  @poller.delete(socket) unless @handlers.key?(socket)
  nil
end

#poll(timeout) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/log-courier/zmq_qpoll.rb', line 134

# Runs one poll cycle and reports every ready socket to the caller's block.
#
# For each socket the poller lists as readable or writable, the block is
# yielded (socket, readable, writable). A socket that has an entry in
# @handlers first has its handler's :callback method invoked with the handler
# entry, and is then yielded like any other socket.
#
# @param timeout passed straight through to the underlying poller
# @return [nil]
# @raise [ZMQError] when nothing is registered with the poller, or polling
#   fails with any error other than ETERM/ENOTSOCK
# @raise [ZMQTerm] when polling fails with ETERM or ENOTSOCK
def poll(timeout)
  fail ZMQError, 'poll run called with zero socket/queues' if @poller.size == 0

  rc = @poller.poll(timeout)
  unless ZMQ::Util.resultcode_ok?(rc)
    # ETERM or ENOTSOCK may mean we hit a JRuby bug where finalisers run too
    # early. A distinct error is raised so the caller knows NOT to retry and
    # spin in a bad loop.
    # The downside: this JRuby bug leads to an inevitable deadlock.
    # - The context's finaliser will have run before the sockets' finalisers
    #   (random order?).
    # - The resulting ZMQ context termination blocks, waiting for sockets to
    #   be closed.
    # - Closing a socket makes ffi-rzmq try to remove its finaliser, which
    #   needs the finaliser list lock - held by the thread running the
    #   context termination, which is itself blocked.
    # TODO: Raise an issue on the JRuby GitHub and track down why finalisers
    # run while code is still running if the exit! call is made.
    terminated = ZMQ::Util.errno == ZMQ::ETERM || ZMQ::Util.errno == ZMQ::ENOTSOCK
    error_class = terminated ? ZMQTerm : ZMQError
    fail error_class, 'poll error: ' + ZMQ::Util.error_string
  end

  # A zero result means no socket became ready; there is nothing to report.
  return if rc == 0

  (@poller.readables | @poller.writables).each do |socket|
    if @handlers.key?(socket)
      handler = @handlers[socket]
      # __send__ rather than send (NOTE(review): presumably because this class
      # defines its own send - confirm).
      __send__(handler[:callback], handler)
    end

    readable = @poller.readables.include?(socket)
    writable = @poller.writables.include?(socket)
    yield socket, readable, writable
  end

  nil
end

#readables ⇒ Object



41
42
43
# File 'lib/log-courier/zmq_qpoll.rb', line 41

# Returns whatever the underlying poller's +readables+ reports.
# NOTE(review): presumably the sockets found readable by the most recent
# poll - confirm against ZMQ::Poller.
def readables
  @poller.readables
end

#register_queue_to_socket(queue, socket) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/log-courier/zmq_qpoll.rb', line 76

# Starts a worker thread that forwards every item pushed onto +queue+ to the
# sender side of a socket-to-socket pairing created for +socket+.
#
# The worker stops when it pops a nil item, or when a send times out after
# the entry's :shutdown flag has been set (deregister_queue does both).
# The bookkeeping entry (pairing state, mutex, shutdown flag, thread) is
# stored in @queues under +queue+.
#
# @param queue  the queue to drain; must support a blocking +pop+
# @param socket passed through to _create_socket_to_socket
# @return [nil]
def register_queue_to_socket(queue, socket)
  s2s_state = _create_socket_to_socket(socket)

  state = {
    state:    s2s_state,
    mutex:    Mutex.new,
    shutdown: false,
  }

  state[:thread] = Thread.new do
    loop do
      data = queue.pop
      # A nil item is the stop signal.
      break if data.nil?
      begin
        # NOTE(review): presumably this class's own send(socket, data) helper
        # rather than Kernel#send (poll uses __send__ for dispatch) - confirm.
        send s2s_state[:sender], data
      rescue TimeoutError
        # Read the flag under the mutex but act on it outside the block: a
        # `break` inside `synchronize do ... end` only leaves that block, so
        # the worker would always fall through to `retry` and never stop.
        stopping = state[:mutex].synchronize { state[:shutdown] }
        break if stopping
        retry
      end
    end
  end

  @queues[queue] = state
  nil
end

#register_socket(socket, flags) ⇒ Object



64
65
66
67
# File 'lib/log-courier/zmq_qpoll.rb', line 64

# Registers +socket+ with the underlying poller.
#
# @param socket the socket to poll
# @param flags  passed straight through to the poller's +register+
# @return [nil]
def register_socket(socket, flags)
  @poller.register(socket, flags)
  nil
end

#shutdown ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/log-courier/zmq_qpoll.rb', line 49

# Tears everything down: deregisters every queue, closes every
# socket-to-socket pairing, then closes every socket in @sockets.
#
# @return [nil]
def shutdown
  # Walk a snapshot of the keys: deregister_queue deletes from @queues as it
  # goes, so the live hash must not be the thing being iterated.
  @queues.keys.each do |queue|
    deregister_queue queue
  end

  # @socket_to_socket maps sender socket => state, and
  # _close_socket_to_socket is given the state by its other callers, so pass
  # the value rather than the whole [sender, state] pair.
  @socket_to_socket.each do |_sender, state|
    _close_socket_to_socket state
  end

  @sockets.each do |socket|
    socket.close
  end
  nil
end

#writables ⇒ Object



45
46
47
# File 'lib/log-courier/zmq_qpoll.rb', line 45

# Returns whatever the underlying poller's +writables+ reports.
# NOTE(review): presumably the sockets found writable by the most recent
# poll - confirm against ZMQ::Poller.
def writables
  @poller.writables
end