Class: Carnivore::Source Abstract

Inherits:
Object
  • Object
show all
Extended by:
Bogo::Memoization
Includes:
Utils::Failure, Utils::Failure, Utils::Logging, Zoidberg::SoftShell, Zoidberg::Supervise
Defined in:
lib/carnivore/source.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/source_container.rb

Overview

This class is abstract.

Message source

Direct Known Subclasses

Spec, Test

Defined Under Namespace

Classes: SourceContainer, Spec, Test

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Failure

#execute_and_retry_forever

Methods included from Utils::Logging

#exception_log, #log

Constructor Details

#initialize(args = {}) ⇒ Source

Create new Source

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :name (String, Symbol)

    name of source

  • :auto_process (TrueClass, FalseClass)

    start processing on initialization

  • :auto_confirm (TrueClass, FalseClass)

    confirm messages automatically on receive

  • :orphan_callback (Proc)

    execute block when no callbacks are valid for message

  • :multiple_callback (Proc)

    execute block when multiple callbacks are valid and multiple support is disabled

  • :prevent_duplicates (TrueClass, FalseClass)

    setup and use message registry

  • :allow_multiple_matches (TrueClass, FalseClass)

    allow multiple callback matches (defaults true)

  • :callbacks (Array<Callback>)

    callbacks to register on this source



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/carnivore/source.rb', line 162

# Create new Source
#
# @param args [Hash]
# @option args [String, Symbol] :name name of source (falls back to
#   `Carnivore.uuid` when not provided)
# @option args [TrueClass, FalseClass] :auto_process start processing on
#   initialization (defaults true)
# @option args [TrueClass, FalseClass] :auto_confirm confirm messages
#   automatically on receive
# @option args [Proc] :orphan_callback execute block when no callbacks are
#   valid for message
# @option args [Proc] :multiple_callback execute block when multiple
#   callbacks are valid and multiple support is disabled
# @option args [TrueClass, FalseClass] :prevent_duplicates setup and use
#   message registry
# @option args [TrueClass, FalseClass] :allow_multiple_matches allow
#   multiple callback matches (defaults true)
# @option args [#each] :callbacks name/callback pairs; each pair is handed
#   to #add_callback
# @raise [TypeError] when :orphan_callback or :multiple_callback is given
#   but is not a Proc
def initialize(args={})
  @arguments = args.dup
  # Resolve the name once, before anything that can fail: the rescue below
  # logs `self`, and #to_s reports the name.
  @name = args[:name] || Carnivore.uuid
  @args = Smash.new(args)
  @callbacks = []
  @message_loop = Queue.new
  @message_remote = Queue.new
  @callback_names = {}
  @auto_process = !!args.fetch(:auto_process, true)
  @run_process = true
  @auto_confirm = !!args[:auto_confirm]
  @callback_supervisor = Carnivore::Supervisor.create!.last
  @allow_multiple_matches = !!args.fetch(:allow_multiple_matches, true)
  # Optional hooks are installed as singleton methods so #process can
  # detect them with respond_to?.
  [:orphan_callback, :multiple_callback].each do |key|
    if(args[key])
      unless(args[key].is_a?(Proc))
        raise TypeError.new("Expected `Proc` type for `#{key}` but received `#{args[key].class}`")
      end
      define_singleton_method(key, &args[key])
    end
  end
  if(args[:prevent_duplicates])
    init_registry
  end
  @processing = false
  if(args[:callbacks])
    args[:callbacks].each do |name, block|
      add_callback(name, block)
    end
  end
  info 'Source initialization is complete'
rescue => e
  debug "Failed to initialize: #{self} - #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
  raise
end

Instance Attribute Details

#allow_multiple_matchesTrueClass, FalseClass (readonly)

Returns allow multiple callback matches.

Returns:

  • (TrueClass, FalseClass)

    allow multiple callback matches



144
145
146
# File 'lib/carnivore/source.rb', line 144

