Class: ZookeeperCommon::QueueWithPipe

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/zookeeper/common/queue_with_pipe.rb

Overview

Ceci n’est pas une pipe

Defined Under Namespace

Classes: ShutdownException

Constant Summary collapse

KILL_TOKEN =
Object.new

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueWithPipe

Returns a new instance of QueueWithPipe.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 15

# Builds an empty queue together with the lock and the two flags that
# record shutdown state.
def initialize
  # Neither a hard close nor a graceful close has been requested yet.
  @graceful = false
  @closed   = false

  # Serializes reads and writes of the flags above.
  @mutex = Mutex.new

  # Disabled pipe setup, kept for reference:
  #       r, w = IO.pipe
  #       @pipe = { :read => r, :write => w }
  #
  # with the EventMachine client, we want to let EM handle clearing the
  # event pipe, so we set this to false
  #       @clear_reads_on_pop = true
  @queue = Queue.new
end

Instance Method Details

#close ⇒ Object



58
59
60
61
62
63
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 58

# Marks the queue as closed. Calling it again once closed changes nothing.
#
# @return [true, nil] true when this call performed the close, nil when the
#   queue was already closed
def close
  @mutex.synchronize do
    # The flag is only ever flipped once; later calls fall through with nil.
    @closed = true unless @closed
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 65

# Reports whether the queue has been closed, reading the flag under the lock.
#
# @return [Boolean] true once the closed flag is set, false otherwise
def closed?
  @mutex.synchronize do
    # Normalize whatever is stored in the flag to a strict true/false.
    @closed ? true : false
  end
end

#graceful_close! ⇒ Object

Closes the queue and causes ShutdownException to be raised on waiting threads.



48
49
50
51
52
53
54
55
56
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 48

# Requests a graceful shutdown by setting the graceful flag and enqueueing
# KILL_TOKEN behind whatever is already queued. Does nothing if a graceful
# close was already requested or the queue is already closed.
#
# @return [nil] always
def graceful_close!
  @mutex.synchronize do
    unless @graceful || @closed
      logger.debug { "#{self.class}##{__method__} gracefully closing" }
      @graceful = true
      # The sentinel is pushed while the lock is still held.
      push(KILL_TOKEN)
    end
  end
  nil
end

#pop(non_blocking = false) ⇒ Object

Raises:

  • (ShutdownException)



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 34

# Removes and returns the next object from the queue.
#
# @param non_blocking [Boolean] forwarded to Queue#pop; when true, an empty
#   queue raises ThreadError instead of blocking
# @return [Object] the next queued object
# @raise [ShutdownException] if the queue is already closed, or if the
#   object dequeued is the KILL_TOKEN sentinel (the queue is closed first)
def pop(non_blocking=false)
  # Only checked before the pop, not while waiting in it.
  raise ShutdownException if closed?  # this may get us in trouble

  rv = @queue.pop(non_blocking)

  # Compare by identity rather than with rv's own #==: a payload that
  # overrides #== must never be mistaken for the sentinel or raise here.
  if KILL_TOKEN.equal?(rv)
    close
    raise ShutdownException
  end

  rv
end

#push(obj) ⇒ Object



29
30
31
32
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 29

# Appends obj to the underlying queue, first emitting a debug log line that
# notes whether obj is the kill token.
#
# @param obj [Object] the object to enqueue
# @return [Object] whatever the underlying queue's push returns
def push(obj)
  logger.debug do
    "#{self.class}##{__method__} obj: #{obj.inspect}, kill_token? #{obj == KILL_TOKEN}"
  end
  @queue << obj
end