Class: Carnivore::Source Abstract

Inherits:
Object
  • Object
show all
Extended by:
Bogo::Memoization
Includes:
Utils::Failure, Utils::Failure, Utils::Logging, Zoidberg::SoftShell, Zoidberg::Supervise
Defined in:
lib/carnivore/source.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/source_container.rb

Overview

This class is abstract.

Message source

Direct Known Subclasses

Spec, Test

Defined Under Namespace

Classes: SourceContainer, Spec, Test

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Failure

#execute_and_retry_forever

Methods included from Utils::Logging

#exception_log, #log

Constructor Details

#initialize(args = {}) ⇒ Source

Create new Source

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :name (String, Symbol)

    name of source

  • :auto_process (TrueClass, FalseClass)

    start processing on initialization

  • :auto_confirm (TrueClass, FalseClass)

    confirm messages automatically on receive

  • :orphan_callback (Proc)

    execute block when no callbacks are valid for message

  • :multiple_callback (Proc)

    execute block when multiple callbacks are valid and multiple support is disabled

  • :prevent_duplicates (TrueClass, FalseClass)

    setup and use message registry

  • :allow_multiple_matches (TrueClass, FalseClass)

    allow multiple callback matches (defaults true)

  • :callbacks (Array<Callback>)

    callbacks to register on this source



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/carnivore/source.rb', line 164

def initialize(args={})
  @arguments = args.dup
  @name = args[:name]
  @args = Smash.new(args)
  @callbacks = []
  @message_loop = Queue.new
  @message_remote = Queue.new
  @callback_names = {}
  @auto_process = !!args.fetch(:auto_process, true)
  @run_process = true
  @auto_confirm = !!args[:auto_confirm]
  @callback_supervisor = Carnivore::Supervisor.create!.last
  @allow_multiple_matches = !!args.fetch(:allow_multiple_matches, true)
  [:orphan_callback, :multiple_callback].each do |key|
    if(args[key])
      unless(args[key].is_a?(Proc))
        raise TypeError.new("Expected `Proc` type for `#{key}` but received `#{args[key].class}`")
      end
      define_singleton_method(key, &args[key])
    end
  end
  if(args[:prevent_duplicates])
    init_registry
  end
  @processing = false
  @name = args[:name] || Carnivore.uuid
  if(args[:callbacks])
    args[:callbacks].each do |name, block|
      add_callback(name, block)
    end
  end
  info 'Source initialization is complete'
rescue => e
  debug "Failed to initialize: #{self} - #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
  raise
end

Instance Attribute Details

#allow_multiple_matchesTrueClass, FalseClass (readonly)

Returns allow multiple callback matches.

Returns:

  • (TrueClass, FalseClass)

    allow multiple callback matches



146
147
148
# File 'lib/carnivore/source.rb', line 146

def allow_multiple_matches
  @allow_multiple_matches
end

#argumentsHash (readonly) Also known as: args

Returns original options hash.

Returns:

  • (Hash)

    original options hash



148
149
150
# File 'lib/carnivore/source.rb', line 148

def arguments
  @arguments
end

#auto_confirmTrueClass, FalseClass (readonly)

Returns auto confirm received messages.

Returns:

  • (TrueClass, FalseClass)

    auto confirm received messages



130
131
132
# File 'lib/carnivore/source.rb', line 130

def auto_confirm
  @auto_confirm
end

#auto_processTrueClass, FalseClass (readonly)

Returns start source processing on initialization.

Returns:

  • (TrueClass, FalseClass)

    start source processing on initialization



132
133
134
# File 'lib/carnivore/source.rb', line 132

def auto_process
  @auto_process
end

#callback_supervisorCarnivore::Supervisor (readonly)

Returns supervisor maintaining callback instances.

Returns:



136
137
138
# File 'lib/carnivore/source.rb', line 136

def callback_supervisor
  @callback_supervisor
end

#callbacksArray<Callback> (readonly)

Returns registered callbacks.

Returns:

  • (Array<Callback>)

    registered callbacks



128
129
130
# File 'lib/carnivore/source.rb', line 128

def callbacks
  @callbacks
end

#message_loopQueue (readonly)

Returns local loop message queue.

Returns:

  • (Queue)

    local loop message queue



140
141
142
# File 'lib/carnivore/source.rb', line 140

