Class: Fluent::RedisPubsubOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_redis_pubsub.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeRedisPubsubOutput

Returns a new instance of RedisPubsubOutput.



6
7
8
9
10
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 6

def initialize
    super
    require 'redis'
    require 'msgpack'
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



4
5
6
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 4

def channel
  @channel
end

#hostObject (readonly)

Returns the value of attribute host.



4
5
6
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 4

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



4
5
6
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 4

def port
  @port
end

#redisObject (readonly)

Returns the value of attribute redis.



4
5
6
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 4

def redis
  @redis
end

Instance Method Details

#configure(config) ⇒ Object

Raises:

  • (Fluent::ConfigError)


12
13
14
15
16
17
18
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 12

def configure(config)
    super
    @host    = config.has_key?('host')    ? config['host']         : 'localhost'
    @port    = config.has_key?('port')    ? config['port'].to_i    : 6379
    raise Fluent::ConfigError, "need channel" if not config.has_key?('channel') or config['channel'].empty?
    @channel = config['channel'].to_s
end

#format(tag, time, record) ⇒ Object



29
30
31
32
33
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 29

def format(tag, time, record)
    record['__tag__']  = tag
    record['__time__'] = time
    record.to_msgpack
end

#shutdownObject



25
26
27
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 25

def shutdown
    @redis.quit
end

#startObject



20
21
22
23
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 20

def start
    super
    @redis = Redis.new(:host => @host, :port => @port ,:thread_safe => true)
end

#write(chunk) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_redis_pubsub.rb', line 35

def write(chunk)
    @redis.pipelined do
        chunk.msgpack_each do |record|
            @redis.publish @channel, record.to_json
        end
    end
end