Class: Krakow::Consumer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/consumer.rb,
lib/krakow/consumer/queue.rb

Overview

Consume messages from a server

Defined Under Namespace

Classes: Queue

Instance Attribute Summary collapse

Attributes collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(args = {}) ⇒ Consumer

Returns a new instance of Consumer.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/krakow/consumer.rb', line 41

# Set up the consumer: normalise connection options, create the message
# queue and distribution, then connect via lookup or directly.
#
# @param args [Hash] attribute values passed through to the parent initializer
# @return [Consumer]
def initialize(args={})
  super
  # Guarantee :features and :config keys exist while letting supplied
  # options take precedence.
  supplied_options = arguments[:connection_options] || {}
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(supplied_options)
  @connections = {}
  @queue = Queue.new(current_actor, :removal_callback => :remove_message)
  @distribution = Distribution::Default.new(
    :max_in_flight => max_in_flight,
    :backoff_interval => backoff_interval,
    :consumer => current_actor
  )
  if(nsqlookupd)
    # Lookup based: discovery keeps adding connections on an interval
    debug "Connections will be established via lookup #{nsqlookupd.inspect}"
    @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
    discover
  elsif(host && port)
    # Single fixed end point
    direct_connect
  else
    abort Error::ConfigurationError.new('No connection information provided!')
  end
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



18
19
20
# File 'lib/krakow/consumer.rb', line 18

# @return [Hash] registered connections, keyed by connection identifier
def connections
  @connections
end

#discovery ⇒ Object (readonly)

Returns the value of attribute discovery.



18
19
20
# File 'lib/krakow/consumer.rb', line 18

# @return [Discovery, nil] lookup helper; only assigned by the
#   constructor when `nsqlookupd` is configured
def discovery
  @discovery
end

#distribution ⇒ Object (readonly)

Returns the value of attribute distribution.



18
19
20
# File 'lib/krakow/consumer.rb', line 18

# @return [Distribution::Default] distribution created by the constructor
def distribution
  @distribution
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



18
19
20
# File 'lib/krakow/consumer.rb', line 18

# @return [Queue] message queue created by the constructor
def queue
  @queue
end

Instance Method Details

#backoff_interval ⇒ Numeric

Returns the backoff_interval attribute.

Returns:

  • (Numeric)

    the backoff_interval attribute



33
# File 'lib/krakow/consumer.rb', line 33

attribute :backoff_interval, Numeric

#backoff_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the backoff_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the backoff_interval attribute



33
# File 'lib/krakow/consumer.rb', line 33

attribute :backoff_interval, Numeric

#build_connection(host, port, queue) ⇒ Krakow::Connection?

Build a new [Krakow::Connection]

Parameters:

  • host (String)

    remote host

  • port (String, Integer)

    remote port

  • queue (Queue)

    queue for messages

Returns:



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/krakow/consumer.rb', line 131

# Build a new [Krakow::Connection] and register it with the queue
#
# @param host [String] remote host
# @param port [String, Integer] remote port
# @param queue [Queue] queue for messages
# @return [Krakow::Connection, nil] nil when building or queue
#   registration raised (the failure is logged, not re-raised)
def build_connection(host, port, queue)
  # Incoming messages are routed back through #process_message
  message_handler = {:actor => current_actor, :method => :process_message}
  connection = Connection.new(
    :host => host,
    :port => port,
    :queue => queue,
    :topic => topic,
    :channel => channel,
    :notifier => notifier,
    :features => connection_options[:features],
    :features_args => connection_options[:config],
    :callbacks => {:handle => message_handler}
  )
  queue.register_connection(connection)
  connection
rescue => e
  error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}"
  debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}"
  nil
end

#channel ⇒ String

Returns the channel attribute.

Returns:

  • (String)

    the channel attribute



28
# File 'lib/krakow/consumer.rb', line 28

attribute :channel, String, :required => true

#channel? ⇒ TrueClass, FalseClass

