Class: Fluent::Plugin::Buffer::Chunk
Instance Attribute Summary collapse
Instance Method Summary
collapse
#each, #to_msgpack_stream
#msgpack_factory, #msgpack_packer, #msgpack_unpacker
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata) ⇒ Chunk
TODO: CompressedPackedMessage of forward protocol?
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 49
# Sets up a new chunk for the given metadata.
#
# The chunk starts in the :unstaged state with a size counter of 0, a freshly
# generated unique id, and creation/modification timestamps read from the
# wall clock.
#
# TODO: CompressedPackedMessage of forward protocol?
#
# @param metadata [Object] stored as-is and exposed through #metadata
def initialize(metadata)
  super()
  # Generated before any other instance state is assigned.
  @unique_id = generate_unique_id
  @metadata = metadata
  @state, @size = :unstaged, 0
  # Two independent clock reads: the timestamps are separate Time objects.
  @created_at = Time.now
  @modified_at = Time.now
end
|
Instance Attribute Details
#created_at ⇒ Object
Returns the value of attribute created_at.
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62
# Reader for the creation timestamp.
#
# @return [Object] the current value of @created_at
def created_at
  @created_at
end
|
#metadata ⇒ Object
Returns the value of attribute metadata.
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62
# Reader for the metadata object this chunk was constructed with.
#
# @return [Object] the current value of @metadata
def metadata
  @metadata
end
|
#modified_at ⇒ Object
Returns the value of attribute modified_at.
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62
# Reader for the last-modification timestamp.
#
# @return [Object] the current value of @modified_at
def modified_at
  @modified_at
end
|
#state ⇒ Object
Returns the value of attribute state.
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62
# Reader for the chunk's lifecycle state.
#
# @return [Object] the current value of @state
def state
  @state
end
|
#unique_id ⇒ Object
Returns the value of attribute unique_id.
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 62
# Reader for the chunk's unique identifier.
#
# @return [Object] the current value of @unique_id
def unique_id
  @unique_id
end
|
Instance Method Details
#append(data) ⇒ Object
data is an array of formatted record strings
65
66
67
68
69
70
71
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 65
# Joins the given record strings into one binary buffer and passes it to
# #concat together with the number of records.
#
# Each element is converted with String#b before being appended, so the
# buffer is always built from binary (ASCII-8BIT) copies regardless of the
# encodings of the inputs.
#
# @param data [Array<String>] formatted record strings
# @return [Object] whatever #concat returns
def append(data)
  payload = data.each_with_object(''.b) do |record, buffer|
    buffer << record.b
  end
  concat(payload, data.size)
end
|
#bytesize ⇒ Object
86
87
88
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 86
# Placeholder that child classes are expected to override; this version
# always raises.
#
# @raise [NotImplementedError] always
def bytesize
  raise NotImplementedError, "Implement this method in child class"
end
|
#close ⇒ Object
134
135
136
137
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 134
# Moves the chunk into the :closed state.
#
# @return [self] the receiver, so calls can be chained
def close
  @state = :closed
  self
end
|
#closed? ⇒ Boolean
115
116
117
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 115
# Tells whether the chunk's state is :closed.
#
# @return [Boolean] result of comparing @state with :closed
def closed?
  @state == :closed
end
|
#commit ⇒ Object
78
79
80
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 78
# Placeholder that child classes are expected to override; this version
# always raises.
#
# @raise [NotImplementedError] always
def commit
  raise NotImplementedError, "Implement this method in child class"
end
|
#concat(bulk, records) ⇒ Object
For event streams that are already packed or compressed (and that we do not want to unpack/uncompress).
74
75
76
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 74
# Placeholder that child classes are expected to override; this version
# always raises.
#
# #append calls this with a single binary string built from its input and
# the number of elements that went into it.
#
# @param bulk [String] already-serialized data (unused here)
# @param records [Integer] number of records in +bulk+ (unused here)
# @raise [NotImplementedError] always
def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end
|
#empty? ⇒ Boolean
95
96
97
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 95
# Tells whether the chunk is empty, judged by #size being equal to 0.
#
# @return [Boolean] result of comparing #size with 0
def empty?
  size == 0
end
|
#enqueued! ⇒ Object
129
130
131
132
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 129
# Moves the chunk into the :queued state.
#
# @return [self] the receiver, so calls can be chained
def enqueued!
  @state = :queued
  self
end
|
#open(&block) ⇒ Object
148
149
150
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 148
# Placeholder that child classes are expected to override; this version
# always raises and never calls the block.
#
# #write_to calls this with a block and copies from the object the block
# receives.
#
# @raise [NotImplementedError] always
def open(&block)
  raise NotImplementedError, "Implement this method in child class"
end
|
#purge ⇒ Object
139
140
141
142
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 139
# Moves the chunk into the :closed state. This implementation does nothing
# beyond the state change.
#
# @return [self] the receiver, so calls can be chained
def purge
  @state = :closed
  self
end
|
#queued? ⇒ Boolean
111
112
113
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 111
# Tells whether the chunk's state is :queued.
#
# @return [Boolean] result of comparing @state with :queued
def queued?
  @state == :queued
end
|
#read ⇒ Object
144
145
146
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 144
# Placeholder that child classes are expected to override; this version
# always raises.
#
# @raise [NotImplementedError] always
def read
  raise NotImplementedError, "Implement this method in child class"
end
|
#rollback ⇒ Object
82
83
84
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 82
# Placeholder that child classes are expected to override; this version
# always raises.
#
# @raise [NotImplementedError] always
def rollback
  raise NotImplementedError, "Implement this method in child class"
end
|
#size ⇒ Object
Also known as:
length
90
91
92
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 90
# Placeholder that child classes are expected to override; this version
# always raises. #empty? relies on the value returned here.
#
# @raise [NotImplementedError] always
def size
  raise NotImplementedError, "Implement this method in child class"
end
|
#staged! ⇒ Object
119
120
121
122
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 119
# Moves the chunk into the :staged state.
#
# @return [self] the receiver, so calls can be chained
def staged!
  @state = :staged
  self
end
|
#staged? ⇒ Boolean
107
108
109
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 107
# Tells whether the chunk's state is :staged.
#
# @return [Boolean] result of comparing @state with :staged
def staged?
  @state == :staged
end
|
#unstaged! ⇒ Object
124
125
126
127
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 124
# Moves the chunk into the :unstaged state.
#
# @return [self] the receiver, so calls can be chained
def unstaged!
  @state = :unstaged
  self
end
|
#unstaged? ⇒ Boolean
103
104
105
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 103
# Tells whether the chunk's state is :unstaged.
#
# @return [Boolean] result of comparing @state with :unstaged
def unstaged?
  @state == :unstaged
end
|
#writable? ⇒ Boolean
99
100
101
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 99
# Tells whether the chunk is in one of the two writable states,
# :staged or :unstaged.
#
# @return [Boolean] true when @state is :staged or :unstaged, false otherwise
def writable?
  [:staged, :unstaged].include?(@state)
end
|
#write_to(io) ⇒ Object
152
153
154
155
156
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152
# Copies the chunk's content into +io+.
#
# The source stream is whatever #open yields to its block; it is handed to
# IO.copy_stream together with the destination.
#
# @param io [IO, #write] destination passed to IO.copy_stream
# @return [Object] whatever #open returns for the given block
def write_to(io)
  open { |source| IO.copy_stream(source, io) }
end
|