# @return [TrueClass, FalseClass] allow multiple callback matches
def allow_multiple_matches
  @allow_multiple_matches
end

#argumentsHash (readonly) Also known as: args

Returns original options hash.

Returns:

  • (Hash)

    original options hash



146
147
148
# File 'lib/carnivore/source.rb', line 146

# @return [Hash] original options hash
def arguments
  @arguments
end

#auto_confirmTrueClass, FalseClass (readonly)

Returns auto confirm received messages.

Returns:

  • (TrueClass, FalseClass)

    auto confirm received messages



128
129
130
# File 'lib/carnivore/source.rb', line 128

# @return [TrueClass, FalseClass] auto confirm received messages
def auto_confirm
  @auto_confirm
end

#auto_processTrueClass, FalseClass (readonly)

Returns start source processing on initialization.

Returns:

  • (TrueClass, FalseClass)

    start source processing on initialization



130
131
132
# File 'lib/carnivore/source.rb', line 130

# @return [TrueClass, FalseClass] start source processing on initialization
def auto_process
  @auto_process
end

#callback_supervisorCarnivore::Supervisor (readonly)

Returns supervisor maintaining callback instances.

Returns:



134
135
136
# File 'lib/carnivore/source.rb', line 134

# @return [Carnivore::Supervisor] supervisor maintaining callback instances
def callback_supervisor
  @callback_supervisor
end

#callbacksArray<Callback> (readonly)

Returns registered callbacks.

Returns:

  • (Array<Callback>)

    registered callbacks



126
127
128
# File 'lib/carnivore/source.rb', line 126

# @return [Array] registered callbacks
# @note #add_callback pushes "source_name:callback_name" strings onto this
#   list, and #process iterates it as a list of names.
def callbacks
  @callbacks
end

#message_loopQueue (readonly)

Returns local loop message queue.

Returns:

  • (Queue)

    local loop message queue



138
139
140
# File 'lib/carnivore/source.rb', line 138

# @return [Queue] local loop message queue
def message_loop
  @message_loop
end

#message_registryHash (readonly)

Returns registry of processed messages.

Returns:

  • (Hash)

    registry of processed messages



136
137
138
# File 'lib/carnivore/source.rb', line 136

# @return [MessageRegistry, NilClass] registry of processed messages; only
#   assigned by #init_registry
def message_registry
  @message_registry
end

#message_remoteQueue (readonly)

Returns remote message queue.

Returns:

  • (Queue)

    remote message queue



140
141
142
# File 'lib/carnivore/source.rb', line 140

# @return [Queue] remote message queue
def message_remote
  @message_remote
end

#nameString, Symbol (readonly)

Returns name of source.

Returns:

  • (String, Symbol)

    name of source



124
125
126
# File 'lib/carnivore/source.rb', line 124

# @return [String, Symbol] name of source
def name
  @name
end

#processingTrueClass, FalseClass (readonly)

Returns currently processing a message.

Returns:

  • (TrueClass, FalseClass)

    currently processing a message



142
143
144
# File 'lib/carnivore/source.rb', line 142

# @return [TrueClass, FalseClass] currently processing a message
def processing
  @processing
end

#run_processTrueClass, FalseClass (readonly)

Returns message processing control switch.

Returns:

  • (TrueClass, FalseClass)

    message processing control switch



132
133
134
# File 'lib/carnivore/source.rb', line 132

# @return [TrueClass, FalseClass] message processing control switch
def run_process
  @run_process
end

Class Method Details

.build(args = {}) ⇒ SourceContainer

Builds a source container

Parameters:

  • args (Hash) (defaults to: {})

    source configuration

Options Hash (args):

  • :type (String, Symbol)

    type of source to build

  • :args (Hash)

    configuration hash for source initialization

Returns:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/carnivore/source.rb', line 21

