Class: LogStash::Filters::Memcached

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/memcached.rb

Overview

This filter provides facilities to interact with Memcached.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cache ⇒ Object (readonly)

Returns the value of attribute cache.



76
77
78
# File 'lib/logstash/filters/memcached.rb', line 76

# Reader for the memcached client handle held in +@cache+.
#
# @return [Object, nil] the current value of +@cache+ (nil while unassigned)
def cache; @cache; end

Instance Method Details

#close ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/logstash/filters/memcached.rb', line 115

# Shuts the memcached connection down on a best-effort basis.
#
# While holding +@connection_mutex+, flips +@connected+ to false and then
# closes the client returned by #cache.
def close
  @connection_mutex.synchronize do
    @connected.make_false
    cache.close
  end
rescue StandardError => close_error
  # Failures are deliberately swallowed: the connection being closed may
  # already be invalid, and errors raised while closing on shutdown are of no
  # consequence. They are only surfaced at debug level.
  logger.debug("error closing memcached connection", message: close_error.message)
end

#filter(event) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/filters/memcached.rb', line 93

# Runs the memcached set/get operations for a single event.
#
# When no connection is available the event is tagged with +@tag_on_failure+
# and nothing else happens. Otherwise +do_set+ and then +do_get+ are both
# invoked (the second runs regardless of the first's result), and
# +filter_matched+ is called when either of them returned a truthy value.
#
# Dalli network/ring errors tag the event, are logged together with the
# configured hosts and options, and cause #close to be called. Any other
# StandardError is logged (with a backtrace only when debug logging is
# enabled) and tags the event; #close is not called in that case.
#
# @param event [Object] the event being filtered; only +#tag+ is called on it
#   directly here (presumably a LogStash::Event — confirm against the caller)
def filter(event)
  unless connection_available?
    event.tag(@tag_on_failure)
    return
  end

  begin
    did_set = do_set(event)
    did_get = do_get(event)
    filter_matched(event) if did_set || did_get
  rescue Dalli::NetworkError, Dalli::RingError => comm_error
    event.tag(@tag_on_failure)
    logger.error(
      "memcached communication error",
      hosts: @memcached_hosts,
      options: @memcached_options,
      message: comm_error.message
    )
    close
  rescue StandardError => unexpected
    details = { message: unexpected.message }
    details[:backtrace] = unexpected.backtrace if logger.debug?
    logger.error("unexpected error", details)
    event.tag(@tag_on_failure)
  end
end

#register ⇒ Object

Raises:

  • (LogStash::ConfigurationError)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/logstash/filters/memcached.rb', line 78

# Validates the configuration and opens the memcached connection.
#
# Stores the validated hosts and options in +@memcached_hosts+ and
# +@memcached_options+, the client in +@cache+, and finally creates the
# +@connected+ flag (initially true) and the +@connection_mutex+.
#
# @raise [LogStash::ConfigurationError] when +@ttl+ is negative
# @raise [RuntimeError] when the connection cannot be created; the underlying
#   error is logged first
def register
  if @ttl < 0
    raise LogStash::ConfigurationError, "'ttl' option cannot be negative"
  end

  @memcached_hosts = validate_connection_hosts
  @memcached_options = validate_connection_options

  # @cache is only assigned when the connection attempt succeeds; on failure
  # the rescue branch raises before the assignment happens.
  @cache =
    begin
      new_connection(@memcached_hosts, @memcached_options)
    rescue StandardError => connect_error
      logger.error(
        "failed to connect to memcached",
        hosts: @memcached_hosts,
        options: @memcached_options,
        message: connect_error.message
      )
      fail("failed to connect to memcached")
    end

  @connected = Concurrent::AtomicBoolean.new(true)
  @connection_mutex = Mutex.new
end