Class: Qwirk::Adapter::InMemory::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/adapter/in_memory/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
15
16
17
18
19
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 11

def initialize(name)
  @name            = name
  @max_size        = 0
  @array_mutex     = Mutex.new
  @read_condition  = ConditionVariable.new
  @write_condition = ConditionVariable.new
  @array           = []
  @stopping        = false
end

Instance Attribute Details

#max_sizeObject

TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks. If 0, messages will be dropped. If -1, then it’s unlimited. TODO: Should implement a queue_full_strategy which would be publish_block, drop_oldest, drop_newest



9
10
11
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 9

def max_size
  @max_size
end

#nameObject

TODO: Look into reimplementing using a Ruby Queue which is probably better performant Size of the queue before it write-blocks. If 0, messages will be dropped. If -1, then it’s unlimited. TODO: Should implement a queue_full_strategy which would be publish_block, drop_oldest, drop_newest



9
10
11
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 9

def name
  @name
end

Instance Method Details

#interrupt_readObject



40
41
42
43
44
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 40

def interrupt_read
  @array_mutex.synchronize do
    @read_condition.broadcast
  end
end

#read(stoppable) ⇒ Object

Block read until a message or we get stopped.



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 47

def read(stoppable)
  @array_mutex.synchronize do
    until stoppable.stopped || (@stopping  && @array.empty?)
      if @array.empty?
        @read_condition.wait(@array_mutex)
      else
        @write_condition.signal
        return @array.shift
      end
    end
  end
  return nil
end

#sizeObject



21
22
23
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 21

def size
  @array.size
end

#stopObject



25
26
27
28
29
30
31
32
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 25

def stop
  return if @stopping
  @stopping = true
  @array_mutex.synchronize do
    @write_condition.broadcast
    @read_condition.broadcast
  end
end

#stopped?Boolean

Returns:

  • (Boolean)


34
35
36
37
38
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 34

def stopped?
  @array_mutex.synchronize do
    return @stopping && @array.empty?
  end
end

#to_sObject



81
82
83
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 81

def to_s
  "queue:#{@name}"
end

#write(obj) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/qwirk/adapter/in_memory/queue.rb', line 61

def write(obj)
  @array_mutex.synchronize do
    # We just drop the message if no workers have been configured yet
    while !@stopping
      if @max_size == 0
        Qwirk.logger.warn "No worker for queue #{@name}, dropping message #{obj.inspect}"
        return
      end
      if @max_size < 0 || @array.size < @max_size
        @array << obj
        @read_condition.signal
        return
      end
      # TODO: Let's allow various write_full_modes such as :block, :remove_oldest, ? (Currently only blocks)
      @write_condition.wait(@array_mutex)
    end
    Qwirk.logger.warn "Queue has been stopped #{@name}, dropping message #{obj.inspect}"
  end
end