Class: ZMQPoll::ZMQPoll

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/zmq_qpoll.rb

Instance Method Summary collapse

Constructor Details

#initialize(context, logger = nil) ⇒ ZMQPoll

Returns a new instance of ZMQPoll.



31
32
33
34
35
36
37
38
39
# File 'lib/log-courier/zmq_qpoll.rb', line 31

# Sets up an empty poll wrapper around a fresh ZMQ::Poller.
#
# @param context stored in @context; not otherwise used in this method
# @param logger  optional logger stored in @logger (defaults to nil)
def initialize(context, logger=nil)
	@logger = logger
	@context = context
	@poller = ZMQ::Poller.new
	@sockets = []
	# Keyed by sender socket (create_socket_to_socket stores state under
	# state[:sender], close_socket_to_socket looks it up by socket), so this
	# has to be a Hash: indexing an Array with a socket raises TypeError.
	@socket_to_socket = {}
	@handlers = {}
	@queues = {}
end

Instance Method Details

#close_socket_to_socket(socket) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/log-courier/zmq_qpoll.rb', line 126

# Closes the socket-to-socket pair tracked under +socket+, if there is one.
#
# The entry is removed from @socket_to_socket before its state is handed to
# _close_socket_to_socket. Untracked sockets are ignored.
#
# @param socket the key the pair was stored under
# @return [nil]
def close_socket_to_socket(socket)
	if @socket_to_socket.include?(socket)
		state = @socket_to_socket.delete(socket)
		_close_socket_to_socket state
	end
	nil
end

#create_socket_to_socket(socket) ⇒ Object



120
121
122
123
124
# File 'lib/log-courier/zmq_qpoll.rb', line 120

# Builds a socket-to-socket pair for +socket+ and tracks it by its sender.
#
# @param socket passed straight to _create_socket_to_socket
# @return the +:sender+ entry of the state that helper returned
def create_socket_to_socket(socket)
	pair = _create_socket_to_socket(socket)
	sender = pair[:sender]
	@socket_to_socket[sender] = pair
	sender
end

#deregister_queue(queue) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/log-courier/zmq_qpoll.rb', line 104

# Stops the worker thread registered for +queue+ and closes its socket pair.
#
# Does nothing when +queue+ is not registered. Otherwise waits for the worker
# thread to finish, passes the stored :state to _close_socket_to_socket and
# drops the queue from @queues.
#
# @param queue the queue previously given to register_queue_to_socket
# @return [nil]
def deregister_queue(queue)
	return unless @queues.key?(queue)

	entry = @queues[queue]

	# The nil wakes a worker that is idle on the queue so it can exit; the
	# shutdown flag is set to true for a worker that is mid-send and times out.
	entry[:mutex].synchronize do
		queue.push(nil)
		entry[:shutdown] = true
	end
	entry[:thread].join

	_close_socket_to_socket(entry[:state])
	@queues.delete(queue)
	nil
end

#deregister_socket(socket) ⇒ Object



69
70
71
72
73
74
# File 'lib/log-courier/zmq_qpoll.rb', line 69

# Removes +socket+ from the poller, unless it has an entry in @handlers.
#
# @param socket the socket to stop polling
# @return [nil]
def deregister_socket(socket)
	@poller.delete(socket) unless @handlers.key?(socket)
	nil
end

#poll(timeout) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/log-courier/zmq_qpoll.rb', line 134