def message_loop
  @message_loop
end

#message_registryHash (readonly)

Returns registry of processed messages.

Returns:

  • (Hash)

    registry of processed messages



138
139
140
# File 'lib/carnivore/source.rb', line 138

def message_registry
  @message_registry
end

#message_remoteQueue (readonly)

Returns remote message queue.

Returns:

  • (Queue)

    remote message queue



142
143
144
# File 'lib/carnivore/source.rb', line 142

def message_remote
  @message_remote
end

#nameString, Symbol (readonly)

Returns name of source.

Returns:

  • (String, Symbol)

    name of source



126
127
128
# File 'lib/carnivore/source.rb', line 126

def name
  @name
end

#processingTrueClass, FalseClass (readonly)

Returns currently processing a message.

Returns:

  • (TrueClass, FalseClass)

    currently processing a message



144
145
146
# File 'lib/carnivore/source.rb', line 144

def processing
  @processing
end

#run_processTrueClass, FalseClass (readonly)

Returns message processing control switch.

Returns:

  • (TrueClass, FalseClass)

    message processing control switch



134
135
136
# File 'lib/carnivore/source.rb', line 134

def run_process
  @run_process
end

Class Method Details

.build(args = {}) ⇒ SourceContainer

Builds a source container

Parameters:

  • args (Hash) (defaults to: {})

    source configuration

Options Hash (args):

  • :type (String, Symbol)

    type of source to build

  • :args (Hash)

    configuration hash for source initialization

Returns:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/carnivore/source.rb', line 21

def build(args={})
  [:args, :type].each do |key|
    unless(args.has_key?(key))
      abort ArgumentError.new "Missing required parameter `:#{key}`"
    end
  end
  require Source.require_path(args[:type]) || "carnivore/source/#{args[:type]}"
  klass = args[:type].to_s.split('_').map(&:capitalize).join
  klass = Source.const_get(klass)
  args[:args][:name] ||= Carnivore.uuid
  inst = SourceContainer.new(klass, args[:args])
  register(args[:args][:name], inst)
  inst
end

.clear!NilClass

Returns Remove any registered sources.

Returns:

  • (NilClass)

    Remove any registered sources



97
98
99
# File 'lib/carnivore/source.rb', line 97

def clear!
  sources_registry.clear
end

.provide(type, require_path) ⇒ TrueClass

Register a new source type

Parameters:

  • type (Symbol)

    name of source type

  • require_path (String)

    path to require when requested

Returns:

  • (TrueClass)


55
56
57
58
# File 'lib/carnivore/source.rb', line 55

def provide(type, require_path)
  source_classes[type] = require_path
  true
end

.register(name, inst) ⇒ TrueClass

Register the container

Parameters:

Returns:

  • (TrueClass)


73
74
75
76
# File 'lib/carnivore/source.rb', line 73

def register(name, inst)
  sources_registry[name] = inst
  true
end

.require_path(type) ⇒ String, NilClass

Registered path for given source type

Parameters:

  • type (String, Symbol)

    name of source type

Returns:

  • (String, NilClass)


64
65
66
# File 'lib/carnivore/source.rb', line 64

def require_path(type)
  source_classes[type]
end

.reset_comms!Object

Reset communication methods within class



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/carnivore/source.rb', line 102

def reset_comms!
  self.class_eval do
    unless(method_defined?(:reset_communications?))
      alias_method :custom_transmit, :transmit
      alias_method :transmit, :_transmit
      def reset_communications?
        true
      end
    end
  end
end

.source(name) ⇒ SourceContainer

Source container with given name

Parameters:

  • name (String, Symbol)

    name of source

Returns:



82
83
84
85
86
87
88
89
# File 'lib/carnivore/source.rb', line 82

def source(name)
  if(sources_registry[name])
    sources_registry[name]
  else
    Carnivore::Logger.error "Source lookup failed (name: #{name})"
    abort KeyError.new("Requested named source is not registered: #{name}")
  end
end

.source_classesSmash

Returns Source class information.

Returns:

  • (Smash)

    Source class information



37
38
39
40
41
# File 'lib/carnivore/source.rb', line 37

def source_classes
  memoize(:source_classes, :global) do
    Smash.new
  end
end

.sourcesArray<SourceContainer>

Returns registered source containers.

Returns:



