Class: Fluent::Plugin::Buffer::Chunk

Inherits:
Object
  • Object
show all
Includes:
ChunkMessagePackEventStreamer, UniqueId::Mixin, MonitorMixin
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Direct Known Subclasses

FileChunk, MemoryChunk

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ChunkMessagePackEventStreamer

#each, #to_msgpack_stream

Methods included from MessagePackFactory::Mixin

#msgpack_factory, #msgpack_packer, #msgpack_unpacker

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Constructor Details

#initialize(metadata) ⇒ Chunk

TODO: CompressedPackedMessage of forward protocol?



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/buffer/chunk.rb', line 49

# Builds a chunk in its initial, unstaged state.
#
# @param metadata [Object] metadata describing this chunk; stored unchanged
#   and exposed through the +metadata+ reader
def initialize(metadata)
  # Explicit empty parentheses: a bare `super` would forward `metadata` to
  # the ancestors' initializers.
  super()
  @unique_id = generate_unique_id
  @metadata = metadata

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Time.now
  @modified_at = Time.now
end

Instance Attribute Details

#created_at ⇒ Object (readonly)

Returns the value of attribute created_at.



62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62

# Returns the time recorded when this chunk was initialized.
#
# @return [Time] the value of @created_at (set from Time.now in #initialize)
def created_at
  @created_at
end

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62

# Returns the chunk's metadata attribute.
#
# @return [Object] the value of @metadata
def metadata
  @metadata
end

#modified_at ⇒ Object (readonly)

Returns the value of attribute modified_at.



62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62

# Returns the chunk's last-modified timestamp.
#
# @return [Time] the value of @modified_at; #initialize seeds it with Time.now.
#   NOTE(review): nothing visible here updates it afterwards — presumably
#   child classes do so when they write; confirm.
def modified_at
  @modified_at
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62

# Returns the chunk's current lifecycle state.
#
# @return [Symbol] one of :unstaged, :staged, :queued or :closed
def state
  @state
end

#unique_id ⇒ Object (readonly)

Returns the value of attribute unique_id.



62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62

# Returns the identifier assigned to this chunk at construction.
#
# @return [Object] the value produced by generate_unique_id in #initialize
def unique_id
  @unique_id
end

Instance Method Details

#append(data) ⇒ Object

data is an array of formatted record strings



65
66
67
68
69
70
71
# File 'lib/fluent/plugin/buffer/chunk.rb', line 65

# Joins the given records into one binary string and hands it to #concat.
#
# @param data [Array<String>] formatted record strings
# @return [Object] whatever #concat returns
def append(data)
  # Each record is converted with String#b so the buffer stays binary.
  payload = data.each_with_object(''.b) { |record, buffer| buffer << record.b }
  concat(payload, data.size)
end

#bytesize ⇒ Object

Raises:

  • (NotImplementedError)


86
87
88
# File 'lib/fluent/plugin/buffer/chunk.rb', line 86

# @abstract Must be implemented by child classes; this base version only raises.
# @raise [NotImplementedError] always, unless overridden
def bytesize
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#close ⇒ Object



134
135
136
137
# File 'lib/fluent/plugin/buffer/chunk.rb', line 134

# Moves the chunk into the :closed state.
#
# @return [self] the chunk itself, so calls can be chained
def close
  tap { @state = :closed }
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/fluent/plugin/buffer/chunk.rb', line 115

# Whether the chunk has been closed.
#
# @return [Boolean] true when the current state is :closed
def closed?
  @state.equal?(:closed)
end

#commit ⇒ Object

Raises:

  • (NotImplementedError)


78
79
80
# File 'lib/fluent/plugin/buffer/chunk.rb', line 78

# @abstract Must be implemented by child classes; this base version only raises.
# @raise [NotImplementedError] always, unless overridden
def commit
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#concat(bulk, records) ⇒ Object

For event streams that are already packed or zipped (and that we do not want to unpack/uncompress).

Raises:

  • (NotImplementedError)


74
75
76
# File 'lib/fluent/plugin/buffer/chunk.rb', line 74

