Class: Flume::LogDevice
- Inherits:
-
Object
- Object
- Flume::LogDevice
- Defined in:
- lib/flume/log_device.rb
Instance Method Summary collapse
- #channel ⇒ Object
- #close ⇒ Object
-
#initialize(*args, &block) ⇒ LogDevice
constructor
A new instance of LogDevice.
- #size ⇒ Object
- #tail(n = 80) ⇒ Object
- #tailf(&block) ⇒ Object
- #truncate(n) ⇒ Object
- #write(message) ⇒ Object
Constructor Details
#initialize(*args, &block) ⇒ LogDevice
Returns a new instance of LogDevice.
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/flume/log_device.rb', line 9
#
# Builds a log device from an optional trailing options Hash and/or a
# configuration block.
#
# The options seed an OpenStruct config; the block (if given) receives that
# config and may adjust it before the settings are read. Settings and the
# fallback used when a setting is unset (nil/false):
#
#   redis - a proc that builds Redis.new
#   cap   - 2 ** 16
#   step  - 0
#   cycle - 2 ** 8
#   list  - 'flume:log'
#
# @param args [Array] positional arguments; a trailing Hash is popped off
#   and used as the options
# @yieldparam config [OpenStruct] the config object built from the options
# @return [LogDevice] a new instance of LogDevice
def initialize(*args, &block)
  options = args.last.is_a?(Hash) ? args.pop : {}

  @config = OpenStruct.new(options)
  block.call(@config) if block

  # The default is a proc, not a connection: Redis.new is not invoked here.
  @redis = @config.redis || proc { Redis.new }
  @cap = @config.cap || (2 ** 16)
  @step = @config.step || 0
  @cycle = @config.cycle || (2 ** 8)
  @list = @config.list || 'flume:log'
end
Instance Method Details
#channel ⇒ Object
22 23 24 |
# File 'lib/flume/log_device.rb', line 22
#
# Name of the channel associated with this device: the list name prefixed
# with "flume:".
#
# @return [String]
def channel
  list_name = list
  "flume:#{list_name}"
end
#close ⇒ Object
44 45 46 |
# File 'lib/flume/log_device.rb', line 44
#
# Asks the Redis client to quit. Best effort: any StandardError raised while
# doing so is discarded and nil is returned instead.
#
# @return the result of redis.quit, or nil if it raised
def close
  redis.quit
rescue StandardError
  nil
end
#size ⇒ Object
70 71 72 |
# File 'lib/flume/log_device.rb', line 70
#
# Length of the backing list, as reported by Redis LLEN.
#
# @return the value returned by redis.llen for the list
def size
  connection = redis
  connection.llen(list)
end
#tail(n = 80) ⇒ Object
48 49 50 |
# File 'lib/flume/log_device.rb', line 48
#
# Returns the first n entries of the list (indexes 0..n-1), in reverse of
# the order Redis stores them. Since #write pushes with LPUSH, these are the
# n most recently written entries.
#
# @param n [Integer] how many entries to fetch (default 80)
# @return [Array] the fetched entries, reversed; empty when n is not positive
def tail(n = 80)
  # LRANGE's stop index is inclusive and negative indexes count from the end
  # of the list, so n - 1 for n <= 0 would fetch (nearly) the whole list.
  return [] unless n.positive?

  redis.lrange(list, 0, n - 1).reverse
end
#tailf(&block) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/flume/log_device.rb', line 52
#
# Subscribes to #channel and passes every message received on it to the
# given block.
#
# When a Redis::BaseConnectionError is raised, the error is printed, the
# method sleeps for one second and then subscribes again. There is no limit
# on the number of retries.
#
# @yieldparam message the payload delivered on the channel
def tailf(&block)
  redis.subscribe(channel) do |on|
    # The channel name handed to the handler is not needed; the underscore
    # also avoids shadowing the #channel method.
    on.message do |_channel, message|
      block.call(message)
    end
  end
rescue Redis::BaseConnectionError => error
  puts "#{error}, retrying in 1s"
  sleep 1
  retry
end
#truncate(n) ⇒ Object
66 67 68 |
# File 'lib/flume/log_device.rb', line 66
#
# Trims the list so that only its first n entries (indexes 0..n-1) remain.
# A non-positive n empties the list.
#
# @param n [Integer] number of entries to keep
# @return the value returned by redis.ltrim
def truncate(n)
  # LTRIM's stop index is inclusive and negative indexes count from the end
  # of the list, so n - 1 for n <= 0 would keep entries instead of dropping
  # them. An inverted range (start > stop) leaves an empty list.
  return redis.ltrim(list, 1, 0) unless n.positive?

  redis.ltrim(list, 0, n - 1)
end
#write(message) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/flume/log_device.rb', line 26
#
# Writes one message: pushes it onto the head of the list (LPUSH) and then,
# whether or not the push succeeded, publishes it on #channel.
#
# A StandardError raised by the push is not propagated; its details and the
# message itself are printed to STDERR instead. Errors raised by the publish
# are not rescued here.
#
# Every time the step counter is at a multiple of cycle, the list is trimmed
# to cap entries (errors from the trim are ignored). The counter then
# advances, wrapping around at cycle.
#
# @param message the log entry to store and publish
# @return the result of redis.lpush, or nil when the push raised
def write(message)
  begin
    redis.lpush(list, message)
  rescue StandardError => e
    # Only StandardError is rescued so that signals and exit requests are
    # not swallowed by the logger.
    error = "#{e.message} (#{e.class})\n#{Array(e.backtrace).join("\n")}"
    STDERR.puts(error)
    STDERR.puts(message)
  end
ensure
  redis.publish(channel, message)

  if (step % cycle).zero?
    begin
      truncate(cap)
    rescue StandardError
      nil
    end
  end

  self.step = (step + 1) % cycle
end