92
93
94
# File 'lib/carnivore/source.rb', line 92

def sources
  sources_registry.values
end

.sources_registrySmash

Returns Registered source information.

Returns:

  • (Smash)

    Registered source information



44
45
46
47
48
# File 'lib/carnivore/source.rb', line 44

def sources_registry
  memoize(:sources, :global) do
    Smash.new
  end
end

Instance Method Details

#_transmit(*args) ⇒ TrueClass, FalseClass

Send to local loop if processing otherwise use regular transmit

Parameters:

  • args (Object)

    argument list

Returns:

  • (TrueClass, FalseClass)


498
499
500
501
502
503
504
505
506
507
508
509
510
# File 'lib/carnivore/source.rb', line 498

def _transmit(*args)
  begin
    if(loop_enabled? && processing)
      loop_transmit(*args)
    else
      custom_transmit(*args)
    end
    true
  rescue EncodingError => e
    error "Transmission failed due to encoding error! Error: #{e.class} - #{e} [(#{args.map(&:to_s).join(')(')})]"
    false
  end
end

#add_callback(callback_name, block_or_class) ⇒ self

Adds the given callback to the source for message processing

Parameters:

  • callback_name (String, Symbol)

    name of callback

  • block_or_class (Carnivore::Callback, Proc)

Returns:

  • (self)


324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/carnivore/source.rb', line 324

def add_callback(callback_name, block_or_class)
  name = "#{self.name}:#{callback_name}"
  if(block_or_class.is_a?(Class))
    size = block_or_class.workers || 1
    if(size < 1)
      warn "Callback class (#{block_or_class}) defined no workers. Skipping."
      return self
    elsif(size == 1)
      debug "Adding callback class (#{block_or_class}) under supervision. Name: #{callback_name(name)}"
      callback_supervisor.supervise_as callback_name(name), block_or_class, name, current_actor
    else
      debug "Adding callback class (#{block_or_class}) under supervision pool (#{size} workers). Name: #{callback_name(name)}"
      callback_supervisor.pool block_or_class, as: callback_name(name), size: size, args: [name, current_actor]
    end
  else
    debug "Adding custom callback class  from block (#{block_or_class}) under supervision. Name: #{callback_name(name)}"
    callback_supervisor.supervise_as callback_name(name), Callback, name, current_actor, block_or_class
  end
  callbacks.push(name).uniq!
  self
end

#auto_confirm?TrueClass, FalseClass

Returns automatic message confirmation enabled.

Returns:

  • (TrueClass, FalseClass)

    automatic message confirmation enabled



257
258
259
# File 'lib/carnivore/source.rb', line 257

def auto_confirm?
  @auto_confirm
end

#auto_process?TrueClass, FalseClass

Returns auto processing enabled.

Returns:

  • (TrueClass, FalseClass)

    auto processing enabled



237
238
239
# File 'lib/carnivore/source.rb', line 237

def auto_process?
  auto_process && !callbacks.empty?
end

#callback_name(name) ⇒ Carnivore::Callback, NilClass

Returns namespaced name (prefixed with source name and instance id)

Parameters:

  • name (String, Symbol)

    name of callback

Returns:



363
364
365
366
367
368
# File 'lib/carnivore/source.rb', line 363

def callback_name(name)
  unless(@callback_names[name])
    @callback_names[name] = [@name, self.object_id, name].join(':').to_sym
  end
  @callback_names[name]
end

#confirm(message) ⇒ Object

Confirm receipt of the message on source

Parameters:



315
316
317
# File 'lib/carnivore/source.rb', line 315

def confirm(message)
  debug 'No custom confirm declared'
end

#connectObject

Connection hook for sources requiring customized connect

Parameters:

  • args (Hash)

    initialization hash



281
282
283
# File 'lib/carnivore/source.rb', line 281

def connect
  debug 'No custom connect declared'
end

#format(msg) ⇒ Carnivore::Message

Create new Message from received payload

Parameters:

  • msg (Object)

    received payload

Returns:



374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/carnivore/source.rb', line 374

def format(msg)
  actor = Carnivore::Supervisor.supervisor[name]
  if(actor)
    if(msg.is_a?(Hash) && msg.keys.map(&:to_s).sort == ['content', 'raw'])
      Message.new(
        :message => msg[:raw],
        :content => msg[:content],
        :source => actor.current_actor
      )
    else
      Message.new(
        :message => msg,
        :source => actor.current_actor
      )
    end
  else
    abort "Failed to locate self in registry (#{name})"
  end
