Class: LogStash::Inputs::Redis

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/redis.rb

Instance Method Summary collapse

Instance Method Details

#add_external_redis_builder(builder) ⇒ Object

Public API: used to store a proc (or any callable) that can provide a Redis instance or a mock.



64
65
66
67
# File 'lib/logstash/inputs/redis.rb', line 64

# Stores an externally supplied builder for later use by this input.
#
# builder - a callable (anything responding to #call); it is kept in
#           @redis_builder exactly as given.
#
# Returns self so that calls can be chained.
def add_external_redis_builder(builder)
  tap { @redis_builder = builder }
end

#new_redis_instance ⇒ Object



75
76
77
# File 'lib/logstash/inputs/redis.rb', line 75

# Invokes the stored builder (@redis_builder) with no arguments and
# returns whatever it produces.
def new_redis_instance
  builder = @redis_builder
  builder.call
end

#register ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/logstash/inputs/redis.rb', line 79

# Validates the configured parameters and prepares this input to run.
#
# Steps, in order:
#   1. Loads the 'redis' library (lazily, at registration time).
#   2. Builds @redis_url from @password, @host, @port and @db.
#   3. Maps the legacy @queue setting onto @key / @data_type ('list').
#   4. Falls back to the internal builder when no external one was stored.
#   5. Picks the run/teardown handlers that match @data_type.
#   6. Computes @identity and logs the registration.
#
# Raises RuntimeError when @queue is combined with @key or @data_type,
# or when neither @queue nor both @key and @data_type are set.
def register
  require 'redis'
  @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

  # TODO remove after setting key and data_type to true
  if @queue
    raise RuntimeError, "Cannot specify queue parameter and key or data_type" if @key || @data_type

    @key = @queue
    @data_type = 'list'
  end

  unless @key && @data_type
    raise RuntimeError, "Must define queue, or key and data_type parameters"
  end
  # end TODO

  @redis_builder ||= method(:internal_redis_builder)

  # Resolve the handler pair for the data type a single time, up front.
  runner_name, teardown_name =
    case @data_type
    when 'list', 'dummy' then [:list_runner, :list_teardown]
    when 'channel' then [:channel_runner, :subscribe_teardown]
    when 'pattern_channel' then [:pattern_channel_runner, :subscribe_teardown]
    end

  # An unrecognized data type leaves both handlers untouched.
  if runner_name
    @run_method = method(runner_name)
    @teardown_method = method(teardown_name)
  end

  # TODO(sissel, boertje): set @identity directly when @name config option is removed.
  if @name != 'default'
    @identity = @name
  else
    @identity = "#{@redis_url} #{@data_type}:#{@key}"
  end
  @logger.info("Registering Redis", :identity => @identity)
end

#run(output_queue) ⇒ Object

def register



120
121
122
123
124
# File 'lib/logstash/inputs/redis.rb', line 120

# Hands output_queue to the runner chosen during #register and returns
# the runner's result.
#
# A LogStash::ShutdownSignal raised by the runner is swallowed, in which
# case nil is returned.
def run(output_queue)
  begin
    @run_method.call(output_queue)
  rescue LogStash::ShutdownSignal
    # ignore and quit
    nil
  end
end

#teardown ⇒ Object

def run



126
127
128
129
# File 'lib/logstash/inputs/redis.rb', line 126

# Flags that shutdown was requested, then invokes the teardown handler
# chosen during #register and returns its result.
def teardown
  # Set the flag first so the handler runs with it already raised.
  @shutdown_requested = true
  handler = @teardown_method
  handler.call
end

#use_redis(instance) ⇒ Object

Used to apply a Redis instance directly, bypassing the builder.



70
71
72
73
# File 'lib/logstash/inputs/redis.rb', line 70

# Stores the given instance in @redis directly, without going through
# the builder.
#
# Returns self so that calls can be chained.
def use_redis(instance)
  tap { @redis = instance }
end