Class: LogStash::Inputs::Redis

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/redis.rb

Constant Summary collapse

BATCH_EMPTY_SLEEP =
0.25

Instance Method Summary collapse

Instance Method Details

#add_external_redis_builder(builder) ⇒ Object

Public API: use this to store a proc (or any callable) that can provide a Redis instance or a mock.



65
66
67
68
# File 'lib/logstash/inputs/redis.rb', line 65

# Stores +builder+ in @redis_builder so that a later call to
# new_redis_instance invokes it.
#
# @param builder [#call] callable kept as the Redis builder
# @return [self] the receiver, which allows call chaining
def add_external_redis_builder(builder) # callable
  tap { @redis_builder = builder }
end

#new_redis_instance ⇒ Object



76
77
78
# File 'lib/logstash/inputs/redis.rb', line 76

# Invokes the callable held in @redis_builder with no arguments.
#
# @return [Object] whatever the builder returns
def new_redis_instance
  builder = @redis_builder
  builder.call
end

#register ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/logstash/inputs/redis.rb', line 80

# Prepares the input before it is run:
# * builds @redis_url from either host/port or the socket path,
# * installs the internal Redis builder unless one is already set,
# * selects the run/stop handlers for the configured data type,
# * selects the batched or single list listener,
# * records and logs the identity string.
def register
  @redis_url =
    if @path.nil?
      "redis://#{@password}@#{@host}:#{@port}/#{@db}"
    else
      "#{@password}@#{@path}/#{@db}"
    end

  # keep a builder that was installed earlier; otherwise use the internal one
  @redis_builder ||= method(:internal_redis_builder)

  # just switch on data_type once
  case @data_type
  when 'list', 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  when 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  when 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  end

  listener_name = batched? ? :list_batch_listener : :list_single_listener
  @list_method = method(listener_name)

  @identity = "#{@redis_url} #{@data_type}:#{@key}"
  @logger.info("Registering Redis", identity: @identity)
end

#run(output_queue) ⇒ Object

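Runs the input by calling the run handler selected in #register with output_queue; a LogStash::ShutdownSignal raised while running is rescued and ignored.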



103
104
105
106
107
# File 'lib/logstash/inputs/redis.rb', line 103

# Calls the handler stored in @run_method with +output_queue+.
#
# @param output_queue [Object] passed through unchanged to the run handler
# @return [Object, nil] the handler's result, or nil when a
#   LogStash::ShutdownSignal was raised
def run(output_queue)
  begin
    @run_method.call(output_queue)
  rescue LogStash::ShutdownSignal
    # ignore and quit
    nil
  end
end

#stop ⇒ Object

Stops the input by calling the stop handler selected in #register.



109
110
111
# File 'lib/logstash/inputs/redis.rb', line 109

# Invokes the callable held in @stop_method with no arguments.
#
# @return [Object] whatever the stop handler returns
def stop
  stopper = @stop_method
  stopper.call
end

#use_redis(instance) ⇒ Object

Use this to apply a Redis instance directly, bypassing the builder.



71
72
73
74
# File 'lib/logstash/inputs/redis.rb', line 71

# Stores +instance+ directly in @redis.
#
# @param instance [Object] object to keep as @redis
# @return [self] the receiver, which allows call chaining
def use_redis(instance)
  tap { @redis = instance }
end