# Builds a source container and registers it under the source name
#
# @param args [Hash] source configuration
# @option args [String, Symbol] :type type of source to build
# @option args [Hash] :args configuration hash for source initialization
#   (a :name is generated via `Carnivore.uuid` and written into this hash
#   when missing)
# @return [SourceContainer]
def build(args={})
  [:args, :type].each do |required|
    next if args.has_key?(required)
    abort ArgumentError.new "Missing required parameter `:#{required}`"
  end
  type = args[:type]
  options = args[:args]
  # Prefer a path registered through .provide, else the conventional one.
  require(Source.require_path(type) || "carnivore/source/#{type}")
  class_name = type.to_s.split('_').map(&:capitalize).join
  source_class = Source.const_get(class_name)
  options[:name] ||= Carnivore.uuid
  container = SourceContainer.new(source_class, options)
  register(options[:name], container)
  container
end

.clear!NilClass

Removes any registered sources.

Returns:

  • (NilClass)

    Remove any registered sources



97
98
99
# File 'lib/carnivore/source.rb', line 97

# Remove any registered sources by clearing the sources registry
#
# @return [Object] result of `sources_registry.clear`
def clear!
  sources_registry.clear
end

.provide(type, require_path) ⇒ TrueClass

Register a new source type

Parameters:

  • type (Symbol)

    name of source type

  • require_path (String)

    path to require when requested

Returns:

  • (TrueClass)


55
56
57
58
# File 'lib/carnivore/source.rb', line 55

# Register a new source type
#
# @param type [Symbol] name of source type
# @param require_path [String] path to require when requested
# @return [TrueClass]
def provide(type, require_path)
  source_classes[type] = require_path
  true
end

.register(name, inst) ⇒ TrueClass

Register the container

Parameters:

Returns:

  • (TrueClass)


73
74
75
76
# File 'lib/carnivore/source.rb', line 73

# Register the container under the given name in the sources registry
#
# @param name [String, Symbol] name of source
# @param inst [SourceContainer] container to register
# @return [TrueClass]
def register(name, inst)
  sources_registry[name] = inst
  true
end

.require_path(type) ⇒ String, NilClass

Registered path for given source type

Parameters:

  • type (String, Symbol)

    name of source type

Returns:

  • (String, NilClass)


64
65
66
# File 'lib/carnivore/source.rb', line 64

# Registered path for given source type
#
# @param type [String, Symbol] name of source type
# @return [String, NilClass] value stored by .provide for this type
def require_path(type)
  source_classes[type]
end

.reset_comms!Object

Reset communication methods within class



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/carnivore/source.rb', line 102

# Reset communication methods within class
#
# Keeps the class's own #transmit reachable as #custom_transmit and routes
# #transmit through #_transmit. Defining #reset_communications? marks the
# class as already rewired, so repeated calls are no-ops.
def reset_comms!
  class_eval do
    next if method_defined?(:reset_communications?)
    alias_method :custom_transmit, :transmit
    alias_method :transmit, :_transmit
    def reset_communications?
      true
    end
  end
end

.source(name) ⇒ SourceContainer

Source container with given name

Parameters:

  • name (String, Symbol)

    name of source

Returns:



82
83
84
85
86
87
88
89
# File 'lib/carnivore/source.rb', line 82

# Source container with given name
#
# @param name [String, Symbol] name of source
# @return [SourceContainer]
# @note when nothing is registered under the name, an error is logged and
#   `abort` is called with a KeyError
def source(name)
  container = sources_registry[name]
  return container if container
  Carnivore::Logger.error "Source lookup failed (name: #{name})"
  abort KeyError.new("Requested named source is not registered: #{name}")
end

.source_classesSmash

Returns Source class information.

Returns:

  • (Smash)

    Source class information



37
38
39
40
41
# File 'lib/carnivore/source.rb', line 37

# @return [Smash] Source class information (memoized under
#   :source_classes with the :global flag)
def source_classes
  memoize(:source_classes, :global) { Smash.new }
end

.sourcesArray<SourceContainer>

