Class: Writer

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/fluent/command/cat.rb

Defined Under Namespace

Classes: TimerThread

Instance Method Summary collapse

Constructor Details

#initialize(tag, connector) ⇒ Writer

Returns a new instance of Writer.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/command/cat.rb', line 124

def initialize(tag, connector)
  @tag = tag
  @connector = connector
  @socket = false

  @socket_time = Time.now.to_i
  @socket_ttl = 10  # TODO
  @error_history = []

  @pending = []
  @pending_limit = 1024  # TODO
  @retry_wait = 1
  @retry_limit = 5  # TODO

  super()
end

Instance Method Details

#closeObject



180
181
182
183
# File 'lib/fluent/command/cat.rb', line 180

def close
  @socket.close
  @socket = nil
end

#on_timerObject



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/fluent/command/cat.rb', line 161

def on_timer
  now = Time.now.to_i

  synchronize {
    unless @pending.empty?
      # flush pending records
      if write_impl(@pending)
        # write succeeded
        @pending.clear
      end
    end

    if @socket && @socket_time + @socket_ttl < now
      # socket is not used @socket_ttl seconds
      close
    end
  }
end

#shutdownObject



191
192
193
# File 'lib/fluent/command/cat.rb', line 191

def shutdown
  @timer.shutdown
end

#startObject



185
186
187
188
189
# File 'lib/fluent/command/cat.rb', line 185

def start
  @timer = TimerThread.new(self)
  @timer.start
  self
end

#write(record) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/command/cat.rb', line 141

def write(record)
  if record.class != Hash
    raise ArgumentError, "Input must be a map (got #{record.class})"
  end

  entry = [Time.now.to_i, record]
  synchronize {
    unless write_impl([entry])
      # write failed
      @pending.push(entry)

      while @pending.size > @pending_limit
        # exceeds pending limit; trash oldest record
        time, record = @pending.shift
        abort_message(time, record)
      end
    end
  }
end