Class: Fluent::Plugin::Buffer::Chunk
- Inherits:
-
Object
- Object
- Fluent::Plugin::Buffer::Chunk
show all
- Includes:
- UniqueId::Mixin, MonitorMixin
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Defined Under Namespace
Modules: Decompressable
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata, compress: :text) ⇒ Chunk
TODO: CompressedPackedMessage of forward protocol?
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51
def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
@state = :unstaged
@size = 0
@created_at = Time.now
@modified_at = Time.now
extend Decompressable if compress == :gzip
end
|
Instance Attribute Details
#created_at ⇒ Object
Returns the value of attribute created_at.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def created_at
@created_at
end
|
Returns the value of attribute metadata.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def metadata
@metadata
end
|
#modified_at ⇒ Object
Returns the value of attribute modified_at.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def modified_at
@modified_at
end
|
#state ⇒ Object
Returns the value of attribute state.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def state
@state
end
|
#unique_id ⇒ Object
Returns the value of attribute unique_id.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def unique_id
@unique_id
end
|
Instance Method Details
#append(data, **kwargs) ⇒ Object
data is array of formatted record string
69
70
71
72
73
74
75
76
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
end
concat(adding, data.size)
end
|
#bytesize ⇒ Object
91
92
93
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 91
def bytesize
raise NotImplementedError, "Implement this method in child class"
end
|
#close ⇒ Object
139
140
141
142
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 139
def close
@state = :closed
self
end
|
#closed? ⇒ Boolean
120
121
122
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 120
def closed?
@state == :closed
end
|
#commit ⇒ Object
83
84
85
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 83
def commit
raise NotImplementedError, "Implement this method in child class"
end
|
#concat(bulk, records) ⇒ Object
for event streams which is packed or zipped (and we want not to unpack/uncompress)
79
80
81
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 79
def concat(bulk, records)
raise NotImplementedError, "Implement this method in child class"
end
|
#empty? ⇒ Boolean
100
101
102
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 100
def empty?
size == 0
end
|
#enqueued! ⇒ Object
134
135
136
137
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 134
def enqueued!
@state = :queued
self
end
|
#open(**kwargs, &block) ⇒ Object
154
155
156
157
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 154
def open(**kwargs, &block)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
|
#purge ⇒ Object
144
145
146
147
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 144
def purge
@state = :closed
self
end
|
#queued? ⇒ Boolean
116
117
118
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 116
def queued?
@state == :queued
end
|
#read(**kwargs) ⇒ Object
149
150
151
152
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 149
def read(**kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
|
#rollback ⇒ Object
87
88
89
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 87
def rollback
raise NotImplementedError, "Implement this method in child class"
end
|
#size ⇒ Object
Also known as:
length
95
96
97
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 95
def size
raise NotImplementedError, "Implement this method in child class"
end
|
#staged! ⇒ Object
124
125
126
127
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 124
def staged!
@state = :staged
self
end
|
#staged? ⇒ Boolean
112
113
114
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 112
def staged?
@state == :staged
end
|
#unstaged! ⇒ Object
129
130
131
132
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 129
def unstaged!
@state = :unstaged
self
end
|
#unstaged? ⇒ Boolean
108
109
110
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 108
def unstaged?
@state == :unstaged
end
|
#writable? ⇒ Boolean
104
105
106
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 104
def writable?
@state == :staged || @state == :unstaged
end
|
#write_to(io, **kwargs) ⇒ Object
159
160
161
162
163
164
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 159
def write_to(io, **kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
open do |i|
IO.copy_stream(i, io)
end
end
|