Returns registered source containers.

Returns:



92
93
94
# File 'lib/carnivore/source.rb', line 92

# @return [Array<SourceContainer>] registered source containers (the values
#   of the sources registry)
def sources
  sources_registry.values
end

.sources_registrySmash

Returns Registered source information.

Returns:

  • (Smash)

    Registered source information



44
45
46
47
48
# File 'lib/carnivore/source.rb', line 44

# @return [Smash] Registered source information (memoized under :sources
#   with the :global flag)
def sources_registry
  memoize(:sources, :global) { Smash.new }
end

Instance Method Details

#_transmit(*args) ⇒ TrueClass, FalseClass

Send to local loop if processing otherwise use regular transmit

Parameters:

  • args (Object)

    argument list

Returns:

  • (TrueClass, FalseClass)


496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/carnivore/source.rb', line 496

# Send to local loop if processing otherwise use regular transmit
#
# @param args [Object] argument list, forwarded untouched
# @return [TrueClass, FalseClass] false only when the transmission raised
#   an EncodingError (which is logged, not re-raised)
def _transmit(*args)
  loop_enabled? && processing ? loop_transmit(*args) : custom_transmit(*args)
  true
rescue EncodingError => e
  error "Transmission failed due to encoding error! Error: #{e.class} - #{e} [(#{args.map(&:to_s).join(')(')})]"
  false
end

#add_callback(callback_name, block_or_class) ⇒ self

Adds the given callback to the source for message processing

Parameters:

  • callback_name (String, Symbol)

    name of callback

  • block_or_class (Carnivore::Callback, Proc)

Returns:

  • (self)


322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/carnivore/source.rb', line 322

# Adds the given callback to the source for message processing
#
# A Class is supervised directly (one worker) or as a pool (more than one),
# based on its `workers` value; a Class reporting fewer than one worker is
# skipped. Anything else is wrapped in a supervised `Callback`.
#
# @param callback_name [String, Symbol] name of callback
# @param block_or_class [Carnivore::Callback, Proc]
# @return [self]
def add_callback(callback_name, block_or_class)
  name = "#{self.name}:#{callback_name}"
  # nil marks the "not a class" case; `|| 1` keeps class counts non-nil
  worker_count = block_or_class.is_a?(Class) ? (block_or_class.workers || 1) : nil
  if(worker_count.nil?)
    registered = callback_name(name)
    debug "Adding custom callback class  from block (#{block_or_class}) under supervision. Name: #{registered}"
    callback_supervisor.supervise_as registered, Callback, name, current_actor, block_or_class
  elsif(worker_count < 1)
    warn "Callback class (#{block_or_class}) defined no workers. Skipping."
    return self
  elsif(worker_count == 1)
    registered = callback_name(name)
    debug "Adding callback class (#{block_or_class}) under supervision. Name: #{registered}"
    callback_supervisor.supervise_as registered, block_or_class, name, current_actor
  else
    registered = callback_name(name)
    debug "Adding callback class (#{block_or_class}) under supervision pool (#{worker_count} workers). Name: #{registered}"
    callback_supervisor.pool block_or_class, as: registered, size: worker_count, args: [name, current_actor]
  end
  callbacks.push(name).uniq!
  self
end

#auto_confirm?TrueClass, FalseClass

Returns automatic message confirmation enabled.

Returns:

  • (TrueClass, FalseClass)

    automatic message confirmation enabled



255
256
257
# File 'lib/carnivore/source.rb', line 255

# @return [TrueClass, FalseClass] automatic message confirmation enabled
def auto_confirm?
  @auto_confirm
end

#auto_process?TrueClass, FalseClass

Returns auto processing enabled.

Returns:

  • (TrueClass, FalseClass)

    auto processing enabled



235
236
237
# File 'lib/carnivore/source.rb', line 235

# @return [TrueClass, FalseClass] auto processing enabled; requires both the
#   auto_process flag and at least one registered callback
def auto_process?
  enabled = auto_process
  return enabled unless enabled
  !callbacks.empty?