# Polls once and dispatches every socket the poller reports as ready.
#
# For each ready socket that has an entry in @handlers, the entry's :callback
# is invoked (via __send__) with the entry itself. Every ready socket, handled
# or not, is then yielded to the caller's block.
#
# @param timeout passed through to @poller.poll
# @yield [socket, readable, writable] the ready socket and whether it appears
#   in the poller's readables / writables
# @return [nil]
# @raise [ZMQError] when nothing is registered with the poller, or on any
#   other poll failure
# @raise [ZMQTerm] when the poll fails with ETERM or ENOTSOCK
def poll(timeout)
	raise ZMQError, 'poll run called with zero socket/queues' if @poller.size == 0

	rc = @poller.poll(timeout)
	unless ZMQ::Util.resultcode_ok?(rc)
		# ETERM or ENOTSOCK may mean we hit a JRuby bug where finalisers are
		# called too early. A distinct error is raised so the caller knows NOT
		# to retry and fall into a bad loop.
		# The downside: this JRuby bug causes an inevitable deadlock. The
		# finaliser for the context runs before the finalisers for the sockets
		# (random order?), so the resulting ZMQ context termination blocks
		# waiting for sockets to be closed. Closing a socket makes ffi-rzmq try
		# to remove its finaliser, and that removal deadlocks on the finaliser
		# list, which is locked by the thread running the context termination,
		# which is itself blocked...
		# TODO: Raise issue in JRuby github and try to track down why
		# finalisers run while code is running if the exit! call is made
		errno = ZMQ::Util.errno
		terminated = errno == ZMQ::ETERM || errno == ZMQ::ENOTSOCK
		error_class = terminated ? ZMQTerm : ZMQError
		raise error_class, 'poll error: ' + ZMQ::Util.error_string
	end

	return if rc == 0

	(@poller.readables | @poller.writables).each do |socket|
		if @handlers.key?(socket)
			handler = @handlers[socket]
			__send__(handler[:callback], handler)
		end

		yield socket, @poller.readables.include?(socket), @poller.writables.include?(socket)
	end

	nil
end

#readables ⇒ Object



41
42
43
# File 'lib/log-courier/zmq_qpoll.rb', line 41

# Delegates to the wrapped poller's +readables+.
#
# @return whatever @poller.readables returns; poll treats it as a collection
#   of sockets (it is unioned with writables and queried with include?)
def readables
	@poller.readables
end

#register_queue_to_socket(queue, socket) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/log-courier/zmq_qpoll.rb', line 76

# Starts a worker thread that forwards everything popped from +queue+ to a
# socket-to-socket pair created for +socket+.
#
# The worker exits when it pops nil, or when a send times out after the
# shutdown flag has been set (deregister_queue does both). The bookkeeping
# entry (:state, :mutex, :shutdown, :thread) is stored in @queues under
# +queue+.
#
# @param queue  object responding to +pop+; a nil item stops the worker
# @param socket passed to _create_socket_to_socket
# @return [nil]
def register_queue_to_socket(queue, socket)
	s2s_state = _create_socket_to_socket(socket)

	state = {
		state:    s2s_state,
		mutex:    Mutex.new,
		shutdown: false,
	}

	state[:thread] = Thread.new do
		loop do
			data = queue.pop
			break if data.nil?
			begin
				# NOTE(review): called with a socket as first argument, so this is
				# presumably the class's own send helper rather than Kernel#send
				# (poll uses __send__ for dynamic dispatch) - confirm.
				send s2s_state[:sender], data
			rescue TimeoutError
				# Read the flag under the mutex but decide outside the block: a
				# `break` inside the synchronize block only leaves that block, so
				# the retry below would run even during shutdown and never stop.
				stopping = state[:mutex].synchronize { state[:shutdown] }
				break if stopping
				retry
			end
		end
	end

	@queues[queue] = state
	nil
end

#register_socket(socket, flags) ⇒ Object



64
65
66
67
# File 'lib/log-courier/zmq_qpoll.rb', line 64

# Registers +socket+ with the wrapped poller.
#
# @param socket the socket to poll
# @param flags  passed through unchanged to @poller.register
# @return [nil]
def register_socket(socket, flags)
	@poller.register(socket, flags)
	nil
end

#shutdown ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/log-courier/zmq_qpoll.rb', line 49

# Tears everything down: deregisters every queue, closes every tracked
# socket-to-socket pair, then closes every socket in @sockets.
#
# @return [nil]
def shutdown
	# Walk a snapshot of the keys: deregister_queue deletes from @queues, and
	# a hash should not be modified while it is being iterated.
	@queues.keys.each do |queue|
		deregister_queue queue
	end

	# Entries are stored as sender => state (see create_socket_to_socket) and
	# _close_socket_to_socket is given the state everywhere else, so pass the
	# value rather than the whole [sender, state] pair.
	@socket_to_socket.each do |_sender, state|
		_close_socket_to_socket state
	end

	@sockets.each(&:close)
	nil
end

#writables ⇒ Object



45
46
47
# File 'lib/log-courier/zmq_qpoll.rb', line 45

# Delegates to the wrapped poller's +writables+.
#
# @return whatever @poller.writables returns; poll treats it as a collection
#   of sockets (it is unioned with readables and queried with include?)
def writables
	@poller.writables
end