Class: Fluent::KeepForwardOutput
- Inherits:
- ForwardOutput
  - Object
  - ForwardOutput
  - Fluent::KeepForwardOutput
- Defined in:
- lib/fluent/plugin/out_keep_forward.rb
Defined Under Namespace
Classes: NonHeartbeatNode
Instance Attribute Summary collapse
-
#watcher_interval ⇒ Object
Sleep interval (in seconds) between keepalive expiry checks of the watcher thread; exposed for tests.
Instance Method Summary collapse
- #cache_node(tag, node) ⇒ Object
- #cache_sock(node, sock) ⇒ Object
- #configure(conf) ⇒ Object
- #get_mutex(node) ⇒ Object
- #get_node(tag) ⇒ Object
- #get_sock ⇒ Object
- #get_sock_expired_at ⇒ Object
- #keepforward(tag) ⇒ Object
- #reconnect(node) ⇒ Object
-
#run ⇒ Object
Override to disable heartbeat.
-
#send_data(node, tag, chunk) ⇒ Object
Override for keepalive.
- #shutdown ⇒ Object
- #sock_write(sock, tag, chunk) ⇒ Object
- #start ⇒ Object
- #start_watcher ⇒ Object
- #stop_watcher ⇒ Object
-
#watch_keepalive_time ⇒ Object
watcher thread callback.
- #weight_send_data(tag, chunk) ⇒ Object
-
#write_objects(tag, chunk) ⇒ Object
Override.
Instance Attribute Details
#watcher_interval ⇒ Object
Sleep interval (in seconds) between keepalive expiry checks of the watcher thread; exposed for tests.
40 41 42 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 40

# Reader for the watcher sleep interval (exposed for tests).
#
# @return [Object] the current value of @watcher_interval
def watcher_interval
  @watcher_interval
end
Instance Method Details
#cache_node(tag, node) ⇒ Object
257 258 259 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 257

# Remembers +node+ as the destination for +tag+.
# The cache slot is chosen by #keepforward, so it may be per tag,
# per thread, or a single shared slot.
#
# @param tag [Object] tag of the data being forwarded
# @param node [Object, nil] node to remember (nil clears the slot)
# @return [Object, nil] the stored node
def cache_node(tag, node)
  slot = keepforward(tag)
  @node[slot] = node
end
#cache_sock(node, sock) ⇒ Object
271 272 273 274 275 276 277 278 279 280 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 271

# Stores (or clears) the keepalive socket of +node+ for the current thread.
#
# With a truthy +sock+ the socket is cached, its expiry is set to
# now + @keepalive_time (only when @keepalive_time is set), and an info
# line is logged. With a falsy +sock+ both the socket and its expiry
# are reset to nil.
#
# @param node [Object] destination node (must respond to host and port)
# @param sock [Object, nil] socket to cache, or nil to drop the entry
def cache_sock(node, sock)
  unless sock
    get_sock[node] = nil
    get_sock_expired_at[node] = nil
    return
  end

  get_sock[node] = sock
  if @keepalive_time
    get_sock_expired_at[node] = Time.now + @keepalive_time
  end
  log.info "out_keep_forward: keepalive connection opened", :host=>node.host, :port=>node.port
end
#configure(conf) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 42

# Configures the plugin via the parent class, then initialises the
# per-instance caches used by this class.
#
# When @heartbeat_type is :none every node is wrapped in a
# NonHeartbeatNode.
#
# @param conf [Object] configuration passed through to super
def configure(conf)
  super

  @nodes = @nodes.map { |original| NonHeartbeatNode.new(original) } if @heartbeat_type == :none

  @mutex = {}            # thread_id => { node => Mutex }
  @sock_expired_at = {}  # thread_id => { node => Time }
  @sock = {}             # thread_id => { node => socket }
  @node = {}             # keepforward key => node
  @watcher_interval = 1
end
#get_mutex(node) ⇒ Object
265 266 267 268 269 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 265

# Returns the mutex guarding +node+ for the current thread, creating
# it on first use. Mutexes are stored per Thread.current.object_id.
#
# @param node [Object] destination node used as the lookup key
# @return [Mutex]
def get_mutex(node)
  per_thread = (@mutex[Thread.current.object_id] ||= {})
  per_thread[node] ||= Mutex.new
end
#get_node(tag) ⇒ Object
261 262 263 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 261

# Looks up the node previously remembered for +tag+.
# The cache slot is chosen by #keepforward.
#
# @param tag [Object] tag of the data being forwarded
# @return [Object, nil] the cached node, or nil when nothing is cached
def get_node(tag)
  slot = keepforward(tag)
  @node[slot]
end
#get_sock ⇒ Object
282 283 284 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 282

