Class: ActiveMessaging::Adapters::Kestrel::Connection

Inherits:
BaseConnection
  • Object
show all
Includes:
ActiveMessaging::Adapter
Defined in:
lib/active_messaging/adapters/kestrel.rb

Overview

Connection to a kestrel message queue server

Constant Summary collapse

# Per-queue statistic names. #queue_stats appends each one to
# "queue_<name>_" to build the key it reads from the server stats, and uses
# the same symbol as the key in the per-queue hash it returns.
# Frozen so the shared list cannot be modified in place.
KESTREL_STATS_QUEUE_KEYS =
[:items, :bytes, :total_items, :logsize, :expired_items, :mem_items, :mem_bytes, :age, :discarded, :waiters, :open_transactions].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cfg = {}) ⇒ Connection

Create a new Kestrel adapter using the provided config



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/active_messaging/adapters/kestrel.rb', line 68

# Create a new Kestrel adapter using the provided config.
#
# The following keys are removed from cfg before the remainder is stored in
# @config (which #connect hands to the client):
#
#   :logger             - logger to use; when absent, falls back to
#                         ActiveMessaging.logger, then ::Rails.logger (if
#                         Rails is loaded), then default_logger
#   :retry_policy       - hash with a :strategy (a class, or a String naming
#                         one) and a :config hash; defaults to SimpleRetry
#                         with :tries => 1 and :delay => 5
#   :empty_queues_delay - value #receive passes to sleep when it has no
#                         message to return
#
# Once the state is set up, the retrier is built and the connection opened.
def initialize(cfg = {})
  # Like symbol keys
  cfg = symbolize_keys(cfg)

  # Create a logger.  Use framework loggers when available.
  @logger = cfg.delete(:logger) || ActiveMessaging.logger || (defined?(::Rails) && ::Rails.logger ? ::Rails.logger : nil) || default_logger

  # Get the retry policy
  @retry_policy = cfg.delete(:retry_policy) || {:strategy => SimpleRetry, :config => {:tries => 1, :delay => 5}}
  # A policy that came from the cfg may omit :config altogether; start from an
  # empty hash so that setting the :logger below cannot blow up on nil.
  @retry_policy[:config] ||= {}
  # If the retry policy came from the cfg, make sure we set the :logger
  @retry_policy[:config][:logger] ||= @logger
  # Turn the strategy into a Class if it is a String
  if @retry_policy[:strategy].is_a?(String)
    strategy_name = @retry_policy[:strategy]
    @retry_policy[:strategy] = begin
      Kestrel.const_get(strategy_name)
    rescue NameError
      # const_get signals an unknown or unparsable constant name with
      # NameError; only then fall back to the generic string-to-class lookup.
      Kestrel.to_class(strategy_name)
    end
  end

  @empty_queues_delay = cfg.delete(:empty_queues_delay)
  @config = cfg
  @subscriptions = {}
  retrier
  connect
  nil
end

Instance Attribute Details

#logger ⇒ Object

Logging



63
64
65
# File 'lib/active_messaging/adapters/kestrel.rb', line 63

# Reader for the logger stored in @logger, which #initialize assigns.
# #connect checks this value for nil before writing its debug message.
def logger
  @logger
end

#retry_policy ⇒ Object

Reconnect on error



61
62
63
# File 'lib/active_messaging/adapters/kestrel.rb', line 61

# Reader for the retry policy hash stored in @retry_policy by #initialize.
# Its :strategy entry is the class #retrier instantiates, and its :config
# entry is what #receive passes to the retrier's do_work.
def retry_policy
  @retry_policy
end

Instance Method Details

#connect ⇒ Object

Connect to the kestrel server using a Memcached client



122
123
124
125
126
# File 'lib/active_messaging/adapters/kestrel.rb', line 122

# Builds the MemCache client used to talk to kestrel and stores it in
# @kestrel. The whole @config hash is given to the client constructor, and
# the :servers entry is then assigned to the client explicitly.
#
# When a logger is present and has debug enabled, the config is logged first.
#
# Returns the value of @config[:servers] (the result of the final assignment).
def connect
  log = logger
  if log && log.debug?
    log.debug("Creating connection to Kestrel using config #{@config.inspect}")
  end

  @kestrel = MemCache.new(@config)
  @kestrel.servers = @config[:servers]
end

#queue_stats ⇒ Object

Returns a hash of hashes of hashes containing stats for each active queue in each member of the kestrel cluster. The top-level hash has the following structure:

{ "server1_def" => { "queue1" => { ... }, "queue2" => { ... } },
  "server2_def" => { "queue1" => { ... }, "queue2" => { ... } } }

Each “server_def” key is a host:port string.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/active_messaging/adapters/kestrel.rb', line 99