Returns truthiness of the channel attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the channel attribute



28
# File 'lib/krakow/consumer.rb', line 28

attribute :channel, String, :required => true

#confirm(message_id) ⇒ TrueClass Also known as: finish

Confirm message has been processed

Parameters:

Returns:

  • (TrueClass)

Raises:

  • (KeyError)

    connection not found



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/krakow/consumer.rb', line 286

# Confirm message has been processed
#
# Sends FIN for the message over the connection it is in flight on and
# records the success with the distribution. Whatever the outcome, the
# message is unregistered from the distribution afterwards and, when
# that yields a connection, the connection's ready state is refreshed.
#
# @param message_id [String, Object] message ID, or any object that
#   responds to `message_id`
# @return [TrueClass]
# @raise [KeyError] connection not found
# @raise [Error::LookupFailed] message is not in flight
# @raise [Error::ConnectionUnavailable] connection is unavailable or its
#   actor is dead
def confirm(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    connection = distribution.in_flight_lookup(message_id)
    connection.transmit(Command::Fin.new(:message_id => message_id))
    distribution.success(connection.identifier)
    true
  # NOTE: errors are matched here directly. Previously a nested
  # `rescue => e; abort e` intercepted everything first, so none of the
  # specific handlers below could run (no logging, no dead actor
  # translation).
  rescue KeyError => e
    error "Message confirmation failed: #{e}"
    abort e
  rescue Error::LookupFailed => e
    error "Lookup of message for confirmation failed! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  rescue Error::ConnectionUnavailable => e
    abort e
  rescue Celluloid::DeadActorError
    # A dead connection actor is reported as an unavailable connection
    abort Error::ConnectionUnavailable.new
  rescue => e
    # Any other failure is still handed to the caller via abort
    abort e
  ensure
    con = distribution.unregister_message(message_id)
    update_ready!(con) if con
  end
end

#connected? ⇒ TrueClass, FalseClass

Returns whether the consumer is currently connected to at least one nsqd.

Returns:

  • (TrueClass, FalseClass)

    whether the consumer is currently connected to at least one nsqd



69
70
71
72
73
74
75
76
77
# File 'lib/krakow/consumer.rb', line 69

# @return [TrueClass, FalseClass] true when at least one registered
#   connection reports itself connected
def connected?
  connections.each_value do |con|
    begin
      return true if con.connected?
    rescue Celluloid::DeadActorError
      # A dead connection actor counts as not connected
      next
    end
  end
  false
end

#connection(key) ⇒ Krakow::Connection

Returns the [Krakow::Connection] associated with key

Parameters:

  • key (Object)

    identifier

Returns:



98
99
100
# File 'lib/krakow/consumer.rb', line 98

# Look up a registered connection
#
# @param key [Object] identifier the connection was stored under
# @return [Krakow::Connection, nil] whatever is stored under key in the
#   connections hash
def connection(key)
  @connections[key]
end

#connection_failure(actor, reason) ⇒ nil

Remove connection references when connection is terminated

Parameters:

  • actor (Object)

    terminated actor

  • reason (Exception)

    reason for termination

Returns:

  • (nil)


252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/krakow/consumer.rb', line 252

# Remove connection references when connection is terminated
#
# Nothing happens unless a reason is given and the actor is one of the
# registered connections.
#
# @param actor [Object] terminated actor
# @param reason [Exception] reason for termination
# @return [nil]
def connection_failure(actor, reason)
  key = reason && connections.key(actor)
  return nil unless key
  warn "Connection failure detected. Removing connection: #{key} - #{reason}"
  connections.delete(key)
  begin
    distribution.remove_connection(key)
  rescue Error::ConnectionUnavailable, Error::ConnectionFailure
    warn 'Caught connection unavailability'
  end
  queue.deregister_connection(key)
  distribution.redistribute!
  # Without discovery nothing else will re-add the connection
  direct_connect unless discovery
  nil
end

#connection_options ⇒ Hash

Returns the connection_options attribute.

Returns:

  • (Hash)

    the connection_options attribute



37
# File 'lib/krakow/consumer.rb', line 37

attribute :connection_options, Hash, :default => ->{ Hash.new }

#connection_options? ⇒ TrueClass, FalseClass

Returns truthiness of the connection_options attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



37
# File 'lib/krakow/consumer.rb', line 37

attribute :connection_options, Hash, :default => ->{ Hash.new }

#consumer_cleanup ⇒ nil

Instance destructor

Returns:

  • (nil)


110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/krakow/consumer.rb', line 110

# Instance destructor
#
# Terminates the distribution, the queue and every registered
# connection that is still alive.
#
# @return [nil]
def consumer_cleanup
  debug 'Tearing down consumer'
  [distribution, queue].each do |actor|
    actor.terminate if actor && actor.alive?
  end
  connections.each_value do |con|
    con.terminate if con.alive?
  end
  info 'Consumer torn down'
  nil
end

#direct_connect ⇒ Connection

Connect to nsqd instance directly

Returns:



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/krakow/consumer.rb', line 82

# Connect to nsqd instance directly
#
# Aborts with Error::ConnectionFailure when the connection cannot be
# built or the subscription cannot be registered.
#
# @return [Krakow::Connection] the registered connection
def direct_connect
  debug "Connection will be established via direct connection #{host}:#{port}"
  connection = build_connection(host, port, queue)
  # build_connection returns nil on failure; guard so that case is
  # reported as a ConnectionFailure rather than a NoMethodError on nil.
  if(connection && register(connection))
    info "Registered new connection #{connection}"
    distribution.redistribute!
  else
    abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port})")
  end
  connection
