Class: Fwd::Buffer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/fwd/buffer.rb

Constant Summary collapse

MAX_LIMIT =

64M

64 * 1024 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(core) ⇒ Buffer

Constructor

Parameters:



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/fwd/buffer.rb', line 10

def initialize(core)
  @core     = core
  @interval = (core.opts[:flush_interval] || 60).to_i
  @rate     = (core.opts[:flush_rate] || 10_000).to_i
  @limit    = [core.opts[:buffer_limit].to_i, MAX_LIMIT].reject(&:zero?).min
  @count    = 0
  cleanup!
  reschedule!

  at_exit { rotate! }
end

Instance Attribute Details

#coreObject (readonly)

Returns the value of attribute core.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def core
  @core
end

#countObject (readonly)

Returns the value of attribute count.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def count
  @count
end

#fdObject (readonly)

Returns the value of attribute fd.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def fd
  @fd
end

#intervalObject (readonly)

Returns the value of attribute interval.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def interval
  @interval
end

#limitObject (readonly)

Returns the value of attribute limit.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def limit
  @limit
end

#rateObject (readonly)

Returns the value of attribute rate.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def rate
  @rate
end

#timerObject (readonly)

Returns the value of attribute timer.



6
7
8
# File 'lib/fwd/buffer.rb', line 6

def timer
  @timer
end

Instance Method Details

#concat(data) ⇒ Object

Parameters:

  • data (String)

    binary data



23
24
25
26
27
28
# File 'lib/fwd/buffer.rb', line 23

def concat(data)
  rotate! if limit_reached?
  @fd.write(data)
  @count += 1
  flush! if flush?
end

#flush!Object

(Force) flush buffer



31
32
33
34
35
36
37
# File 'lib/fwd/buffer.rb', line 31

def flush!
  @count = 0
  rotate!
  core.flush!
ensure
  reschedule!
end

#flush?Boolean

Returns true if flush is due.

Returns:

  • (Boolean)

    true if flush is due



40
41
42
# File 'lib/fwd/buffer.rb', line 40

def flush?
  @rate > 0 && @count >= @rate
end

#limit_reached?Boolean

Returns true if limit reached.

Returns:

  • (Boolean)

    true if limit reached



59
60
61
62
63
# File 'lib/fwd/buffer.rb', line 59

def limit_reached?
  @fd.nil? || @fd.size >= @limit
rescue Errno::ENOENT
  false
end

#rotate!Object

(Force) rotate buffer file



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fwd/buffer.rb', line 45

def rotate!
  return if @fd && @fd.size.zero?

  if @fd
    close(@fd.path)
    logger.debug { "Rotated #{File.basename(@fd.path)}, #{@fd.size / 1024}k" }
  end

  @fd = new_file
rescue Errno::ENOENT => e
  logger.warn "Rotation delayed: #{e.message}"
end