Class: NFAgent::Chunk

Inherits:
Array show all
Defined in:
lib/nfagent/chunk.rb

Constant Summary collapse

DEFAULT_MAX_SIZE =
500

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = DEFAULT_MAX_SIZE) ⇒ Chunk

Returns a new instance of Chunk.



15
16
17
18
# File 'lib/nfagent/chunk.rb', line 15

def initialize(max_size = DEFAULT_MAX_SIZE)
  @max_size = max_size
  @created_at = Time.now
end

Instance Attribute Details

#created_atObject (readonly)

Returns the value of attribute created_at.



10
11
12
# File 'lib/nfagent/chunk.rb', line 10

def created_at
  @created_at
end

#max_sizeObject (readonly)

Returns the value of attribute max_size.



11
12
13
# File 'lib/nfagent/chunk.rb', line 11

def max_size
  @max_size
end

Instance Method Details

#<<(line) ⇒ Object

Raises:



20
21
22
23
24
25
# File 'lib/nfagent/chunk.rb', line 20

def <<(line)
  raise ChunkExpired if expired?
  raise ChunkFull if full?
  raise DayBoundary if Time.now.day != self.created_at.day
  super(line)
end

#dump(key = nil) ⇒ Object



35
36
37
38
39
40
41
42
43
44
# File 'lib/nfagent/chunk.rb', line 35

def dump(key = nil)
  Payload.new do |payload|
    Log.info("Dumping payload from chunk (#{self.size || 0} lines #{'due to expiry' if expired?}")
    payload.line_count = self.size
    payload.chunk_expired = expired?
    payload.key = key
    payload.data = Encoder.encode64url(Zlib::Deflate.deflate(self.join("\n"), Zlib::BEST_COMPRESSION))
    payload.checksum = Digest::SHA1.hexdigest(payload.data)
  end
end

#expired?Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/nfagent/chunk.rb', line 31

def expired?
  (Time.now - @created_at > Config.chunk_timeout) && !self.empty?
end

#full?Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/nfagent/chunk.rb', line 27

def full?
  self.size >= @max_size
end

#submit(key = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/nfagent/chunk.rb', line 46

def submit(key = nil)
  Log.info("Submitting...")
  # TODO God knows why EM Deferrable isn't working - defer here is OK
  EM.defer {
    submitter = Submitter.new(self.dump(key))
    submitter.errback { |payload|
      payload.write_to_disk(Config.dump_dir)
    }
    submitter.perform
  }
  # Callback and remove from chunk group
end