# Returns the socket cache (node => socket) of the current thread,
# creating an empty one on first use.
#
# @return [Hash]
def get_sock
  thread_id = Thread.current.object_id
  @sock[thread_id] ||= {}
end
#get_sock_expired_at ⇒ Object
286 287 288 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 286

# Returns the expiry table (node => expiry time) of the current thread,
# creating an empty one on first use.
#
# @return [Hash]
def get_sock_expired_at
  thread_id = Thread.current.object_id
  @sock_expired_at[thread_id] ||= {}
end
#keepforward(tag) ⇒ Object
246 247 248 249 250 251 252 253 254 255 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 246

# Computes the node-cache key according to the @keepforward mode.
#
# @param tag [Object] tag of the data being forwarded
# @return [Object] +tag+ in :tag mode, the current thread's object_id in
#   :thread mode, and the symbol :one for any other mode
def keepforward(tag)
  return tag if @keepforward == :tag
  return Thread.current.object_id if @keepforward == :thread

  :one # :one (and anything unrecognised) shares a single slot
end
#reconnect(node) ⇒ Object
191 192 193 194 195 196 197 198 199 200 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 191

# Opens a new connection to +node+ through #connect and applies the
# SO_LINGER and SO_SNDTIMEO socket options, both derived from
# @send_timeout (truncated to an integer).
#
# @param node [Object] destination node handed to #connect
# @return [Object] the connected socket
def reconnect(node)
  connect(node).tap do |sock|
    linger = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    send_timeout = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)
  end
end
#run ⇒ Object
Override to disable heartbeat
84 85 86 87 88 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 84

# Override to disable heartbeat: the parent implementation is only
# invoked when @heartbeat_type is something other than :none.
def run
  return if @heartbeat_type == :none

  super
end
#send_data(node, tag, chunk) ⇒ Object
Override for keepalive
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 163

# Override for keepalive.
#
# Sends +chunk+ to +node+ while holding the node's per-thread mutex.
# With @keepalive the current thread's cached socket is reused (a new
# one is opened and cached when none exists); without it a fresh socket
# is opened and always closed afterwards.
#
# Any error raised while writing invalidates the cached keepalive socket
# before being re-raised: a failed write may have left a partially
# written message on the stream, so the socket must never be reused.
#
# @param node [Object] destination node (responds to host, port, heartbeat)
# @param tag [Object] tag written in front of the payload
# @param chunk [Object] buffer chunk handed to #sock_write
# @raise [StandardError] whatever the write raised, unchanged
def send_data(node, tag, chunk)
  get_mutex(node).synchronize do
    sock = get_sock[node] if @keepalive
    unless sock
      sock = reconnect(node)
      cache_sock(node, sock) if @keepalive
    end

    begin
      sock_write(sock, tag, chunk)
      node.heartbeat(false)
      log.debug "out_keep_forward: write to", :host=>node.host, :port=>node.port
    rescue => e
      log.warn "out_keep_forward: send_data failed #{e.class} #{e.message}", :host=>node.host, :port=>node.port
      if @keepalive
        begin
          sock.close
        rescue StandardError
          # Best-effort close: the socket is being discarded anyway and the
          # original write error is the one the caller needs to see.
        end
        cache_sock(node, nil)
      end
      raise
    ensure
      # Non-keepalive sockets are single-use.
      sock.close if sock && !@keepalive
    end
  end
end
#shutdown ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 61

# Stops the plugin: marks it finished, detaches every watcher from the
# event loop, stops the loop (only when heartbeat is enabled), joins the
# worker thread, closes @usock when present, and finally stops the
# keepalive watcher thread.
def shutdown
  @finished = true
  @loop.watchers.each(&:detach)
  @loop.stop if @heartbeat_type != :none # custom
  @thread.join
  @usock.close if @usock
  stop_watcher
end
#sock_write(sock, tag, chunk) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 202

# Writes one forward message to +sock+: FORWARD_HEADER, the msgpack
# encoded tag, a raw32 header carrying the chunk size, and then the
# chunk body itself.
#
# The raw32 header (0xdb + 32-bit big-endian size) is used for every
# chunk regardless of its size.
#
# @param sock [Object] writable socket
# @param tag [Object] tag; must respond to to_msgpack
# @param chunk [Object] buffer chunk; must respond to size and write_to
# @return [Object] whatever chunk.write_to(sock) returns
def sock_write(sock, tag, chunk)
  # beginArray(2)
  sock.write FORWARD_HEADER

  # writeRaw(tag)
  sock.write tag.to_msgpack

  # beginRaw(size), always encoded as raw 32
  raw32_header = [0xdb, chunk.size].pack('CN')
  sock.write raw32_header

  # writeRawBody(packed_es)
  chunk.write_to(sock)
