Class: Krakow::Distribution Abstract

Inherits:
Object
  • Object
show all
Extended by:
Utils::Lazy::ClassMethods
Includes:
Celluloid, Utils::Lazy, Utils::Lazy::InstanceMethods
Defined in:
lib/krakow/distribution.rb,
lib/krakow/distribution/default.rb

Overview

This class is abstract.

Message distribution

Direct Known Subclasses

Default

Defined Under Namespace

Classes: Default

Instance Attribute Summary collapse

Attributes included from Utils::Lazy::InstanceMethods

#arguments

Attributes collapse

Instance Method Summary collapse

Methods included from Utils::Lazy::ClassMethods

attribute, attributes, set_attributes

Methods included from Utils::Lazy::InstanceMethods

#inspect, #to_s

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(args = {}) ⇒ Distribution

Returns a new instance of Distribution.



33
34
35
36
37
38
# File 'lib/krakow/distribution.rb', line 33

def initialize(args={})
  super
  @ideal = 0
  @flight_record = {}
  @registry = {}
end

Instance Attribute Details

#flight_recordObject

Returns the value of attribute flight_record.



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def flight_record
  @flight_record
end

#idealObject

Returns the value of attribute ideal.



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def ideal
  @ideal
end

#registryObject

Returns the value of attribute registry.



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def registry
  @registry
end

Instance Method Details

#add_connection(connection) ⇒ TrueClass

Add connection to make available for RDY distribution

Parameters:

Returns:

  • (TrueClass)


123
124
125
126
127
128
129
130
131
132
133
# File 'lib/krakow/distribution.rb', line 123

def add_connection(connection)
  unless(registry[connection.identifier])
    registry[connection.identifier] = {
      :ready => initial_ready,
      :in_flight => 0,
      :failures => 0,
      :backoff_until => 0
    }
  end
  true
end

#backoff_intervalNumeric

Returns the backoff_interval attribute.

Returns:

  • (Numeric)

    the backoff_interval attribute



28
# File 'lib/krakow/distribution.rb', line 28

attribute :backoff_interval, Numeric

#backoff_interval?TrueClass, FalseClass

Returns truthiness of the backoff_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the backoff_interval attribute



28
# File 'lib/krakow/distribution.rb', line 28

attribute :backoff_interval, Numeric

#calculate_ready!(connection_identifier) ⇒ Integer

Abstract

Determine RDY value for given connection

Parameters:

  • connection_identifier (String)

Returns:

  • (Integer)


48
49
50
# File 'lib/krakow/distribution.rb', line 48

def calculate_ready!(connection_identifier)
  raise NotImplementedError.new 'Custom `#calculate_ready!` method must be provided!'
end

#connection_lookup(identifier) ⇒ Krakow::Connection?

Return connection associated with given registry key

Parameters:

  • identifier (String)

    connection identifier

Returns:



156
157
158
# File 'lib/krakow/distribution.rb', line 156

def connection_lookup(identifier)
  consumer.connection(identifier)
end

#connectionsArray<Krakow::Connection>

Returns connections in registry.

Returns:



192
193
194
195
196
# File 'lib/krakow/distribution.rb', line 192

def connections
  registry.keys.map do |identifier|
    connection_lookup(identifier)
  end.compact
end

#consumerKrakow::Consumer

Returns the consumer attribute.

Returns:



26
# File 'lib/krakow/distribution.rb', line 26

attribute :consumer, Krakow::Consumer, :required => true

#consumer?TrueClass, FalseClass

Returns truthiness of the consumer attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the consumer attribute



26
# File 'lib/krakow/distribution.rb', line 26

attribute :consumer, Krakow::Consumer, :required => true

#failure(connection_identifier) ⇒ TrueClass

Log failure of processed message

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


202
203
204
205
206
207
208
209
# File 'lib/krakow/distribution.rb', line 202

def failure(connection_identifier)
  if(backoff_interval)
    registry_info = registry_lookup(connection_identifier)
    registry_info[:failures] += 1
    registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
  end
  true
end

#in_flight_lookup(msg_id) {|connection| ... } ⇒ Krakow::Connection, Object

Return source connection for given message ID

Parameters:

  • msg_id (String)

Yields:

  • execute with connection

Yield Parameters:

Returns:



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/krakow/distribution.rb', line 166

def in_flight_lookup(msg_id)
  connection = connection_lookup(flight_record[msg_id])
  unless(connection)
    abort Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})")
  end
  if(block_given?)
    begin
      yield connection
    rescue => e
      abort e
    end
  else
    connection
  end
end

#initial_readyInteger

Initial ready value used for new connections

Returns:

  • (Integer)


