Class: DatTCP::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-tcp/worker_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



121
122
123
124
125
126
# File 'lib/dat-tcp/worker_pool.rb', line 121

# Build an empty queue that is not shut down, together with the mutex and
# condition variable the other methods use to coordinate between threads.
def initialize
  # Queue state.
  @items    = []
  @shutdown = false

  # Synchronization primitives guarding / signalling changes to that state.
  @mutex              = Mutex.new
  @condition_variable = ConditionVariable.new
end

Instance Method Details

#empty? ⇒ Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/dat-tcp/worker_pool.rb', line 146

# Report whether the queue currently holds no items. The check runs while
# holding the mutex.
def empty?
  @mutex.synchronize do
    @items.empty?
  end
end

#items ⇒ Object



128
129
130
# File 'lib/dat-tcp/worker_pool.rb', line 128

# Return the queued items. The mutex is held only while the reference is
# read; the value handed back is the internal array itself, not a copy.
def items
  @mutex.synchronize do
    @items
  end
end

#pop ⇒ Object



142
143
144
# File 'lib/dat-tcp/worker_pool.rb', line 142

# Remove and return the oldest queued item, or nil when the queue is empty.
#
# Items are taken from the front of the array (`shift`) so they are handed out
# in the same order `push` appended them. Taking from the back would serve the
# most recently added item first, which can leave early items waiting
# indefinitely while new ones keep arriving.
def pop
  @mutex.synchronize{ @items.shift }
end

#push(socket) ⇒ Object

Add the connection and wake up the first worker (the `signal`) that's waiting (because of `wait_for_new_connection`).



134
135
136
137
138
139
140
# File 'lib/dat-tcp/worker_pool.rb', line 134

# Append a connection to the queue and signal the condition variable so that
# one thread waiting on it is woken.
#
# Raises a RuntimeError if the queue has already been shut down. Note that the
# shutdown flag is checked before the mutex is taken.
def push(socket)
  if @shutdown
    raise "Unable to add connection while shutting down"
  end

  @mutex.synchronize {
    @items.push(socket)
    @condition_variable.signal
  }
end

#shutdown ⇒ Object

Wake up any workers that are idle (because of `wait_for_new_connection`).



156
157
158
159
# File 'lib/dat-tcp/worker_pool.rb', line 156

# Mark the queue as shut down, then broadcast on the condition variable so
# every thread currently waiting on it is woken.
def shutdown
  @shutdown = true

  @mutex.synchronize do
    @condition_variable.broadcast
  end
end

#wait_for_new_connection ⇒ Object

Wait to be signaled by `push`.



151
152
153
# File 'lib/dat-tcp/worker_pool.rb', line 151

# Block the calling thread until there is something to act on: either an item
# is queued or the shutdown flag has been set.
#
# The condition is re-checked in a loop while holding the mutex. Waiting
# unconditionally would miss a signal/broadcast sent before this thread
# started waiting (leaving it blocked even though work is queued or shutdown
# was requested), and would not guard against spurious wakeups.
def wait_for_new_connection
  @mutex.synchronize do
    @condition_variable.wait(@mutex) while @items.empty? && !@shutdown
  end
end