end
#start ⇒ Object
56 57 58 59 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 56

# Starts the plugin through the parent class, then launches the
# keepalive watcher (see #start_watcher for when one is created).
def start
  super

  start_watcher
end
#start_watcher ⇒ Object
70 71 72 73 74 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 70

# Spawns the thread running #watch_keepalive_time and stores it in
# @watcher. Nothing is started unless both @keepalive and
# @keepalive_time are set.
#
# @return [Thread, nil] the watcher thread, or nil when not started
def start_watcher
  return unless @keepalive && @keepalive_time

  @watcher = Thread.new(&method(:watch_keepalive_time))
end
#stop_watcher ⇒ Object
76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 76

# Terminates the watcher thread, if one was started, and waits for it
# to finish.
#
# @return [Thread, nil] the joined thread, or nil when none was running
def stop_watcher
  return unless @watcher

  @watcher.terminate
  @watcher.join
end
#watch_keepalive_time ⇒ Object
watcher thread callback
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 227

# Watcher thread callback.
#
# Loops forever: sleeps @watcher_interval seconds, then closes and
# uncaches every keepalive socket whose expiry time has passed.
#
# The scan walks snapshots of the hash keys rather than the live hashes.
# Sender threads add entries to their own socket hash at any time, and
# Ruby raises in the writer if a key is added to a hash while it is being
# iterated.
def watch_keepalive_time
  loop do
    sleep @watcher_interval
    @sock.keys.each do |thread_id|
      socks = @sock[thread_id]
      next unless socks

      socks.keys.each { |node| close_expired_sock(thread_id, node) }
    end
  end
end

# Closes and uncaches the socket cached for (+thread_id+, +node+) when its
# expiry time has passed. Does nothing otherwise.
#
# The socket is looked up only after the node mutex is held, so a socket
# that a sender replaced in the meantime is never dropped unclosed.
#
# @param thread_id [Integer] object_id of the thread owning the socket
# @param node [Object] destination node (responds to host and port)
def close_expired_sock(thread_id, node)
  mutex = @mutex[thread_id] && @mutex[thread_id][node]
  return unless mutex

  mutex.synchronize do
    expirations = @sock_expired_at[thread_id]
    expired_at = expirations && expirations[node]
    next unless expired_at && Time.now >= expired_at

    sock = @sock[thread_id][node]
    begin
      sock.close if sock
    rescue StandardError
      # Best-effort close: a failure here must not kill the watcher thread.
    end
    @sock[thread_id][node] = nil
    expirations[node] = nil
    log.debug "out_keep_forward: keepalive connection closed", :host=>node.host, :port=>node.port, :thread_id=>thread_id
  end
end
#weight_send_data(tag, chunk) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 134

# Sends +chunk+ to the next available node of @weight_array, advancing
# the @rr cursor round-robin and trying each slot at most once.
#
# The first node that accepts the data is remembered via #cache_node.
# When every attempt fails the node cache for +tag+ is cleared and the
# most recent error is re-raised; when no node was available at all a
# RuntimeError is raised instead.
#
# @param tag [Object] tag of the data being forwarded
# @param chunk [Object] buffer chunk to send
# @raise [StandardError] the latest send error, or RuntimeError
def weight_send_data(tag, chunk)
  last_error = nil
  slots = @weight_array.length

  slots.times do
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      send_data(candidate, tag, chunk)
      cache_node(tag, candidate)
      return
    rescue => e
      # for load balancing during detecting crashed servers
      last_error = e # use the latest error
    end
  end

  cache_node(tag, nil)
  raise last_error if last_error

  raise "no nodes are available" # TODO message
end
#write_objects(tag, chunk) ⇒ Object
Override
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/out_keep_forward.rb', line 115

# Override.
#
# Forwards +chunk+, preferring the node cached for +tag+. The cached
# node is used only when it is available and, if @prefer_recover is
# set, still present in @weight_array. When there is no usable cached
# node, or sending to it fails, the chunk goes through
# #weight_send_data (after rebuilding the weight array when
# @heartbeat_type is :none).
#
# @param tag [Object] tag of the data being forwarded
# @param chunk [Object] buffer chunk; empty chunks are ignored
def write_objects(tag, chunk)
  return if chunk.empty?

  node = get_node(tag)
  if node && node.available? && (!@prefer_recover || @weight_array.include?(node))
    begin
      send_data(node, tag, chunk)
      return
    rescue
      # The cached node failed; fall through to the weighted selection below.
    end
  end

  rebuild_weight_array if @heartbeat_type == :none
  weight_send_data(tag, chunk)
end