99
100
101
# File 'lib/krakow/distribution.rb', line 99

def initial_ready
  ideal > 0 ? 1 : 0
end

#max_in_flightInteger

Returns the max_in_flight attribute.

Returns:

  • (Integer)

    the max_in_flight attribute



29
# File 'lib/krakow/distribution.rb', line 29

attribute :max_in_flight, Integer, :default => 1

#max_in_flight?TrueClass, FalseClass

Returns truthiness of the max_in_flight attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the max_in_flight attribute



29
# File 'lib/krakow/distribution.rb', line 29

attribute :max_in_flight, Integer, :default => 1

#ready_for(connection_identifier) ⇒ Integer

Return the currently configured RDY value for given connnection

Parameters:

  • connection_identifier (String)

Returns:

  • (Integer)


79
80
81
# File 'lib/krakow/distribution.rb', line 79

def ready_for(connection_identifier)
  registry_lookup(connection_identifier)[:ready]
end

#redistribute!Object

Abstract

Reset flight distributions



41
42
43
# File 'lib/krakow/distribution.rb', line 41

def redistribute!
  raise NotImplementedError.new 'Custom `#redistrubute!` method must be provided!'
end

#register_message(message, connection_identifier) ⇒ Integer

Registers message into registry and configures for distribution

Parameters:

Returns:

  • (Integer)


108
109
110
111
112
113
114
115
116
117
# File 'lib/krakow/distribution.rb', line 108

def register_message(message, connection_identifier)
  if(flight_record[message.message_id])
    abort KeyError.new "Message is already registered in flight record! (#{message.message_id})"
  else
    registry_info = registry_lookup(connection_identifier)
    registry_info[:in_flight] += 1
    flight_record[message.message_id] = connection_identifier
    calculate_ready!(connection_identifier)
  end
end

#registry_lookup(connection_identifier) ⇒ Hash

Return registry information for given connection

Parameters:

  • connection_identifier (String)

Returns:

  • (Hash)

    registry information

Raises:



186
187
188
189
# File 'lib/krakow/distribution.rb', line 186

def registry_lookup(connection_identifier)
  registry[connection_identifier] ||
    abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})"))
end

#remove_connection(connection_identifier, *args) ⇒ TrueClass

Remove connection from RDY distribution

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/krakow/distribution.rb', line 139

def remove_connection(connection_identifier, *args)
  # remove connection from registry
  registry.delete(connection_identifier)
  # remove any in flight messages
  flight_record.delete_if do |k,v|
    if(v == connection_identifier)
      warn "Removing in flight reference due to failed connection: #{v}"
      true
    end
  end
  true
end

#set_ready_for(connection, *_) ⇒ Krakow::FrameType::Error?

Send RDY for given connection

Parameters:

Returns:



88
89
90
91
92
93
94
# File 'lib/krakow/distribution.rb', line 88

def set_ready_for(connection, *_)
  connection.transmit(
    Command::Rdy.new(
      :count => ready_for(connection.identifier)
    )
  )
end

#success(connection_identifier) ⇒ TrueClass

Log success of processed message

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/krakow/distribution.rb', line 215

def success(connection_identifier)
  if(backoff_interval)
    registry_info = registry_lookup(connection_identifier)
    if(registry_info[:failures] > 1)
      registry_info[:failures] -= 1
      registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
    else
      registry_info[:failures] = 0
    end
  end
  true
end

#unregister_message(message) ⇒ Krakow::Connection, NilClass

Remove message metadata from registry

Parameters:

Returns:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/krakow/distribution.rb', line 56

def unregister_message(message)
  msg_id = message.respond_to?(:message_id) ? message.message_id : message.to_s
  connection = connection_lookup(flight_record[msg_id])
  flight_record.delete(msg_id)
  if(connection)
    begin
      ident = connection.identifier
      registry_info = registry_lookup(ident)
      registry_info[:in_flight] -= 1
      calculate_ready!(ident)
      connection
    rescue Celluloid::DeadActorError
      warn 'Connection is dead. No recalculation applied on ready.'
    end
  else
    warn 'No connection associated to message via lookup. No recalculation applied on ready.'
  end
end

#watch_dog_intervalNumeric

Returns the watch_dog_interval attribute.

Returns:

  • (Numeric)

    the watch_dog_interval attribute



27
# File 'lib/krakow/distribution.rb', line 27

attribute :watch_dog_interval, Numeric, :default => 1.0

#watch_dog_interval?TrueClass, FalseClass

Returns truthiness of the watch_dog_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the watch_dog_interval attribute



27
# File 'lib/krakow/distribution.rb', line 27

attribute :watch_dog_interval, Numeric, :default => 1.0