end

#callback_name(name) ⇒ Symbol

Returns namespaced name (prefixed with source name and instance id)

Parameters:

  • name (String, Symbol)

    name of callback

Returns:



361
362
363
364
365
366
# File 'lib/carnivore/source.rb', line 361

# Namespaced name (prefixed with source name and instance id)
#
# The result is cached per input name, so repeated lookups return the same
# Symbol.
#
# @param name [String, Symbol] name of callback
# @return [Symbol]
def callback_name(name)
  @callback_names[name] ||= [@name, object_id, name].join(':').to_sym
end

#confirm(message) ⇒ Object

Confirm receipt of the message on source

Parameters:



313
314
315
# File 'lib/carnivore/source.rb', line 313

# Confirm receipt of the message on source
#
# This base implementation ignores the message and only emits a debug
# line stating that no custom confirm is declared.
#
# @param message [Object] message to confirm (unused here)
def confirm(message)
  debug 'No custom confirm declared'
end

#connectObject

Connection hook for sources requiring customized connect

Parameters:

  • args (Hash)

    initialization hash



279
280
281
# File 'lib/carnivore/source.rb', line 279

# Connection hook for sources requiring customized connect
#
# This base implementation takes no arguments and only emits a debug line
# stating that no custom connect is declared.
def connect
  debug 'No custom connect declared'
end

#format(msg) ⇒ Carnivore::Message

Create new Message from received payload

Parameters:

  • msg (Object)

    received payload

Returns:



372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/carnivore/source.rb', line 372

# Create new Message from received payload
#
# A Hash whose only keys are `raw` and `content` (as Strings or Symbols) is
# unpacked into the message and its content; any other payload is used as
# the message as-is.
#
# @param msg [Object] received payload
# @return [Carnivore::Message]
# @note calls `abort` when this source is not found in the supervisor
#   registry under its name
def format(msg)
  actor = Carnivore::Supervisor.supervisor[name]
  if(actor)
    if(msg.is_a?(Hash) && msg.keys.map(&:to_s).sort == ['content', 'raw'])
      # The check above accepts String or Symbol keys, so read whichever
      # form is actually present instead of assuming Symbol access works.
      raw, content = [:raw, :content].map do |key|
        msg.key?(key) ? msg[key] : msg[key.to_s]
      end
      Message.new(
        :message => raw,
        :content => content,
        :source => actor.current_actor
      )
    else
      Message.new(
        :message => msg,
        :source => actor.current_actor
      )
    end
  else
    abort "Failed to locate self in registry (#{name})"
  end
end

#init_registryMessageRegistry

Load and initialize the message registry

Returns:

  • (MessageRegistry)

    new registry



530
531
532
533
# File 'lib/carnivore/source.rb', line 530

# Load and initialize the message registry
#
# The registry file is required here, at call time, rather than when this
# file is loaded.
#
# @return [MessageRegistry] new registry
def init_registry
  require 'carnivore/message_registry'
  @message_registry = MessageRegistry.new
end

#inspectString

Returns inspection formatted string.

Returns:

  • (String)

    inspection formatted string



260
261
262
# File 'lib/carnivore/source.rb', line 260

# @return [String] inspection formatted string including the registered
#   callback names
def inspect
  # callbacks holds the names pushed by #add_callback, so list them directly
  # rather than pairing each with a value that is never present.
  "<#{self.class.name}:#{object_id} @name=#{name} @callbacks=#{callbacks.inspect}>"
end

#loop_enabled?TrueClass, FalseClass

Local message loopback is enabled. Custom sources should override this method to allow loopback delivery if desired

Returns:

  • (TrueClass, FalseClass)


514
515
516
# File 'lib/carnivore/source.rb', line 514

# Local message loopback is enabled. Custom sources should override this
# method to allow loopback delivery if desired
#
# @return [TrueClass, FalseClass] always false in this base implementation
def loop_enabled?
  false
