Class: LogStash::Inputs::RedisCluster

Inherits:
Threadable
  • Object
show all
Defined in:
lib/logstash/inputs/redis_cluster.rb

Constant Summary collapse

BATCH_EMPTY_SLEEP =
0.25

Instance Method Summary collapse

Instance Method Details

#registerObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/logstash/inputs/redis_cluster.rb', line 67

def register
  host = self.find_redis_node(@key).split(':')
  @logger.info("HOST " + host.to_s)
  @host = host.first
  @port = host.last
  @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"

  # just switch on data_type once
  if @data_type == 'list' || @data_type == 'dummy'
    @run_method = method(:list_runner)
    @stop_method = method(:list_stop)
  elsif @data_type == 'channel'
    @run_method = method(:channel_runner)
    @stop_method = method(:subscribe_stop)
  elsif @data_type == 'pattern_channel'
    @run_method = method(:pattern_channel_runner)
    @stop_method = method(:subscribe_stop)
  end

  @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)

  @identity = "#{@redis_url} #{@data_type}:#{@key}"
  puts "Identity: " + @identity
  @logger.info("Registering Redis", :identity => @identity)
end

#run(output_queue) ⇒ Object

def register



93
94
95
96
97
# File 'lib/logstash/inputs/redis_cluster.rb', line 93

def run(output_queue)
  @run_method.call(output_queue)
rescue LogStash::ShutdownSignal
  # ignore and quit
end

#stopObject

def run



99
100
101
# File 'lib/logstash/inputs/redis_cluster.rb', line 99

def stop
  @stop_method.call
end