Class: LogCourier::ClientFactoryZmq

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/server_zmq.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options, send_queue) ⇒ ClientFactoryZmq

Returns a new instance of ClientFactoryZmq.



211
212
213
214
215
216
217
218
219
# File 'lib/log-courier/server_zmq.rb', line 211

def initialize(options, send_queue)
  @options = options
  @logger = @options[:logger]

  @send_queue = send_queue
  @index = {}
  @client_threads = {}
  @mutex = Mutex.new
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



208
209
210
# File 'lib/log-courier/server_zmq.rb', line 208

def options
  @options
end

#send_queueObject (readonly)

Returns the value of attribute send_queue.



209
210
211
# File 'lib/log-courier/server_zmq.rb', line 209

def send_queue
  @send_queue
end

Instance Method Details

#deliver(source, data, &block) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/log-courier/server_zmq.rb', line 237

def deliver(source, data, &block)
  # Find the handling thread
  # We separate each source into threads so that each thread can respond
  # with partial ACKs if we hit a slow down
  # If we processed in a single thread, we'd only be able to respond to
  # a single client with partial ACKs
  @mutex.synchronize do
    index = @index
    source.each do |identity|
      index[identity] = {} if !index.key?(identity)
      index = index[identity]
    end

    if !index.key?('')
      source_str = source.map do |s|
        s.each_byte.map do |b|
          b.to_s(16).rjust(2, '0')
        end
      end.join

      @logger.info 'New source', :source => source_str unless @logger.nil?

      # Create the client and associated thread
      client = ClientZmq.new(self, source, source_str) do
        try_drop source, source_str
      end

      thread = Thread.new do
        client.run &block
      end

      @client_threads[thread] = thread

      index[''] = {
        'client' => client,
        'thread' => thread,
      }
    end

    # Existing thread, throw on the queue, if not enough room (timeout) drop the message
    begin
      index['']['client'].push data, 0
    rescue LogCourier::TimeoutError
      # TODO: Log a warning about this?
    end
  end
  return
end

#shutdownObject



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/log-courier/server_zmq.rb', line 221

def shutdown
  # Stop other threads from try_drop collisions
  client_threads = @mutex.synchronize do
    client_threads = @client_threads
    @client_threads = {}
    client_threads
  end

  client_threads.each_value do |thr|
    thr.raise ShutdownSignal
  end

  client_threads.each_value(&:join)
  return
end