end

#loop_receive(*args) ⇒ Carnivore::Message, NilClass

Get received message on local loopback

Parameters:

  • args (Object)

    argument list (unused)

Returns:



476
477
478
# File 'lib/carnivore/source.rb', line 476

# Get received message on local loopback
#
# @param args [Object] argument list (unused)
# @return [Carnivore::Message, NilClass] next entry shifted off the local
#   loop queue
def loop_receive(*args)
  message_loop.shift
end

#loop_transmit(message, original_message = nil, args = {}) ⇒ TrueClass

Push message onto internal loop queue

Parameters:

  • message (Carnivore::Message)
  • original_message (Object) (defaults to: nil)

    unused

  • args (Hash) (defaults to: {})

    unused

Returns:

  • (TrueClass)


486
487
488
489
490
# File 'lib/carnivore/source.rb', line 486

# Push message onto internal loop queue and signal :messages_available
#
# @param message [Carnivore::Message]
# @param original_message [Object] unused
# @param args [Hash] unused
# @return [TrueClass]
def loop_transmit(message, original_message=nil, args={})
  message_loop << message
  signal(:messages_available)
  true
end

#multiple_callbacks?TrueClass, FalseClass

Allow sending payload to multiple matching callbacks. Custom sources should override this method to disable multiple callback matches if desired.

Returns:

  • (TrueClass, FalseClass)


523
524
525
# File 'lib/carnivore/source.rb', line 523

# Allow sending payload to multiple matching callbacks. Custom sources
# should override this method to disable multiple callback matches if
# desired.
#
# @return [TrueClass, FalseClass] value of #allow_multiple_matches
def multiple_callbacks?
  allow_multiple_matches
end

#process(*args) ⇒ TrueClass

Process incoming messages from this source

Parameters:

  • args (Object)

    list of arguments

Returns:

  • (TrueClass)


415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
# File 'lib/carnivore/source.rb', line 415

# Process incoming messages from this source
#
# Repeatedly calls #receive, formats each valid payload into a message and
# dispatches it to the matching callbacks, for as long as #run_process is
# set and at least one callback is registered.
#
# @param args [Object] list of arguments (not read by this method)
# @return [TrueClass, FalseClass] true once the loop has finished; false
#   when a processing run is already active and nothing was done
def process(*args)
  unless(processing)
    begin
      # Mark the run as active; the ensure below always clears it again.
      @processing = true
      while(run_process && !callbacks.empty?)
        # if(message_loop.empty? && message_remote.empty?)
        #   wait(:messages_available)
        # end
        msgs = receive
        # msgs.push message_loop.pop unless message_loop.empty?
        # msgs.push message_remote.pop unless message_remote.empty?
        # Accept a single payload or an array of payloads, drop nils, and
        # keep only payloads accepted by #valid_message?, formatted.
        msgs = [msgs].flatten.compact.map do |m|
          if(valid_message?(m))
            format(m)
          end
        end.compact
        msgs.each do |msg|
          # Callbacks are only asked whether they accept the message when
          # multiple matches are allowed or an orphan hook is defined;
          # otherwise every registered callback counts as a match.
          if(multiple_callbacks? || respond_to?(:orphan_callback))
            valid_callbacks = callbacks.find_all do |name|
              defer{ callback_supervisor[callback_name(name)].valid?(msg) }
            end
          else
            valid_callbacks = callbacks
          end
          if(valid_callbacks.empty?)
            warn "Received message was not processed through any callbacks on this source: #{msg}"
            orphan_callback(msg) if respond_to?(:orphan_callback)
          elsif(valid_callbacks.size > 1 && !multiple_callbacks?)
            # NOTE(review): with multiple matches disabled and no orphan
            # hook, every registered callback counts as a match, so two or
            # more registered callbacks always land here - confirm intended.
            error "Received message is valid for multiple callbacks but multiple callbacks are disabled: #{msg}"
            multiple_callback(msg) if respond_to?(:multiple_callback)
          else
            valid_callbacks.each do |name|
              debug "Dispatching message<#{msg[:message].object_id}> to callback<#{name} (#{callback_name(name)})>"
              # Dispatch goes through the callback's async proxy.
              callback_supervisor[callback_name(name)].async(:locked).call(msg)
            end
          end
        end
      end
    ensure
      @processing = false
    end
    true
  else
    false
  end
