Class: FreeMessageQueue::LoadBalancedQueue

Inherits:
Object
Defined in:
lib/fmq/queues/load_balanced.rb

Overview

This queue addresses the case where multiple threads need to work on one queue at the same time: it spreads messages across several internal queues. It is not yet considered stable, so use it only for experiments.

Configuration sample:

queue-manager:
  auto-create-queues: true
  defined-queues:
    test-queue-1:
      path: /fmq_test/test1
      max-messages: 1000000
      max-size: 10kb
      class: FreeMessageQueue::LoadBalancedQueue
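The sample above reads as YAML. As a rough sketch, assuming it is stored as YAML, the following uses Ruby's standard `yaml` library to show how the values parse. The helper name `sample_queue_config` is hypothetical, and how FMQ itself loads and interprets the file is not shown on this page.

```ruby
require "yaml"

# The configuration sample, inlined so the sketch is self-contained.
SAMPLE_CONFIG = <<~CONFIG
  queue-manager:
    auto-create-queues: true
    defined-queues:
      test-queue-1:
        path: /fmq_test/test1
        max-messages: 1000000
        max-size: 10kb
        class: FreeMessageQueue::LoadBalancedQueue
CONFIG

# Hypothetical helper (not part of FMQ): returns the settings hash
# for one named queue from the sample configuration.
def sample_queue_config(name)
  config = YAML.safe_load(SAMPLE_CONFIG)
  config["queue-manager"]["defined-queues"][name]
end

settings = sample_queue_config("test-queue-1")
puts settings["class"]         # the queue class name, as a String
puts settings["max-messages"]  # parsed as an Integer
puts settings["max-size"]      # stays a String; unit handling is left to the consumer
```

Note that `max-size: 10kb` comes back as plain text, so whatever consumes the configuration has to interpret the unit itself.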

Constructor Details

#initialize(manager, queue_count = 5) ⇒ LoadBalancedQueue

Returns a new instance of LoadBalancedQueue.



# File 'lib/fmq/queues/load_balanced.rb', line 39

def initialize(manager, queue_count = 5)
  @manager = manager
  @queues = []
  queue_count.times do
    @queues << SyncronizedQueue.new(manager)
  end
  @poll_queue = @put_queue = 0
  @semaphore = Mutex.new
end

Instance Attribute Details

#manager ⇒ Object

QueueManager reference



# File 'lib/fmq/queues/load_balanced.rb', line 37

def manager
  @manager
end

Instance Method Details

#bytes ⇒ Object

Size of the queue in bytes, summed over all load-balanced queues



# File 'lib/fmq/queues/load_balanced.rb', line 62

def bytes
  tmp_bytes = 0
  @queues.each { |q| tmp_bytes += q.bytes }
  return tmp_bytes
end

#clear ⇒ Object

Deletes all messages in all queues



# File 'lib/fmq/queues/load_balanced.rb', line 50

def clear
  @queues.each { |q| q.clear }
end

#max_messages ⇒ Object

The queue has no limit on the number of messages



# File 'lib/fmq/queues/load_balanced.rb', line 79

def max_messages
  BaseQueue::INFINITE
end

#max_size ⇒ Object

The queue has no limit on its total size



# File 'lib/fmq/queues/load_balanced.rb', line 84

def max_size
  BaseQueue::INFINITE
end

#poll ⇒ Object

Returns one message from one of the queues



# File 'lib/fmq/queues/load_balanced.rb', line 69

def poll
  @queues[next_poll_index].poll
end

#put(data) ⇒ Object

Puts an item into one of the queues



# File 'lib/fmq/queues/load_balanced.rb', line 74

def put(data)
  @queues[next_put_index].put(data)
end
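
#poll and #put call next_poll_index and next_put_index, which are not listed on this page. The sketch below shows one plausible way to implement them, assuming round-robin selection guarded by the mutex created in the constructor. `StubQueue` and `RoundRobinSketch` are hypothetical stand-ins, not the real SyncronizedQueue or LoadBalancedQueue, and the actual helpers may behave differently.

```ruby
# Stand-in for a sub-queue; NOT the real SyncronizedQueue.
class StubQueue
  def initialize
    @items = []
  end

  def put(data)
    @items << data
  end

  def poll
    @items.shift
  end

  def size
    @items.size
  end
end

# Hypothetical dispatcher mirroring the state set up in #initialize.
class RoundRobinSketch
  def initialize(queue_count = 5)
    @queues = Array.new(queue_count) { StubQueue.new }
    @poll_queue = @put_queue = 0
    @semaphore = Mutex.new
  end

  def put(data)
    @queues[next_put_index].put(data)
  end

  def poll
    @queues[next_poll_index].poll
  end

  # Per-sub-queue message counts, for inspection only.
  def sizes
    @queues.map(&:size)
  end

  private

  # Assumed behavior: return the current index, then advance it
  # with wrap-around, all inside the mutex.
  def next_put_index
    @semaphore.synchronize do
      index = @put_queue
      @put_queue = (@put_queue + 1) % @queues.size
      index
    end
  end

  def next_poll_index
    @semaphore.synchronize do
      index = @poll_queue
      @poll_queue = (@poll_queue + 1) % @queues.size
      index
    end
  end
end

balanced = RoundRobinSketch.new(3)
%w[a b c d].each { |msg| balanced.put(msg) }
p balanced.sizes                  # => [2, 1, 1]
p Array.new(4) { balanced.poll }  # => ["a", "b", "c", "d"]
```

Under this assumption the mutex only protects the index counters. Each sub-queue is still responsible for its own thread safety, which is presumably why the constructor builds SyncronizedQueue instances.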

#size ⇒ Object

The size of the queue is the sum of the sizes of all load-balanced queues



# File 'lib/fmq/queues/load_balanced.rb', line 55

def size
  size = 0
  @queues.each { |q| size += q.size }
  return size 
end
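
Both #size and #bytes fold a per-queue value into one total. The sketch below shows the same aggregation written with `Array#sum` (Ruby 2.4 and later). `SubQueueStats` and `total` are hypothetical names used only for illustration and are not part of FMQ.

```ruby
# Stand-in sub-queue exposing only the two readers the aggregates need.
SubQueueStats = Struct.new(:size, :bytes)

# Sum one attribute (:size or :bytes) across every sub-queue.
def total(queues, attribute)
  queues.sum { |q| q.public_send(attribute) }
end

queues = [
  SubQueueStats.new(2, 40),
  SubQueueStats.new(0, 0),
  SubQueueStats.new(1, 12)
]

puts total(queues, :size)   # => 3
puts total(queues, :bytes)  # => 52
```

Written this way, neither method needs a temporary accumulator, and an empty list of sub-queues still yields 0.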