end

#discover ⇒ nil

Start the discovery interval lookup

Returns:

  • (nil)


223
224
225
226
# File 'lib/krakow/consumer.rb', line 223

# Start the discovery interval lookup
#
# Runs #init! immediately, then reschedules itself after
# discovery_interval plus a random share of discovery_jitter.
#
# @return [Object] result of scheduling the next run via `after`
def discover
  init!
  delay = discovery_interval + (discovery_jitter * rand)
  after(delay){ discover }
end

#discovery_interval ⇒ Numeric

Returns the discovery_interval attribute.

Returns:

  • (Numeric)

    the discovery_interval attribute



34
# File 'lib/krakow/consumer.rb', line 34

attribute :discovery_interval, Numeric, :default => 30

#discovery_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the discovery_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_interval attribute



34
# File 'lib/krakow/consumer.rb', line 34

attribute :discovery_interval, Numeric, :default => 30

#discovery_jitter ⇒ Numeric

Returns the discovery_jitter attribute.

Returns:

  • (Numeric)

    the discovery_jitter attribute



35
# File 'lib/krakow/consumer.rb', line 35

attribute :discovery_jitter, Numeric, :default => 10.0

#discovery_jitter? ⇒ TrueClass, FalseClass

Returns truthiness of the discovery_jitter attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_jitter attribute



35
# File 'lib/krakow/consumer.rb', line 35

attribute :discovery_jitter, Numeric, :default => 10.0

#host ⇒ String

Returns the host attribute.

Returns:

  • (String)

    the host attribute



29
# File 'lib/krakow/consumer.rb', line 29

attribute :host, String

#host? ⇒ TrueClass, FalseClass

Returns truthiness of the host attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



29
# File 'lib/krakow/consumer.rb', line 29

attribute :host, String

#init! ⇒ nil

Initialize the consumer by starting lookup and adding connections

Returns:

  • (nil)


201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/krakow/consumer.rb', line 201

