Class: LogStash::Inputs::Redis

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/redis.rb

Constant Summary collapse

BATCH_EMPTY_SLEEP =
0.25

Instance Method Summary collapse

Instance Method Details

#add_external_redis_builder(builder) ⇒ Object

Public API: use this to store a callable (e.g. a proc) that can provide a Redis instance or a mock.



65
66
67
68
# File 'lib/logstash/inputs/redis.rb', line 65

# Stores a callable that is later invoked to obtain a redis instance
# (or a mock of one).
#
# @param builder [#call] object kept in @redis_builder
# @return [self] the receiver, so calls can be chained
def add_external_redis_builder(builder)
  tap { @redis_builder = builder }
end

#new_redis_instance ⇒ Object



76
77
78
# File 'lib/logstash/inputs/redis.rb', line 76

# Invokes the stored builder callable and hands back whatever it produces.
#
# @return [Object] the result of calling @redis_builder
def new_redis_instance
  builder = @redis_builder
  builder.call
end

#register ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/logstash/inputs/redis.rb', line 80

# Validates the queue/key/data_type settings and selects the callables used
# to run, stop and read from redis.
#
# A legacy @queue setting is translated into @key with a @data_type of
# 'list'; combining @queue with @key or @data_type is rejected.
#
# @return [Object] the return value of the final @logger.info call
# @raise [RuntimeError] if queue is combined with key or data_type, if
#   neither queue nor both key and data_type are set, or if data_type is
#   not one of the supported values
def register
  @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

  # TODO remove after setting key and data_type to true
  if @queue
    if @key || @data_type
      raise RuntimeError, "Cannot specify queue parameter and key or data_type"
    end
    @key = @queue
    @data_type = 'list'
  end

  unless @key && @data_type
    raise RuntimeError, "Must define queue, or key and data_type parameters"
  end
  # end TODO

  # Keep an externally supplied builder if one was stored earlier.
  @redis_builder ||= method(:internal_redis_builder)

  # just switch on data_type once
  case @data_type
  when 'list', 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  when 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  when 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  else
    # Fail fast: otherwise @run_method/@stop_method stay nil and the failure
    # only surfaces later as a NoMethodError inside run/stop.
    raise RuntimeError,
          "Unsupported data_type #{@data_type.inspect}; expected list, channel or pattern_channel"
  end

  @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)

  # TODO(sissel, boertje): set @identity directly when @name config option is removed.
  @identity = @name == 'default' ? "#{@redis_url} #{@data_type}:#{@key}" : @name
  @logger.info("Registering Redis", :identity => @identity)
end

#run(output_queue) ⇒ Object

def register



122
123
124
125
126
# File 'lib/logstash/inputs/redis.rb', line 122

# Hands the output queue to the runner callable held in @run_method.
#
# @param output_queue [Object] passed unchanged to @run_method
# @return [Object, nil] the runner's result, or nil when a
#   LogStash::ShutdownSignal was raised
def run(output_queue)
  begin
    @run_method.call(output_queue)
  rescue LogStash::ShutdownSignal
    # A shutdown signal is swallowed on purpose so the method returns quietly.
    nil
  end
end

#stop ⇒ Object

def run



128
129
130
# File 'lib/logstash/inputs/redis.rb', line 128

# Invokes the stop callable held in @stop_method.
#
# @return [Object] whatever the stop callable returns
def stop
  stopper = @stop_method
  stopper.call
end

#use_redis(instance) ⇒ Object

Use this to apply a Redis instance directly and bypass the builder.



71
72
73
74
# File 'lib/logstash/inputs/redis.rb', line 71

# Assigns a ready-made instance to @redis, bypassing the builder.
#
# @param instance [Object] object kept in @redis
# @return [self] the receiver, so calls can be chained
def use_redis(instance)
  tap { @redis = instance }
end