# Adds an already-serialized payload to the chunk without unpacking it.
#
# @abstract Must be implemented by child classes; this base version only raises.
# @param bulk [String] the serialized payload (#append passes a binary string)
# @param records [Integer] number of records in +bulk+ (#append passes data.size)
# @raise [NotImplementedError] always, unless overridden
def concat(bulk, records)
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/fluent/plugin/buffer/chunk.rb', line 95

# Whether the chunk currently holds nothing, judged by #size.
#
# @return [Boolean] true when #size equals 0
def empty?
  current_size = size
  current_size == 0
end

#enqueued! ⇒ Object



129
130
131
132
# File 'lib/fluent/plugin/buffer/chunk.rb', line 129

# Moves the chunk into the :queued state.
#
# @return [self] the chunk itself, so calls can be chained
def enqueued!
  tap { @state = :queued }
end

#open(&block) ⇒ Object

Raises:

  • (NotImplementedError)


148
149
150
# File 'lib/fluent/plugin/buffer/chunk.rb', line 148

# Opens the chunk's content for the given block.
#
# @abstract Must be implemented by child classes; this base version only raises.
#   #write_to expects the implementation to yield an object usable as the
#   source of IO.copy_stream.
# @raise [NotImplementedError] always, unless overridden
def open(&block)
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#purge ⇒ Object



139
140
141
142
# File 'lib/fluent/plugin/buffer/chunk.rb', line 139

# Moves the chunk into the :closed state; in this base class that is the
# only effect of purging.
#
# @return [self] the chunk itself, so calls can be chained
def purge
  tap { @state = :closed }
end

#queued? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/fluent/plugin/buffer/chunk.rb', line 111

# Whether the chunk has been enqueued.
#
# @return [Boolean] true when the current state is :queued
def queued?
  @state.equal?(:queued)
end

#read ⇒ Object

Raises:

  • (NotImplementedError)


144
145
146
# File 'lib/fluent/plugin/buffer/chunk.rb', line 144

# @abstract Must be implemented by child classes; this base version only raises.
# @raise [NotImplementedError] always, unless overridden
def read
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#rollback ⇒ Object

Raises:

  • (NotImplementedError)


82
83
84
# File 'lib/fluent/plugin/buffer/chunk.rb', line 82

# @abstract Must be implemented by child classes; this base version only raises.
# @raise [NotImplementedError] always, unless overridden
def rollback
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#size ⇒ Object Also known as: length

Raises:

  • (NotImplementedError)


90
91
92
# File 'lib/fluent/plugin/buffer/chunk.rb', line 90

# @abstract Must be implemented by child classes; this base version only raises.
#   #empty? compares the returned value with 0.
# @raise [NotImplementedError] always, unless overridden
def size
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#staged! ⇒ Object



119
120
121
122
# File 'lib/fluent/plugin/buffer/chunk.rb', line 119

# Moves the chunk into the :staged state.
#
# @return [self] the chunk itself, so calls can be chained
def staged!
  tap { @state = :staged }
end

#staged? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/fluent/plugin/buffer/chunk.rb', line 107

# Whether the chunk is staged.
#
# @return [Boolean] true when the current state is :staged
def staged?
  @state.equal?(:staged)
end

#unstaged! ⇒ Object



124
125
126
127
# File 'lib/fluent/plugin/buffer/chunk.rb', line 124

# Moves the chunk (back) into the :unstaged state.
#
# @return [self] the chunk itself, so calls can be chained
def unstaged!
  tap { @state = :unstaged }
end

#unstaged? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/fluent/plugin/buffer/chunk.rb', line 103

# Whether the chunk is unstaged (the state every chunk starts in).
#
# @return [Boolean] true when the current state is :unstaged
def unstaged?
  @state.equal?(:unstaged)
end

#writable? ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/fluent/plugin/buffer/chunk.rb', line 99

# Whether the chunk is in a state that still accepts writes.
#
# @return [Boolean] true when the state is :staged or :unstaged
def writable?
  %i[staged unstaged].include?(@state)
end

#write_to(io) ⇒ Object



152
153
154
155
156
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152

# Copies the chunk's content into the given IO.
#
# @param io [IO] destination passed to IO.copy_stream
# @return [Object] whatever #open returns for the block (the block itself
#   evaluates to IO.copy_stream's result)
def write_to(io)
  open { |source| IO.copy_stream(source, io) }
end