# Initialize the consumer by starting lookup and adding connections
#
# Every discovered node without a registered connection gets one built
# and registered. Nodes whose connection cannot be built are skipped so
# the remaining nodes are still processed.
#
# @return [nil]
def init!
  debug 'Running consumer `init!` connection builds'
  found = discovery.lookup(topic)
  debug "Discovery results: #{found.inspect}"
  registered = false
  found.each do |node|
    debug "Processing discovery result: #{node.inspect}"
    key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
    if(connections[key])
      debug "Discovery result already registered: #{node.inspect}"
      next
    end
    connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
    # build_connection logs the failure and returns nil; never hand
    # nil to #register.
    next unless connection
    if(register(connection))
      info "Registered new connection #{connection}"
      registered = true
    end
  end
  # Only rebalance when a connection was actually added
  distribution.redistribute! if registered
  nil
end

#max_in_flight ⇒ Integer

Returns the max_in_flight attribute.

Returns:

  • (Integer)

    the max_in_flight attribute



32
# File 'lib/krakow/consumer.rb', line 32

attribute :max_in_flight, Integer, :default => 1

#max_in_flight? ⇒ TrueClass, FalseClass

Returns truthiness of the max_in_flight attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the max_in_flight attribute



32
# File 'lib/krakow/consumer.rb', line 32

attribute :max_in_flight, Integer, :default => 1

#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

Returns the notifier attribute.

Returns:

  • ([Celluloid::Signals, Celluloid::Condition, Celluloid::Actor])

    the notifier attribute



36
# File 'lib/krakow/consumer.rb', line 36

attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#notifier? ⇒ TrueClass, FalseClass

Returns truthiness of the notifier attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the notifier attribute



36
# File 'lib/krakow/consumer.rb', line 36

attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#nsqlookupd ⇒ [Array, String]

Returns the nsqlookupd attribute.

Returns:

  • ([Array, String])

    the nsqlookupd attribute



31
# File 'lib/krakow/consumer.rb', line 31

attribute :nsqlookupd, [Array, String]

#nsqlookupd? ⇒ TrueClass, FalseClass

Returns truthiness of the nsqlookupd attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the nsqlookupd attribute



31
# File 'lib/krakow/consumer.rb', line 31

attribute :nsqlookupd, [Array, String]

#port ⇒ [String, Integer]

Returns the port attribute.

Returns:

  • ([String, Integer])

    the port attribute



30
# File 'lib/krakow/consumer.rb', line 30

attribute :port, [String, Integer]

#port? ⇒ TrueClass, FalseClass

Returns truthiness of the port attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



30
# File 'lib/krakow/consumer.rb', line 30

attribute :port, [String, Integer]

#process_message(message, connection) ⇒ Krakow::FrameType

Note:

If we receive a message that is already in flight, attempt to scrub message from wait queue. If message is found, retry distribution registration. If message is not found, assume it is currently being processed and do not allow new message to be queued

Process a given message if required

Parameters:

Returns:



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/krakow/consumer.rb', line 168

# Process a given message if required
#
# @note If the message is already in flight, one attempt is made to
#   scrub it from the wait queue and register it again. When it cannot
#   be scrubbed (or the retry fails too) the message is discarded.
# @param message [Krakow::FrameType] received frame
# @param connection [Krakow::Connection] connection the frame arrived on
# @return [Krakow::FrameType, nil] the frame, or nil when discarded
def process_message(message, connection)
  # Only message frames are tracked; everything else passes through
  return message unless message.is_a?(FrameType::Message)
  message.origin = current_actor
  message.connection = connection
  scrubbed = false
  begin
    distribution.register_message(message, connection.identifier)
    message
  rescue KeyError
    if(!scrubbed && queue.scrub_duplicate_message(message))
      scrubbed = true
      retry
    end
    error "Received message is currently in flight and not in wait queue. Discarding! (#{message})"
    nil
  end
end

#register(connection) ⇒ TrueClass, FalseClass

Register connection with distribution

Parameters:

Returns:

  • (TrueClass, FalseClass)

    true if subscription was successful



232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/krakow/consumer.rb', line 232

