Class: Sensu::Transport::Redis

Inherits:
Base
  • Object
Defined in:
lib/sensu/transport/redis.rb

Constant Summary

REDIS_KEYSPACE =

The Redis keyspace to use for the transport.

"transport"

Instance Attribute Summary

Attributes inherited from Base

#logger

Instance Method Summary

Methods inherited from Base

#ack, #acknowledge, #after_reconnect, #before_reconnect, descendants, #on_error

Constructor Details

#initialize ⇒ Redis

Returns a new instance of Redis.



# File 'lib/sensu/transport/redis.rb', line 11

def initialize
  @options = {}
  @connections = {}
  super
end

Instance Method Details

#close ⇒ Object

Close ALL Redis connections.



# File 'lib/sensu/transport/redis.rb', line 59

def close
  @connections.each_value do |connection|
    connection.close
  end
end

#connect(options = {}) ⇒ Object

Redis transport connection setup. This method sets `@options`, creates a named Redis connection “redis”, and, once that connection is established, sets the deferred status to `:succeeded` via `succeed()`.

Parameters:

  • options (Hash, String) (defaults to: {})


# File 'lib/sensu/transport/redis.rb', line 22

def connect(options={})
  @options = options || {}
  redis_connection("redis") do |connection|
    connection.callback do
      succeed
    end
  end
end

#connected? ⇒ TrueClass, FalseClass

Indicates if ALL Redis connections are connected.

Returns:

  • (TrueClass, FalseClass)


# File 'lib/sensu/transport/redis.rb', line 52

def connected?
  !@connections.empty? && @connections.values.all? do |connection|
    connection.connected?
  end
end
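
The emptiness guard matters because `all?` is vacuously true for an empty collection, so a transport with no connections would otherwise report itself as connected. A minimal sketch of the same predicate, assuming a hypothetical `FakeConnection` struct in place of a real Redis connection and a hypothetical helper name `all_connected?`:

```ruby
# FakeConnection is a hypothetical stand-in for a Redis connection object;
# it only answers connected?.
FakeConnection = Struct.new(:up) do
  def connected?
    up
  end
end

# Same predicate as the documented method, but taking the connections hash
# as an argument instead of reading @connections.
def all_connected?(connections)
  !connections.empty? && connections.values.all? { |connection| connection.connected? }
end

no_connections = all_connected?({})
healthy = all_connected?({ "redis" => FakeConnection.new(true) })
degraded = all_connected?({
  "redis" => FakeConnection.new(true),
  "pubsub" => FakeConnection.new(false)
})

puts [no_connections, healthy, degraded].inspect
```

Only the `healthy` case is true here: an empty hash and a hash with one disconnected entry both count as not connected.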

#publish(type, pipe, message, options = {}) {|info| ... } ⇒ Object

Publish a message to the Redis transport. The transport pipe type determines the method of sending messages to consumers using Redis, either using PubSub or a list. The appropriate publish method is called for the pipe type given. The Redis transport ignores publish options.

Parameters:

  • type (Symbol)

    the transport pipe type, possible values are: :direct and :fanout.

  • pipe (String)

    the transport pipe name.

  • message (String)

    the message to be published to the transport.

  • options (Hash) (defaults to: {})

    IGNORED by this transport.

Yields:

  • (info)

    passes publish info to an optional callback/block.

Yield Parameters:

  • info (Hash)

    contains publish information, which may contain an error object.



# File 'lib/sensu/transport/redis.rb', line 79

def publish(type, pipe, message, options={}, &callback)
  case type.to_sym
  when :fanout
    pubsub_publish(pipe, message, &callback)
  when :direct
    list_publish(pipe, message, &callback)
  end
end
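
The dispatch above can be sketched without a Redis server. The class below is a hypothetical in-memory stand-in (`PublishSketch`, with arrays in place of PubSub channels and Redis lists), and the empty `info` hash passed to the callback is an assumption made for the sketch, not the real transport's publish info. It shows that `type` may be a Symbol or a String because of `to_sym`, and that a type other than `:fanout` or `:direct` matches no branch.

```ruby
# Hypothetical in-memory stand-in; it is not the real Sensu::Transport::Redis
# and talks to no Redis server.
class PublishSketch
  attr_reader :channels, :lists

  def initialize
    @channels = Hash.new { |hash, pipe| hash[pipe] = [] } # stands in for PubSub
    @lists = Hash.new { |hash, pipe| hash[pipe] = [] }    # stands in for Redis lists
  end

  # Same dispatch shape as the documented #publish; options are ignored.
  def publish(type, pipe, message, options={}, &callback)
    case type.to_sym
    when :fanout
      @channels[pipe] << message
    when :direct
      @lists[pipe] << message
    else
      return nil # no branch matches, so nothing is sent and no callback runs
    end
    # Assumption: an empty info hash signals success in this sketch.
    callback.call({}) if callback
  end
end

transport = PublishSketch.new
transport.publish(:direct, "results", "check result") do |info|
  puts "published: #{info.inspect}"
end
transport.publish("fanout", "keepalives", "keepalive") # a String type works via to_sym
transport.publish(:topic, "ignored", "dropped")        # unknown type: silently ignored
```

Because the documented `case` has no `else` branch, callers that need to detect an unsupported pipe type would have to validate it themselves before calling `publish`.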

#reconnect(force = false) ⇒ Object

Reconnect to the Redis transport. The Redis connections used by the transport have auto-reconnect disabled; if a single connection is unhealthy, all connections are closed, the transport is reset, and new connections are made. If the transport is not already reconnecting to Redis, the `@before_reconnect` transport callback is called.

Parameters:

  • force (Boolean) (defaults to: false)

    whether to force the reconnect, even if one is already in progress.



# File 'lib/sensu/transport/redis.rb', line 39

def reconnect(force=false)
  @before_reconnect.call unless @reconnecting
  unless @reconnecting && !force
    @reconnecting = true
    close
    reset
    connect(@options)
  end
end
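
The two guards in `reconnect` interact in a way that is easy to misread, so here is a minimal sketch of just that logic. `ReconnectSketch` is a hypothetical class: a counter stands in for `close`, `reset`, and `connect(@options)`, and the sketch never clears `@reconnecting`, since this page does not show where the real transport does so.

```ruby
# Hypothetical stand-in that keeps only the guard logic of #reconnect.
class ReconnectSketch
  attr_reader :connect_count, :before_count

  def initialize
    @reconnecting = false
    @connect_count = 0
    @before_count = 0
    @before_reconnect = proc { @before_count += 1 }
  end

  def reconnect(force=false)
    # The callback fires only when no reconnect is already under way.
    @before_reconnect.call unless @reconnecting
    # Skip the work when already reconnecting, unless the caller forces it.
    unless @reconnecting && !force
      @reconnecting = true
      @connect_count += 1 # stands in for close, reset, connect(@options)
    end
  end
end

sketch = ReconnectSketch.new
sketch.reconnect       # callback fires, reconnect starts
sketch.reconnect       # already reconnecting: nothing happens
sketch.reconnect(true) # forced: reconnects again, callback is not repeated
puts "callbacks: #{sketch.before_count}, reconnects: #{sketch.connect_count}"
```

In this sketch the callback runs once and the reconnect work runs twice, which reflects that `force` bypasses only the second guard, not the first.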

#stats(funnel, options = {}) {|info| ... } ⇒ Object

Redis transport pipe/funnel stats, such as message and consumer counts. This method is currently unable to determine the consumer count for a Redis list.

Parameters:

  • funnel (String)

    the transport funnel to get stats for.

  • options (Hash) (defaults to: {})

    IGNORED by this transport.

Yields:

  • (info)

    passes list stats to the callback/block.

Yield Parameters:

  • info (Hash)

    contains list stats.



# File 'lib/sensu/transport/redis.rb', line 140

def stats(funnel, options={})
  redis_connection("redis") do |connection|
    connection.llen(funnel) do |messages|
      info = {
        :messages => messages,
        :consumers => 0
      }
      yield(info)
    end
  end
end
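
As a rough illustration of the hash a caller receives, the sketch below reproduces the shape of `stats` against a hypothetical `FakeListConnection` whose `llen` yields the length of an in-memory array. The names `FakeListConnection` and `stats_sketch` are assumptions for the example; only the `:messages` and `:consumers` keys come from the method shown above.

```ruby
# Hypothetical stand-in for a Redis connection; only llen is modelled.
class FakeListConnection
  def initialize(lists)
    @lists = lists
  end

  # Yields the list length; Redis LLEN reports 0 for a key that does not exist.
  def llen(key)
    yield(@lists.fetch(key, []).length)
  end
end

# Same shape as the documented #stats, with the connection passed in.
def stats_sketch(connection, funnel)
  connection.llen(funnel) do |messages|
    yield({ :messages => messages, :consumers => 0 })
  end
end

connection = FakeListConnection.new({ "results" => ["a", "b", "c"] })
stats_sketch(connection, "results") do |info|
  puts "#{info[:messages]} queued, #{info[:consumers]} consumers reported"
end
```

Since the documented method calls `yield` unconditionally, a block is required; and `:consumers` is always 0, so it should be read as "unknown" rather than "no consumers".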

#subscribe(type, pipe, funnel = nil, options = {}) {|info, message| ... } ⇒ Object

Subscribe to a Redis transport pipe. The transport pipe type determines the method of consuming messages from Redis, either using PubSub or a list. The appropriate subscribe method is called for the pipe type given. The Redis transport ignores subscribe options and the funnel name.

Parameters:

  • type (Symbol)

    the transport pipe type, possible values are: :direct and :fanout.

  • pipe (String)

    the transport pipe name.

  • funnel (String) (defaults to: nil)

    IGNORED by this transport.

  • options (Hash) (defaults to: {})

    IGNORED by this transport.

Yields:

  • (info, message)

    passes message info and content to the consumer callback/block.

Yield Parameters:

  • info (Hash)

    contains message information.

  • message (String)

    the message content.



# File 'lib/sensu/transport/redis.rb', line 103

def subscribe(type, pipe, funnel=nil, options={}, &callback)
  case type.to_sym
  when :fanout
    pubsub_subscribe(pipe, &callback)
  when :direct
    list_subscribe(pipe, &callback)
  end
end
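
The practical difference between the two pipe types is who receives a message: a Redis PubSub channel delivers each message to every subscriber, while a message popped from a list goes to a single consumer. The sketch below models that with a hypothetical `DeliverySketch` class. Its `deliver` method and round-robin choice of consumer are simplifications invented for the example (Redis itself decides which blocked client is served), and messages sent to a pipe with no consumers are simply dropped rather than queued.

```ruby
# Hypothetical in-memory model of fanout versus direct delivery.
class DeliverySketch
  def initialize
    @subscribers = Hash.new { |hash, pipe| hash[pipe] = [] } # fanout callbacks
    @consumers = Hash.new { |hash, pipe| hash[pipe] = [] }   # direct callbacks
    @turn = Hash.new(0)                                      # round-robin cursor per pipe
  end

  # Same signature as the documented #subscribe; funnel and options are ignored.
  def subscribe(type, pipe, funnel=nil, options={}, &callback)
    case type.to_sym
    when :fanout
      @subscribers[pipe] << callback
    when :direct
      @consumers[pipe] << callback
    end
  end

  # Invented for this sketch: pushes a message to the registered callbacks.
  def deliver(type, pipe, message)
    case type.to_sym
    when :fanout
      @subscribers[pipe].each { |callback| callback.call({}, message) }
    when :direct
      consumers = @consumers[pipe]
      return if consumers.empty?
      callback = consumers[@turn[pipe] % consumers.length]
      @turn[pipe] += 1
      callback.call({}, message)
    end
  end
end

received = Hash.new { |hash, name| hash[name] = [] }
sketch = DeliverySketch.new
sketch.subscribe(:fanout, "keepalives") { |info, message| received[:a] << message }
sketch.subscribe(:fanout, "keepalives") { |info, message| received[:b] << message }
sketch.subscribe(:direct, "results") { |info, message| received[:c] << message }
sketch.subscribe(:direct, "results") { |info, message| received[:d] << message }

sketch.deliver(:fanout, "keepalives", "k1") # reaches both a and b
sketch.deliver(:direct, "results", "r1")    # reaches c only
sketch.deliver(:direct, "results", "r2")    # reaches d only
puts received.inspect
```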

#unsubscribe {|info| ... } ⇒ Object

Unsubscribe from all transport pipes. This method iterates through the current named Redis connections, unsubscribing the “pubsub” connection from Redis channels, and closing/deleting BLPOP connections.

Yields:

  • (info)

    passes info to an optional callback/block.

Yield Parameters:

  • info (Hash)

    empty hash.



# File 'lib/sensu/transport/redis.rb', line 119

def unsubscribe
  @connections.each do |name, connection|
    case name
    when "pubsub"
      connection.unsubscribe
    when /^#{REDIS_KEYSPACE}/
      connection.close
      @connections.delete(name)
    end
  end
  super
end
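
Which named connections `unsubscribe` touches depends only on the connection name. The sketch below isolates that `case` statement as a hypothetical helper, `unsubscribe_action`, returning a symbol instead of acting on a connection. The `"transport:..."` name is an illustrative guess at what a list connection might be called; the page only establishes that such names begin with the keyspace.

```ruby
SKETCH_KEYSPACE = "transport" # mirrors REDIS_KEYSPACE from the constant summary

# Returns what the documented #unsubscribe would do with a named connection.
def unsubscribe_action(name)
  case name
  when "pubsub"
    :unsubscribe
  when /^#{SKETCH_KEYSPACE}/
    :close_and_delete
  else
    :keep
  end
end

# The "transport:..." name is a hypothetical example of a list connection name.
["redis", "pubsub", "transport:direct:results"].each do |name|
  puts "#{name} -> #{unsubscribe_action(name)}"
end
```

The general-purpose “redis” connection created by `connect` matches neither branch, so it stays open after `unsubscribe` and remains available for publishing and stats.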