Class: Zookeeper::Common::QueueWithPipe

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Logger
Defined in:
lib/zookeeper/common/queue_with_pipe.rb

Overview

Ceci n’est pas une pipe

Defined Under Namespace

Classes: ShutdownException

Constant Summary collapse

KILL_TOKEN =
Object.new

Instance Method Summary collapse

Methods included from Logger

const_missing, included, new

Constructor Details

#initializeQueueWithPipe

Returns a new instance of QueueWithPipe.



17
18
19
20
21
22
23
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 17

def initialize
  @queue = Queue.new

  @mutex = Mutex.new
  @closed = false
  @graceful = false
end

Instance Method Details

#closeObject



54
55
56
57
58
59
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 54

def close
  @mutex.synchronize do
    return if @closed
    @closed = true
  end
end

#closed?Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 61

def closed?
  @mutex.synchronize { !!@closed }
end

#graceful_close!Object

close the queue and causes ShutdownException to be raised on waiting threads



44
45
46
47
48
49
50
51
52
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 44

def graceful_close!
  @mutex.synchronize do
    return if @graceful or @closed
    logger.debug { "#{self.class}##{__method__} gracefully closing" }
    @graceful = true
    push(KILL_TOKEN)
  end
  nil
end

#pop(non_blocking = false) ⇒ Object

Raises:



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 30

def pop(non_blocking=false)
  raise ShutdownException if closed?  # this may get us in trouble

  rv = @queue.pop(non_blocking)

  if rv == KILL_TOKEN
    close
    raise ShutdownException
  end

  rv
end

#push(obj) ⇒ Object



25
26
27
28
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 25

def push(obj)
  logger.debug { "#{self.class}##{__method__} obj: #{obj.inspect}, kill_token? #{obj == KILL_TOKEN}" }
  @queue.push(obj)
end