Class: Fluent::RedisPubsubInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_redis_pubsub.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RedisPubsubInput

Returns a new instance of RedisPubsubInput.



12
13
14
15
16
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 12

# Builds the plugin instance and loads the libraries it relies on.
#
# The requires are issued here rather than at file scope, so the libraries
# are loaded when an instance is created.
def initialize
    super
    require 'redis'
    require 'msgpack'
    # #run parses payloads with JSON.parse, so load json explicitly instead of
    # relying on some other file having required it already.
    require 'json'
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



5
6
7
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 5

# Reader for the @redis instance variable; returns whatever is currently
# stored there (nil until something assigns it).
def redis
  @redis
end

Instance Method Details

#configure(config) ⇒ Object

Raises:

  • (Fluent::ConfigError)


18
19
20
21
22
23
24
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 18

# Reads the plugin settings from the supplied configuration.
#
# Recognised keys:
#   host    - stored in @host (default: 'localhost')
#   port    - stored in @port as a base-10 integer (default: 6379)
#   channel - stored in @channel as a string (required, must not be empty)
#
# Raises Fluent::ConfigError when the channel is missing or empty, or when
# the port is not a valid integer.
def configure(config)
    super
    @host = config.has_key?('host') ? config['host'] : 'localhost'
    @port = 6379
    if config.has_key?('port')
        begin
            # Integer() rejects garbage such as "abc", which String#to_i would
            # silently turn into port 0. Base 10 keeps "010" meaning 10.
            @port = Integer(config['port'].to_s, 10)
        rescue ArgumentError
            raise Fluent::ConfigError, "invalid port: #{config['port'].inspect}"
        end
    end
    # to_s guards against a key that is present but whose value is nil.
    channel = config['channel'].to_s
    raise Fluent::ConfigError, "need channel" if channel.empty?
    @channel = channel
end

#run ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 32

# Subscribes to @channel on @redis and emits one event per received message.
#
# Each payload is parsed as JSON. When parsing fails the error is logged via
# $log and the raw payload is emitted instead; the raw payload is also used
# when the parsed value is falsy. Events are emitted under @tag with the
# time returned by Engine.now.
def run
    @redis.subscribe(@channel) do |on|
        on.message do |_channel, payload|
            record = begin
                JSON.parse(payload)
            rescue JSON::ParserError => e
                $log.error e
                nil
            end
            Engine.emit(@tag, Engine.now, record || payload)
        end
    end
end

#shutdown ⇒ Object



46
47
48
49
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 46

# Stops the subscriber thread and closes the Redis connection.
#
# Safe to call even if #start never ran: each resource is released only if
# it was actually created, instead of failing on nil. Finishes by handing
# control to the parent class, as #start and #configure already do.
def shutdown
    if @thread
        Thread.kill(@thread)
        # Drop the reference so a repeated shutdown does not touch a dead thread.
        @thread = nil
    end
    @redis.quit if @redis
    super
end

#start ⇒ Object



26
27
28
29
30
# File 'lib/fluent/plugin/in_redis_pubsub.rb', line 26

# Creates the Redis client from @host/@port and launches a background
# thread that executes #run.
def start
    super
    @redis = Redis.new(
        :host        => @host,
        :port        => @port,
        :thread_safe => true
    )
    @thread = Thread.new { run }
end