end

#init_registryMessageRegistry

Load and initialize the message registry

Returns:

  • (MessageRegistry)

    new registry



532
533
534
535
# File 'lib/carnivore/source.rb', line 532

def init_registry
  require 'carnivore/message_registry'
  @message_registry = MessageRegistry.new
end

#inspectString

Returns inspection formatted string.

Returns:

  • (String)

    inspection formatted string



262
263
264
# File 'lib/carnivore/source.rb', line 262

def inspect
  "<#{self.class.name}:#{object_id} @name=#{name} @callbacks=#{Hash[*callbacks.map{|k,v| [k,v.object_id]}.flatten]}>"
end

#loop_enabled?TrueClass, FalseClass

Local message loopback is enabled. Custom sources should override this method to allow loopback delivery if desired

Returns:

  • (TrueClass, FalseClass)


516
517
518
# File 'lib/carnivore/source.rb', line 516

def loop_enabled?
  false
end

#loop_receive(*args) ⇒ Carnivore::Message, NilClass

Get received message on local loopback

Parameters:

  • args (Object)

    argument list (unused)

Returns:



478
479
480
# File 'lib/carnivore/source.rb', line 478

def loop_receive(*args)
  message_loop.shift
end

#loop_transmit(message, original_message = nil, args = {}) ⇒ TrueClass

Push message onto internal loop queue

Parameters:

  • message (Carnivore::Message)
  • original_message (Object) (defaults to: nil)

    unused

  • args (Hash) (defaults to: {})

    unused

Returns:

  • (TrueClass)


488
489
490
491
492
# File 'lib/carnivore/source.rb', line 488

def loop_transmit(message, original_message=nil, args={})
  message_loop.push message
  signal(:messages_available)
  true
end

#multiple_callbacks?TrueClass, FalseClass

Allow sending payload to multiple matching callbacks. Custom sources should override this method to disable multiple callback matches if desired.

Returns:

  • (TrueClass, FalseClass)


525
526
527
# File 'lib/carnivore/source.rb', line 525

def multiple_callbacks?
  allow_multiple_matches
end

#process(*args) ⇒ TrueClass

Process incoming messages from this source

Parameters:

  • args (Object)

    list of arguments

Returns:

  • (TrueClass)


417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/carnivore/source.rb', line 417

def process(*args)
  unless(processing)
    begin
      @processing = true
      while(run_process && !callbacks.empty?)
        # if(message_loop.empty? && message_remote.empty?)
        #   wait(:messages_available)
        # end
        msgs = receive
        # msgs.push message_loop.pop unless message_loop.empty?
        # msgs.push message_remote.pop unless message_remote.empty?
        msgs = [msgs].flatten.compact.map do |m|
          if(valid_message?(m))
            format(m)
          end
        end.compact
        msgs.each do |msg|
          if(multiple_callbacks? || respond_to?(:orphan_callback))
            valid_callbacks = callbacks.find_all do |name|
              callback_supervisor[callback_name(name)].valid?(msg)
            end
          else
            valid_callbacks = callbacks
          end
          if(valid_callbacks.empty?)
            warn "Received message was not processed through any callbacks on this source: #{msg}"
            orphan_callback(msg) if respond_to?(:orphan_callback)
          elsif(valid_callbacks.size > 1 && !multiple_callbacks?)
            error "Received message is valid for multiple callbacks but multiple callbacks are disabled: #{msg}"
            multiple_callback(msg) if respond_to?(:multiple_callback)
          else
            valid_callbacks.each do |name|
              debug "Dispatching message<#{msg[:message].object_id}> to callback<#{name} (#{callback_name(name)})>"
              callback_supervisor[callback_name(name)].async.call(msg)
            end
          end
        end
      end
    ensure
      @processing = false
    end
    true
  else
    false
  end
end

#receive(n = 1) ⇒ Object+

This method is abstract.

Receive messages from source

Parameters:

  • n (Integer) (defaults to: 1)

    number of messages

Returns:

  • (Object, Array<Object>)

    payload or array of payloads

Raises:

  • (NotImplementedError)