# Returns per-server, per-queue statistics as nested hashes:
#
#   { server_def => { denormalized_queue_name => { stat_key => value } } }
#
# A queue is discovered from any server's stats through a key matching
# "queue_<name>_total_items". Every discovered queue is reported for every
# server; a statistic a server does not report is stored as nil. The
# stat keys are the symbols in KESTREL_STATS_QUEUE_KEYS.
#
# The returned hash (and the per-server hashes inside it) create missing
# entries on read, because they share an auto-vivifying default block.
def queue_stats
  stats = @kestrel.stats

  # Gather the queue names advertised by any server.
  queue_names = []
  stats.values.each do |server_stats|
    server_stats.keys.each do |key|
      md = /queue_(.+)_total_items/.match(key)
      queue_names << md[1] if md
    end
  end

  # The same queue normally shows up on every server, so keep each name once
  # (first-seen order) and denormalize it a single time rather than once per
  # server, per duplicate and per statistic.
  queues = queue_names.uniq.map { |queue| [queue, denormalize_name(queue)] }

  result = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
  stats.each do |server_def, server_stats|
    queues.each do |queue, denormalized_name|
      KESTREL_STATS_QUEUE_KEYS.each do |key|
        result[server_def][denormalized_name][key] = server_stats["queue_#{queue}_#{key}"]
      end
    end
  end
  result
end

#receive ⇒ Object

Gets a message from any subscribed destination and returns it as an ActiveMessaging::Adapters::Kestrel::Message object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/active_messaging/adapters/kestrel.rb', line 166

# Polls the subscribed destinations and returns the first message found.
#
# The polling runs inside the retrier's do_work, configured with
# @retry_policy[:config]. With more than one subscription the destinations
# are visited in a random order. The first queue that yields an item ends the
# call: the item comes back wrapped in a Message whose 'destination' header
# and destination are the subscribed name.
#
# When nothing was returned (no subscriptions, or no queue had an item), the
# method sleeps for @empty_queues_delay if that is set and positive, then
# returns nil.
def receive
  unless @subscriptions.empty?
    @retrier.do_work(@retry_policy[:config]) do
      destinations = @subscriptions.keys
      destinations = destinations.sort_by { rand } if destinations.size > 1

      destinations.each do |destination|
        item = @kestrel.get(normalize(destination))
        next unless item

        # TODO: ActiveMessaging ought to provide a way to do messaging
        # without having to wrap the messages in another object
        return Message.new({'destination' => destination}, item, destination)
      end
    end
  end

  # Sleep a little to avoid a spin loop (ActiveMessaging Gateway ought to do this)
  delay = @empty_queues_delay
  sleep(delay) if delay && delay > 0
  nil
end

#retrier ⇒ Object

Creates a retrier object according to the @retry_policy



129
130
131
132
133
# File 'lib/active_messaging/adapters/kestrel.rb', line 129

# Returns the retrier for this connection. On first use it is created by
# instantiating (with no arguments) the class stored under
# @retry_policy[:strategy]; the instance is kept in @retrier and reused.
def retrier
  @retrier ||= @retry_policy[:strategy].new
end

#send(destination_name, body, headers = {}) ⇒ Object

Send a message to the named destination. headers can include any of the following keys:

:ttl => Set the time to live of the message in seconds


155
156
157
158
159
160
161
162
# File 'lib/active_messaging/adapters/kestrel.rb', line 155

# Send a message to the named destination.
#
# destination_name - name of the destination; passed through normalize to
#                    obtain the queue key
# body             - the message payload handed to the client's set
# headers          - optional hash; the only key read here is:
#                    :ttl => time to live of the message in seconds. It is
#                    converted with to_i, and values that come out as zero
#                    or negative (including a missing :ttl) mean no ttl is
#                    passed to the client.
#
# Returns whatever the client's set call returns.
def send(destination_name, body, headers = {})
  ttl = (headers[:ttl] || 0).to_i
  set_args = [normalize(destination_name), body]
  set_args << ttl if ttl > 0
  @kestrel.set(*set_args)
end

#subscribe(destination_name, headers = {}) ⇒ Object

Subscribe to the named destination and begin receiving messages from it



137
138
139
140
141
142
143
144
145
# File 'lib/active_messaging/adapters/kestrel.rb', line 137

# Subscribe to the named destination so that #receive starts polling it.
#
# destination_name - name of the destination; used as the subscription key
# headers          - optional hash stored with the subscription. It is
#                    modified in place: :destination is set to
#                    destination_name, even when the subscription already
#                    exists.
#
# An existing subscription for the same name is left untouched.
# Always returns nil.
def subscribe(destination_name, headers = {})
  headers[:destination] = destination_name
  # TODO: Should you get an exception or no? For now a repeated subscribe
  # silently keeps the headers that were registered first.
  @subscriptions[destination_name] ||= headers
  nil
end

#unsubscribe(destination_name, headers = {}) ⇒ Object

Stop receiving messages from the named destination



148
149
150
# File 'lib/active_messaging/adapters/kestrel.rb', line 148

# Stop receiving messages from the named destination by removing its entry
# from @subscriptions, the table #receive polls.
#
# destination_name - key under which the destination was subscribed
# headers          - accepted but not used by this method
#
# Returns the result of @subscriptions.delete: the headers stored for the
# subscription, or nil when there was none (@subscriptions is a plain Hash
# created in #initialize).
def unsubscribe(destination_name, headers = {})
  @subscriptions.delete(destination_name)
end