# Register connection with distribution
#
# Initializes the connection, subscribes it to the topic/channel, links
# it to this actor and records it in the connections hash and the
# distribution.
#
# @param connection [Krakow::Connection]
# @return [TrueClass, FalseClass] true if subscription was successful;
#   false (with the connection terminated) on a bad response
def register(connection)
  connection.init!
  subscription = Command::Sub.new(:topic_name => topic, :channel_name => channel)
  connection.transmit(subscription)
  link(connection)
  connections[connection.identifier] = connection
  distribution.add_connection(connection)
  true
rescue Error::BadResponse => e
  detail = e.result ? e.result.error : '<No Response!>'
  debug "Failed to establish connection: #{detail}"
  connection.terminate
  false
end

#remove_message(messages) ⇒ NilClass

Note:

used mainly for queue callback

Remove message

Parameters:

Returns:

  • (NilClass)


273
274
275
276
277
278
279
# File 'lib/krakow/consumer.rb', line 273

# Remove message
#
# @note used mainly for queue callback
# @param messages [Object, Array] a message or a (possibly nested) list
#   of messages; nil entries are ignored
# @return [NilClass]
def remove_message(messages)
  removals = [messages].flatten.compact
  removals.each do |message|
    distribution.unregister_message(message.message_id)
    update_ready!(message.connection)
  end
  nil
end

#requeue(message_id, timeout = 0) ⇒ TrueClass

Requeue message (generally due to processing failure)

Parameters:

Returns:

  • (TrueClass)


319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/krakow/consumer.rb', line 319

# Requeue message (generally due to processing failure)
#
# @param message_id [String, Object] message ID, or any object that
#   responds to `message_id`
# @param timeout [Numeric] timeout passed along with the REQ command
# @return [TrueClass]
def requeue(message_id, timeout=0)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    distribution.unregister_message(message_id)
    req = Command::Req.new(:message_id => message_id, :timeout => timeout)
    connection.transmit(req)
    # Count the requeue as a failure for this connection
    distribution.failure(connection.identifier)
    update_ready!(connection)
  end
  true
end

#to_s ⇒ String

Returns the stringified object.

Returns:

  • (String)

    the stringified object



103
104
105
# File 'lib/krakow/consumer.rb', line 103

# @return [String] class name, object id, topic and channel
def to_s
  format('<%s:%s T:%s C:%s>', self.class.name, object_id, topic, channel)
end

#topic ⇒ String

Returns the topic attribute.

Returns:

  • (String)

    the topic attribute



27
# File 'lib/krakow/consumer.rb', line 27

attribute :topic, String, :required => true

#topic? ⇒ TrueClass, FalseClass

Returns truthiness of the topic attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



27
# File 'lib/krakow/consumer.rb', line 27

attribute :topic, String, :required => true

#touch(message_id) ⇒ TrueClass

Touch message (to extend timeout)

Parameters:

Returns:

  • (TrueClass)


339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/krakow/consumer.rb', line 339

# Touch message (to extend timeout)
#
# @param message_id [String, Object] message ID, or any object that
#   responds to `message_id`
# @return [TrueClass]
# @raise [Error::LookupFailed] message is not in flight
def touch(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    distribution.in_flight_lookup(message_id) do |connection|
      touch_command = Command::Touch.new(:message_id => message_id)
      connection.transmit(touch_command)
    end
    true
  rescue Error::LookupFailed => e
    error "Lookup of message for touch failed! <Message ID: #{message_id} - Error: #{e}>"
    abort e
  end
end

#update_ready!(connection) ⇒ nil

Send RDY for connection based on distribution rules

Parameters:

Returns:

  • (nil)


193
194
195
196
# File 'lib/krakow/consumer.rb', line 193

# Send RDY for connection based on distribution rules
#
# Delegates to the distribution's `set_ready_for` and discards its result.
#
# @param connection [Krakow::Connection]
# @return [nil]
def update_ready!(connection)
  distribution.set_ready_for(connection)
  nil
end