290
291
292
# File 'lib/carnivore/source.rb', line 290

def receive(n=1)
  raise NotImplementedError.new('Abstract method not valid for runtime')
end

#receive_messagesTrueClass

Receive messages from source

Returns:

  • (TrueClass)


466
467
468
469
470
471
472
# File 'lib/carnivore/source.rb', line 466

def receive_messages
  loop do
    message_remote.push receive
    signal(:messages_available)
  end
  true
end

#remove_callback(name) ⇒ self

Remove the named callback from the source

Parameters:

  • name (String, Symbol)

Returns:

  • (self)


350
351
352
353
354
355
356
357
# File 'lib/carnivore/source.rb', line 350

def remove_callback(name)
  unless(@callbacks.include?(callback_name(name)))
    abort NameError.new("Failed to locate callback named: #{name}")
  end
  actors[callback_name(name)].terminate
  @callbacks.delete(name)
  self
end

#restarted!Object

Fully restore the source if it is restarted



202
203
204
205
206
# File 'lib/carnivore/source.rb', line 202

def restarted!
  run_setup
  run_connect
  start!
end

#run_connectObject

Run the source connect action



216
217
218
219
220
# File 'lib/carnivore/source.rb', line 216

def run_connect
  execute_and_retry_forever(:connect) do
    connect
  end
end

#run_setupObject

Run the source setup action



209
210
211
212
213
# File 'lib/carnivore/source.rb', line 209

def run_setup
  execute_and_retry_forever(:setup) do
    setup(args)
  end
end

#setup(args = {}) ⇒ Object

Setup hook for source requiring customized setup

Parameters:

  • args (Hash) (defaults to: {})

    initialization hash



274
275
276
# File 'lib/carnivore/source.rb', line 274

def setup(args={})
  debug 'No custom setup declared'
end

#start!TrueClass, FalseClass

Start source if auto_process is enabled

Returns:

  • (TrueClass, FalseClass)


225
226
227
228
229
230
231
232
233
234
# File 'lib/carnivore/source.rb', line 225

def start!
  if(auto_process?)
    info 'Message processing started via auto start'
    async.process
    true
  else
    warn 'Message processing is disabled via auto start'
    false
  end
end

#terminateObject

Ensure we cleanup our internal supervisor before bailing out



242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/carnivore/source.rb', line 242

def terminate
  warn 'Termination request received. Tearing down!'
  if(callback_supervisor && callback_supervisor.alive?)
    begin
      warn "Tearing down callback supervisor! (#{callback_supervisor})"
      callback_supervisor.terminate
    rescue Zoidberg::DeadException
      warn 'Terminated task error during callback supervisor teardown. Moving on.'
    end
  else
    warn 'Callback supervisor is not alive. No teardown issued'
  end
end

#to_sString

Returns stringified instance.

Returns:

  • (String)

    stringified instance



267
268
269
# File 'lib/carnivore/source.rb', line 267

def to_s
  "<#{self.class.name}:#{object_id} @name=#{name}>"
end

#touch(message) ⇒ TrueClass, FalseClass

Touch message to reset timeout

Parameters:

Returns:

  • (TrueClass, FalseClass)


307
308
309
310
# File 'lib/carnivore/source.rb', line 307

def touch(message)
  warn 'Source#touch was not implemented for this source!'
  true
end

#transmit(message, original_message = nil, args = {}) ⇒ Object

Send payload to source

Parameters:

  • message (Object)

    payload

  • original_message (Carnviore::Message) (defaults to: nil)

    original message if reply to extract optional metadata

  • args (Hash) (defaults to: {})

    optional extra arguments

Raises:

  • (NotImplemented)


299
300
301
# File 'lib/carnivore/source.rb', line 299

def transmit(message, original_message=nil, args={})
  raise NotImplemented.new('Abstract method not valid for runtime')
end

#valid_message?(m) ⇒ TrueClass, FalseClass

Validate message is allowed before processing. This is currently only used when the message registry is enabled to prevent duplicate message processing.

Parameters:

Returns:

  • (TrueClass, FalseClass)


400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/carnivore/source.rb', line 400

def valid_message?(m)
  if(message_registry)
    if(message_registry.valid?(m))
      true
    else
      warn "Message was already received. Discarding: #{m.inspect}"
      false
    end
  else
    true
  end
end