end

#receive(n = 1) ⇒ Object+

This method is abstract.

Receive messages from source

Parameters:

  • n (Integer) (defaults to: 1)

    number of messages

Returns:

  • (Object, Array<Object>)

    payload or array of payloads

Raises:

  • (NotImplementedError)


288
289
290
# File 'lib/carnivore/source.rb', line 288

# Receive messages from source
#
# @abstract concrete sources must override this method
# @param n [Integer] number of messages
# @return [Object, Array<Object>] payload or array of payloads
# @raise [NotImplementedError] always, in this base implementation
def receive(n=1)
  raise NotImplementedError, 'Abstract method not valid for runtime'
end

#receive_messagesTrueClass

Receive messages from source

Returns:

  • (TrueClass)


464
465
466
467
468
469
470
# File 'lib/carnivore/source.rb', line 464

# Receive messages from source
#
# Loops pushing each #receive result onto the remote queue and signalling
# :messages_available after every push.
#
# @return [TrueClass] only reached once the loop is broken out of
def receive_messages
  loop do
    payload = receive
    message_remote << payload
    signal(:messages_available)
  end
  true
end

#remove_callback(name) ⇒ self

Remove the named callback from the source

Parameters:

  • name (String, Symbol)

Returns:

  • (self)


348
349
350
351
352
353
354
355
# File 'lib/carnivore/source.rb', line 348

# Remove the named callback from the source
#
# @param name [String, Symbol] callback name as originally given to
#   #add_callback
# @return [self]
# @note calls `abort` with a NameError when no such callback is registered
def remove_callback(name)
  # #add_callback stores "source_name:callback_name" in @callbacks and
  # supervises the instance under callback_name(<that string>), so both
  # lookups must go through the same qualified name.
  qualified = "#{self.name}:#{name}"
  unless(@callbacks.include?(qualified))
    abort NameError.new("Failed to locate callback named: #{name}")
  end
  # NOTE(review): uses the callback supervisor lookup like #process does;
  # confirm terminating a supervised callback does not trigger a restart.
  callback_supervisor[callback_name(qualified)].terminate
  @callbacks.delete(qualified)
  self
end

#restarted!Object

Fully restore the source if it is restarted



200
201
202
203
204
# File 'lib/carnivore/source.rb', line 200

# Fully restore the source if it is restarted
#
# Re-runs setup and connect, in that order, then starts the source again
# via #start!.
def restarted!
  run_setup
  run_connect
  start!
end

#run_connectObject

Run the source connect action



214
215
216
217
218
# File 'lib/carnivore/source.rb', line 214

# Run the source connect action, wrapped in execute_and_retry_forever
# under the :connect label
def run_connect
  execute_and_retry_forever(:connect) { connect }
end

#run_setupObject

Run the source setup action



207
208
209
210
211
# File 'lib/carnivore/source.rb', line 207

# Run the source setup action, wrapped in execute_and_retry_forever under
# the :setup label. #setup receives the source's `args`.
def run_setup
  execute_and_retry_forever(:setup) { setup(args) }
end

#setup(args = {}) ⇒ Object

Setup hook for source requiring customized setup

Parameters:

  • args (Hash) (defaults to: {})

    initialization hash



272
273
274
# File 'lib/carnivore/source.rb', line 272

# Setup hook for source requiring customized setup
#
# This base implementation ignores its argument and only emits a debug
# line stating that no custom setup is declared.
#
# @param args [Hash] initialization hash
def setup(args={})
  debug 'No custom setup declared'
