Class: Agent::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/agent/queue.rb,
lib/agent/queue/buffered.rb,
lib/agent/queue/unbuffered.rb

Direct Known Subclasses

Buffered, Unbuffered

Defined Under Namespace

Classes: Buffered, Unbuffered

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type) ⇒ Queue

Returns a new instance of Queue.

Raises:



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/agent/queue.rb', line 9

def initialize(type)
  @type = type

  raise Errors::Untyped unless @type
  raise Errors::InvalidType unless @type.is_a?(Module)

  @closed = false

  @queue      = []
  @operations = []
  @pushes     = []
  @pops       = []

  @mutex = Mutex.new

  reset_custom_state
end

Instance Attribute Details

#mutexObject (readonly)

Returns the value of attribute mutex.



7
8
9
# File 'lib/agent/queue.rb', line 7

def mutex
  @mutex
end

#operationsObject (readonly)

Returns the value of attribute operations.



7
8
9
# File 'lib/agent/queue.rb', line 7

def operations
  @operations
end

#popsObject (readonly)

Returns the value of attribute pops.



7
8
9
# File 'lib/agent/queue.rb', line 7

def pops
  @pops
end

#pushesObject (readonly)

Returns the value of attribute pushes.



7
8
9
# File 'lib/agent/queue.rb', line 7

def pushes
  @pushes
end

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/agent/queue.rb', line 7

def queue
  @queue
end

#typeObject (readonly)

Returns the value of attribute type.



7
8
9
# File 'lib/agent/queue.rb', line 7

def type
  @type
end

Instance Method Details

#buffered?Boolean

Returns:

  • (Boolean)

Raises:



27
28
29
30
# File 'lib/agent/queue.rb', line 27

def buffered?
  # implement in subclass
  raise Errors::NotImplementedError
end

#closeObject



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/agent/queue.rb', line 47

def close
  mutex.synchronize do
    raise Errors::ChannelClosed if @closed
    @closed = true
    @operations.each{|o| o.close }
    @operations.clear
    @queue.clear
    @pushes.clear
    @pops.clear

    reset_custom_state
  end
end

#closed?Boolean

Returns:

  • (Boolean)


61
# File 'lib/agent/queue.rb', line 61

def closed?; @closed; end

#open?Boolean

Returns:

  • (Boolean)


62
# File 'lib/agent/queue.rb', line 62

def open?;   !@closed;   end

#pop(options = {}) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/agent/queue.rb', line 81

def pop(options={})
  pop = Pop.new(options)

  mutex.synchronize do
    pop.close if @closed
    operations << pop
    pops << pop
    process
  end

  return pop if options[:deferred]

  ok = pop.wait
  [pop.object, ok]
end

#pop?Boolean

Returns:

  • (Boolean)

Raises:



37
38
39
40
# File 'lib/agent/queue.rb', line 37

def pop?
  # implement in subclass
  raise Errors::NotImplementedError
end

#push(object, options = {}) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/agent/queue.rb', line 64

def push(object, options={})
  raise Errors::InvalidType unless object.is_a?(@type)

  push = Push.new(object, options)

  mutex.synchronize do
    raise Errors::ChannelClosed if @closed
    operations << push
    pushes << push
    process
  end

  return push if options[:deferred]

  push.wait
end

#push?Boolean

Returns:

  • (Boolean)

Raises:



42
43
44
45
# File 'lib/agent/queue.rb', line 42

def push?
  # implement in subclass
  raise Errors::NotImplementedError
end

#remove_operations(ops) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/agent/queue.rb', line 97

def remove_operations(ops)
  mutex.synchronize do
    return if @closed

    ops.each do |operation|
      operations.delete(operation)
    end

    pushes.clear
    pops.clear

    operations.each do |operation|
      if operation.is_a?(Push)
        pushes << operation
      else
        pops << operation
      end
    end

    reset_custom_state
  end
end

#unbuffered?Boolean

Returns:

  • (Boolean)

Raises:



32
33
34
35
# File 'lib/agent/queue.rb', line 32

def unbuffered?
  # implement in subclass
  raise Errors::NotImplementedError
end