end

#start!TrueClass, FalseClass

Start source if auto_process is enabled

Returns:

  • (TrueClass, FalseClass)


223
224
225
226
227
228
229
230
231
232
# File 'lib/carnivore/source.rb', line 223

# Start source if auto_process is enabled
#
# @return [TrueClass, FalseClass] true when processing was started
#   asynchronously, false when auto processing is disabled
def start!
  unless(auto_process?)
    warn 'Message processing is disabled via auto start'
    return false
  end
  info 'Message processing started via auto start'
  async(:locked).process
  true
end

#terminateObject

Ensure we cleanup our internal supervisor before bailing out



240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/carnivore/source.rb', line 240

# Ensure we cleanup our internal supervisor before bailing out
#
# The callback supervisor is terminated only when it is present and alive.
# A Zoidberg::DeadException raised during teardown is logged and swallowed.
def terminate
  warn 'Termination request received. Tearing down!'
  supervisor_alive = callback_supervisor && callback_supervisor.alive?
  return warn('Callback supervisor is not alive. No teardown issued') unless supervisor_alive
  begin
    warn "Tearing down callback supervisor! (#{callback_supervisor})"
    callback_supervisor.terminate
  rescue Zoidberg::DeadException
    warn 'Terminated task error during callback supervisor teardown. Moving on.'
  end
end

#to_sString

Returns stringified instance.

Returns:

  • (String)

    stringified instance



265
266
267
# File 'lib/carnivore/source.rb', line 265

# @return [String] stringified instance: class name, object id and source
#   name
def to_s
  class_name = self.class.name
  "<#{class_name}:#{object_id} @name=#{name}>"
end

#touch(message) ⇒ TrueClass, FalseClass

Touch message to reset timeout

Parameters:

Returns:

  • (TrueClass, FalseClass)


305
306
307
308
# File 'lib/carnivore/source.rb', line 305

# Touch message to reset timeout
#
# This base implementation ignores the message and only emits a debug
# line stating that touch is not implemented for this source.
#
# @param message [Object] message to touch (unused here)
# @return [TrueClass, FalseClass] always true in this base implementation
def touch(message)
  debug 'Source#touch was not implemented for this source!'
  true
end

#transmit(message, original_message = nil, args = {}) ⇒ Object

Send payload to source

Parameters:

  • message (Object)

    payload

  • original_message (Carnivore::Message) (defaults to: nil)

    original message if reply to extract optional metadata

  • args (Hash) (defaults to: {})

    optional extra arguments

Raises:

  • (NotImplementedError)


297
298
299
# File 'lib/carnivore/source.rb', line 297

# Send payload to source
#
# @abstract concrete sources must override this method
# @param message [Object] payload
# @param original_message [Carnivore::Message] original message if reply to
#   extract optional metadata
# @param args [Hash] optional extra arguments
# @raise [NotImplementedError] always, in this base implementation
def transmit(message, original_message=nil, args={})
  # Same error class as #receive; `NotImplemented` is not a Ruby core
  # constant, so referencing it would fail with a NameError instead.
  raise NotImplementedError.new('Abstract method not valid for runtime')
end

#valid_message?(m) ⇒ TrueClass, FalseClass

Validate message is allowed before processing. This is currently only used when the message registry is enabled to prevent duplicate message processing.

Parameters:

Returns:

  • (TrueClass, FalseClass)


398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/carnivore/source.rb', line 398

# Validate message is allowed before processing. This is currently only
# used when the message registry is enabled to prevent duplicate message
# processing.
#
# @param m [Object] received payload
# @return [TrueClass, FalseClass] true when no registry is set or the
#   registry accepts the payload; false (with a warning) otherwise
def valid_message?(m)
  return true unless message_registry
  return true if message_registry.valid?(m)
  warn "Message was already received. Discarding: #{m.inspect}"
  false
end