Class: Carnivore::Source Abstract

Inherits:
Object
  • Object
show all
Extended by:
Bogo::Memoization
Includes:
Utils::Failure, Utils::Logging, Celluloid
Defined in:
lib/carnivore/source.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/source_container.rb

Overview

This class is abstract.

Message source

Direct Known Subclasses

Spec, Test

Defined Under Namespace

Classes: SourceContainer, Spec, Test

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Failure

#execute_and_retry_forever

Methods included from Utils::Logging

#exception_log, #log

Constructor Details

#initialize(args = {}) ⇒ Source

Create new Source

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :name (String, Symbol)

    name of source

  • :auto_process (TrueClass, FalseClass)

    start processing on initialization

  • :auto_confirm (TrueClass, FalseClass)

    confirm messages automatically on receive

  • :orphan_callback (Proc)

    execute block when no callbacks are valid for message

  • :multiple_callback (Proc)

    execute block when multiple callbacks are valid and multiple support is disabled

  • :prevent_duplicates (TrueClass, FalseClass)

    setup and use message registry

  • :allow_multiple_matches (TrueClass, FalseClass)

    allow multiple callback matches (defaults true)

  • :callbacks (Array<Callback>)

    callbacks to register on this source



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/carnivore/source.rb', line 163

# Create new Source
#
# @param args [Hash] source options
# @option args [String, Symbol] :name name of source (a Celluloid uuid is used when omitted)
# @option args [TrueClass, FalseClass] :auto_process start processing on initialization (defaults true)
# @option args [TrueClass, FalseClass] :auto_confirm confirm messages automatically on receive
# @option args [Proc] :orphan_callback execute block when no callbacks are valid for message
# @option args [Proc] :multiple_callback execute block when multiple callbacks are valid and multiple support is disabled
# @option args [TrueClass, FalseClass] :prevent_duplicates setup and use message registry
# @option args [TrueClass, FalseClass] :allow_multiple_matches allow multiple callback matches (defaults true)
# @option args [Array<Callback>] :callbacks callbacks to register on this source
# @raise [TypeError] when :orphan_callback or :multiple_callback is given but is not a Proc
def initialize(args={})
  @arguments = args.dup
  @name = args[:name]
  @args = Smash.new(args)
  @callbacks = []
  @message_loop = Queue.new
  @message_remote = Queue.new
  @callback_names = {}
  # Normalize the option flags to strict booleans
  @auto_process = args.fetch(:auto_process, true) ? true : false
  @run_process = true
  @auto_confirm = args[:auto_confirm] ? true : false
  @callback_supervisor = Carnivore::Supervisor.create!.last
  @allow_multiple_matches = args.fetch(:allow_multiple_matches, true) ? true : false
  # Optional hooks become singleton methods so `respond_to?` reports them
  [:orphan_callback, :multiple_callback].each do |key|
    handler = args[key]
    next unless handler
    unless(handler.is_a?(Proc))
      raise TypeError, "Expected `Proc` type for `#{key}` but received `#{handler.class}`"
    end
    define_singleton_method(key, &handler)
  end
  init_registry if args[:prevent_duplicates]
  @processing = false
  @name = args[:name] || Celluloid.uuid
  (args[:callbacks] || []).each do |callback_id, handler|
    add_callback(callback_id, handler)
  end
  execute_and_retry_forever(:setup){ setup(args) }
  execute_and_retry_forever(:connect){ connect }
  info 'Source initialization is complete'
rescue => e
  debug "Failed to initialize: #{self} - #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
  raise
end

Instance Attribute Details

#allow_multiple_matchesTrueClass, FalseClass (readonly)

Returns allow multiple callback matches.

Returns:

  • (TrueClass, FalseClass)

    allow multiple callback matches



145
146
147
# File 'lib/carnivore/source.rb', line 145

# @return [TrueClass, FalseClass] allow multiple callback matches
def allow_multiple_matches
  @allow_multiple_matches
end

#argumentsHash (readonly) Also known as: args

Returns original options hash.

Returns:

  • (Hash)

    original options hash



147
148
149
# File 'lib/carnivore/source.rb', line 147

# @return [Hash] original options hash (stored as a copy of the hash given to #initialize)
def arguments
  @arguments
end

#auto_confirmTrueClass, FalseClass (readonly)

Returns auto confirm received messages.

Returns:

  • (TrueClass, FalseClass)

    auto confirm received messages



129
130
131
# File 'lib/carnivore/source.rb', line 129

# @return [TrueClass, FalseClass] auto confirm received messages
def auto_confirm
  @auto_confirm
end

#auto_processTrueClass, FalseClass (readonly)

Returns start source processing on initialization.

Returns:

  • (TrueClass, FalseClass)

    start source processing on initialization



131
132
133
# File 'lib/carnivore/source.rb', line 131

# @return [TrueClass, FalseClass] start source processing on initialization
def auto_process
  @auto_process
end

#callback_supervisorCarnivore::Supervisor (readonly)

Returns supervisor maintaining callback instances.

Returns:



135
136
137
# File 'lib/carnivore/source.rb', line 135

# @return [Carnivore::Supervisor] supervisor maintaining callback instances
def callback_supervisor
  @callback_supervisor
end

#callbacksArray<Callback> (readonly)

Returns registered callbacks.

Returns:

  • (Array<Callback>)

    registered callbacks



127
128
129
# File 'lib/carnivore/source.rb', line 127

# @return [Array] registered callbacks (#add_callback pushes the
#   "source_name:callback_name" strings onto this list)
def callbacks
  @callbacks
end

#message_loopQueue (readonly)

Returns local loop message queue.

Returns:

  • (Queue)

    local loop message queue



139
140
141
# File 'lib/carnivore/source.rb', line 139

# @return [Queue] local loop message queue
def message_loop
  @message_loop
end

#message_registryHash (readonly)

Returns registry of processed messages.

Returns:

  • (Hash)

    registry of processed messages



137
138
139
# File 'lib/carnivore/source.rb', line 137

# @return [MessageRegistry, NilClass] registry of processed messages;
#   only assigned by #init_registry, otherwise nil
def message_registry
  @message_registry
end

#message_remoteQueue (readonly)

Returns remote message queue.

Returns:

  • (Queue)

    remote message queue



141
142
143
# File 'lib/carnivore/source.rb', line 141

# @return [Queue] remote message queue
def message_remote
  @message_remote
end

#nameString, Symbol (readonly)

Returns name of source.

Returns:

  • (String, Symbol)

    name of source



125
126
127
# File 'lib/carnivore/source.rb', line 125

# @return [String, Symbol] name of source
def name
  @name
end

#processingTrueClass, FalseClass (readonly)

Returns currently processing a message.

Returns:

  • (TrueClass, FalseClass)

    currently processing a message



143
144
145
# File 'lib/carnivore/source.rb', line 143

# @return [TrueClass, FalseClass] currently processing a message
#   (set while #process is running its loop)
def processing
  @processing
end

#run_processTrueClass, FalseClass (readonly)

Returns message processing control switch.

Returns:

  • (TrueClass, FalseClass)

    message processing control switch



133
134
135
# File 'lib/carnivore/source.rb', line 133

# @return [TrueClass, FalseClass] message processing control switch
#   (the #process loop keeps running only while this is truthy)
def run_process
  @run_process
end

Class Method Details

.build(args = {}) ⇒ SourceContainer

Builds a source container

Parameters:

  • args (Hash) (defaults to: {})

    source configuration

Options Hash (args):

  • :type (String, Symbol)

    type of source to build

  • :args (Hash)

    configuration hash for source initialization

Returns:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/carnivore/source.rb', line 21

# Builds a source container and registers it under the source name
#
# @param args [Hash] source configuration
# @option args [String, Symbol] :type type of source to build
# @option args [Hash] :args configuration hash for source initialization
#   (its :name is filled with a Celluloid uuid when missing)
# @return [SourceContainer]
def build(args={})
  missing = [:args, :type].detect{|key| !args.has_key?(key) }
  abort ArgumentError.new("Missing required parameter `:#{missing}`") if missing
  type = args[:type]
  # Prefer a path registered via `provide`, otherwise use the conventional location
  require(Source.require_path(type) || "carnivore/source/#{type}")
  class_name = type.to_s.split('_').map(&:capitalize).join
  source_class = Source.const_get(class_name)
  source_args = args[:args]
  source_args[:name] ||= Celluloid.uuid
  container = SourceContainer.new(source_class, source_args)
  register(source_args[:name], container)
  container
end

.clear!NilClass

Removes any registered sources.

Returns:

  • (NilClass)

    Remove any registered sources



97
98
99
# File 'lib/carnivore/source.rb', line 97

# Remove any registered sources
#
# @return [Object] result of calling `clear` on the sources registry
def clear!
  sources_registry.clear
end

.provide(type, require_path) ⇒ TrueClass

Register a new source type

Parameters:

  • type (Symbol)

    name of source type

  • require_path (String)

    path to require when requested

Returns:

  • (TrueClass)


55
56
57
58
# File 'lib/carnivore/source.rb', line 55

# Register a new source type
#
# @param type [Symbol] name of source type
# @param require_path [String] path to require when requested
# @return [TrueClass]
def provide(type, require_path)
  source_classes[type] = require_path
  true
end

.register(name, inst) ⇒ TrueClass

Register the container

Parameters:

Returns:

  • (TrueClass)


73
74
75
76
# File 'lib/carnivore/source.rb', line 73

# Register the container
#
# @param name [String, Symbol] key to store the container under
# @param inst [Object] container to register (#build passes a SourceContainer)
# @return [TrueClass]
def register(name, inst)
  sources_registry[name] = inst
  true
end

.require_path(type) ⇒ String, NilClass

Registered path for given source type

Parameters:

  • type (String, Symbol)

    name of source type

Returns:

  • (String, NilClass)


64
65
66
# File 'lib/carnivore/source.rb', line 64

# Registered path for given source type
#
# @param type [String, Symbol] name of source type
# @return [String, NilClass] path stored via #provide, if any
def require_path(type)
  source_classes[type]
end

.reset_comms!Object

Reset communication methods within class



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/carnivore/source.rb', line 102

# Reset communication methods within class
#
# Keeps the class' own `transmit` reachable as `custom_transmit` and
# routes `transmit` through `_transmit`. The generated
# `reset_communications?` method marks the class as already reset so
# the aliasing only happens once.
#
# @return [Symbol, NilClass] nil when the class was already reset
def reset_comms!
  return if method_defined?(:reset_communications?)
  class_eval do
    alias_method :custom_transmit, :transmit
    alias_method :transmit, :_transmit
    def reset_communications?
      true
    end
  end
end

.source(name) ⇒ SourceContainer

Source container with given name

Parameters:

  • name (String, Symbol)

    name of source

Returns:



82
83
84
85
86
87
88
89
# File 'lib/carnivore/source.rb', line 82

# Source container with given name
#
# @param name [String, Symbol] name of source
# @return [SourceContainer]
# @note logs an error and aborts with a KeyError when nothing is
#   registered under the name
def source(name)
  container = sources_registry[name]
  return container if container
  Celluloid.logger.error "Source lookup failed (name: #{name})"
  abort KeyError.new("Requested named source is not registered: #{name}")
end

.source_classesSmash

Returns Source class information.

Returns:

  • (Smash)

    Source class information



37
38
39
40
41
# File 'lib/carnivore/source.rb', line 37

# @return [Smash] Source class information, memoized under the
#   :source_classes key with the :global flag
def source_classes
  memoize(:source_classes, :global){ Smash.new }
end

.sourcesArray<SourceContainer>

Returns registered source containers.

Returns:



92
93
94
# File 'lib/carnivore/source.rb', line 92

# @return [Array<SourceContainer>] registered source containers
#   (the values of the sources registry)
def sources
  sources_registry.values
end

.sources_registrySmash

Returns Registered source information.

Returns:

  • (Smash)

    Registered source information



44
45
46
47
48
# File 'lib/carnivore/source.rb', line 44

# @return [Smash] Registered source information, memoized under the
#   :sources key with the :global flag
def sources_registry
  memoize(:sources, :global){ Smash.new }
end

Instance Method Details

#_transmit(*args) ⇒ TrueClass, FalseClass

Send to local loop if processing otherwise use regular transmit

Parameters:

  • args (Object)

    argument list

Returns:

  • (TrueClass, FalseClass)


482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/carnivore/source.rb', line 482

# Send to local loop if processing otherwise use regular transmit
#
# @param args [Object] argument list forwarded untouched
# @return [TrueClass, FalseClass] false when the transmission raised
#   an EncodingError (the failure is logged), true otherwise
def _transmit(*args)
  (loop_enabled? && processing) ? loop_transmit(*args) : custom_transmit(*args)
  true
rescue EncodingError => e
  error "Transmission failed due to encoding error! Error: #{e.class} - #{e} [(#{args.map(&:to_s).join(')(')})]"
  false
end

#add_callback(callback_name, block_or_class) ⇒ self

Adds the given callback to the source for message processing

Parameters:

  • callback_name (String, Symbol)

    name of callback

  • block_or_class (Carnivore::Callback, Proc)

Returns:

  • (self)


308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/carnivore/source.rb', line 308

# Adds the given callback to the source for message processing
#
# A callback class is supervised directly when it declares a single
# worker (or none declared) and through a pool when it declares more.
# A block is wrapped in a supervised `Callback` instance.
#
# @param callback_name [String, Symbol] name of callback
# @param block_or_class [Carnivore::Callback, Proc]
# @return [self]
def add_callback(callback_name, block_or_class)
  name = "#{self.name}:#{callback_name}"
  if(block_or_class.is_a?(Class))
    worker_count = block_or_class.workers || 1
    if(worker_count < 1)
      warn "Callback class (#{block_or_class}) defined no workers. Skipping."
      return self
    end
    # NOTE: the parameter shadows the method name; the parenthesized
    # call below still dispatches to the #callback_name method
    registered_as = callback_name(name)
    if(worker_count == 1)
      debug "Adding callback class (#{block_or_class}) under supervision. Name: #{registered_as}"
      callback_supervisor.supervise_as registered_as, block_or_class, name, current_actor
    else
      debug "Adding callback class (#{block_or_class}) under supervision pool (#{worker_count} workers). Name: #{registered_as}"
      callback_supervisor.pool block_or_class, as: registered_as, size: worker_count, args: [name, current_actor]
    end
  else
    registered_as = callback_name(name)
    debug "Adding custom callback class  from block (#{block_or_class}) under supervision. Name: #{registered_as}"
    callback_supervisor.supervise_as registered_as, Callback, name, current_actor, block_or_class
  end
  callbacks.push(name).uniq!
  self
end

#auto_confirm?TrueClass, FalseClass

Returns automatic message confirmation enabled.

Returns:

  • (TrueClass, FalseClass)

    automatic message confirmation enabled



241
242
243
# File 'lib/carnivore/source.rb', line 241

# @return [TrueClass, FalseClass] automatic message confirmation enabled
def auto_confirm?
  @auto_confirm
end

#auto_process?TrueClass, FalseClass

Returns auto processing enabled.

Returns:

  • (TrueClass, FalseClass)

    auto processing enabled



221
222
223
# File 'lib/carnivore/source.rb', line 221

# @return [TrueClass, FalseClass] auto processing enabled; requires
#   the auto_process flag and at least one registered callback
def auto_process?
  auto_process && !callbacks.empty?
end

#callback_name(name) ⇒ Symbol

Returns namespaced name (prefixed with source name and instance id)

Parameters:

  • name (String, Symbol)

    name of callback

Returns:



347
348
349
350
351
352
# File 'lib/carnivore/source.rb', line 347

# Namespaced name (prefixed with source name and instance id)
#
# The generated name is cached per given name, so repeated lookups
# return the same Symbol.
#
# @param name [String, Symbol] name of callback
# @return [Symbol] "source_name:object_id:name"
def callback_name(name)
  @callback_names[name] ||= [@name, object_id, name].join(':').to_sym
end

#confirm(message) ⇒ Object

Confirm receipt of the message on source

Parameters:



299
300
301
# File 'lib/carnivore/source.rb', line 299

# Confirm receipt of the message on source
#
# Default implementation only logs at debug level; the message is not used.
#
# @param message [Object] message to confirm (presumably a Carnivore::Message - confirm against callers)
def confirm(message)
  debug 'No custom confirm declared'
end

#connectObject

Connection hook for sources requiring customized connect

Parameters:

  • args (Hash)

    initialization hash



265
266
267
# File 'lib/carnivore/source.rb', line 265

# Connection hook for sources requiring customized connect
#
# Default implementation only logs at debug level. Takes no arguments.
def connect
  debug 'No custom connect declared'
end

#format(msg) ⇒ Carnivore::Message

Create new Message from received payload

Parameters:

  • msg (Object)

    received payload

Returns:



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/carnivore/source.rb', line 358

# Create new Message from received payload
#
# A Hash payload whose keys are exactly `raw` and `content` is
# unpacked into the message and content attributes; any other
# payload becomes the message itself.
#
# @param msg [Object] received payload
# @return [Carnivore::Message]
# @note aborts when this source cannot be found in the supervisor registry
def format(msg)
  actor = Carnivore::Supervisor.supervisor[name]
  abort "Failed to locate self in registry (#{name})" unless actor
  prepackaged = msg.is_a?(Hash) && msg.keys.map(&:to_s).sort == ['content', 'raw']
  unless(prepackaged)
    return Message.new(
      :message => msg,
      :source => actor.current_actor
    )
  end
  Message.new(
    :message => msg[:raw],
    :content => msg[:content],
    :source => actor.current_actor
  )
end

#init_registryMessageRegistry

Load and initialize the message registry

Returns:

  • (MessageRegistry)

    new registry



516
517
518
519
# File 'lib/carnivore/source.rb', line 516

# Load and initialize the message registry
#
# The registry code is required here, at call time, rather than when
# this file is loaded.
#
# @return [MessageRegistry] new registry
def init_registry
  require 'carnivore/message_registry'
  @message_registry = MessageRegistry.new
end

#inspectString

Returns inspection formatted string.

Returns:

  • (String)

    inspection formatted string



246
247
248
# File 'lib/carnivore/source.rb', line 246

# @return [String] inspection formatted string
# NOTE(review): #add_callback stores plain name strings in `callbacks`,
# so the second block parameter is nil and every value shown is
# `nil.object_id` - confirm whether callback names alone were intended.
def inspect
  callback_ids = Hash[callbacks.map{|k, v| [k, v.object_id] }]
  "<#{self.class.name}:#{object_id} @name=#{name} @callbacks=#{callback_ids}>"
end

#loop_enabled?TrueClass, FalseClass

Local message loopback is enabled. Custom sources should override this method to allow loopback delivery if desired

Returns:

  • (TrueClass, FalseClass)


500
501
502
# File 'lib/carnivore/source.rb', line 500

# Local message loopback is enabled. Custom sources should override
# this method to allow loopback delivery if desired
#
# @return [TrueClass, FalseClass] always false in this base implementation
def loop_enabled?
  false
end

#loop_receive(*args) ⇒ Carnivore::Message, NilClass

Get received message on local loopback

Parameters:

  • args (Object)

    argument list (unused)

Returns:



462
463
464
# File 'lib/carnivore/source.rb', line 462

# Get received message on local loopback
#
# @param args [Object] argument list (unused)
# @return [Carnivore::Message, NilClass] next item shifted off the local loop queue
def loop_receive(*args)
  message_loop.shift
end

#loop_transmit(message, original_message = nil, args = {}) ⇒ TrueClass

Push message onto internal loop queue

Parameters:

  • message (Carnivore::Message)
  • original_message (Object) (defaults to: nil)

    unused

  • args (Hash) (defaults to: {})

    unused

Returns:

  • (TrueClass)


472
473
474
475
476
# File 'lib/carnivore/source.rb', line 472

# Push message onto internal loop queue
#
# @param message [Carnivore::Message]
# @param original_message [Object] unused
# @param args [Hash] unused
# @return [TrueClass]
def loop_transmit(message, original_message=nil, args={})
  message_loop.push message
  # Signal anything waiting on :messages_available that a message is queued
  signal(:messages_available)
  true
end

#multiple_callbacks?TrueClass, FalseClass

Allow sending payload to multiple matching callbacks. Custom sources should override this method to disable multiple callback matches if desired.

Returns:

  • (TrueClass, FalseClass)


509
510
511
# File 'lib/carnivore/source.rb', line 509

# Allow sending payload to multiple matching callbacks. Custom sources
# should override this method to disable multiple callback matches if
# desired.
#
# @return [TrueClass, FalseClass] value of the allow_multiple_matches setting
def multiple_callbacks?
  allow_multiple_matches
end

#process(*args) ⇒ TrueClass

Process incoming messages from this source

Parameters:

  • args (Object)

    list of arguments

Returns:

  • (TrueClass)


401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/carnivore/source.rb', line 401

# Process incoming messages from this source
#
# Loops while processing is enabled and callbacks are registered:
# receives payloads, drops the ones rejected by #valid_message?,
# formats the rest and dispatches each message asynchronously to the
# matching callbacks. Messages with no match are handed to
# `orphan_callback` when defined; messages matching several callbacks
# while multiple matches are disabled are handed to
# `multiple_callback` when defined.
#
# @param args [Object] list of arguments (unused)
# @return [TrueClass, FalseClass] false when already processing
def process(*args)
  return false if processing
  begin
    @processing = true
    while(run_process && !callbacks.empty?)
      # (queue based receive via message_loop / message_remote is
      # currently disabled; payloads come straight from #receive)
      payloads = [receive].flatten.compact
      messages = payloads.map{|m| format(m) if valid_message?(m) }.compact
      messages.each do |msg|
        # Callbacks are only filtered when the outcome of filtering
        # can be acted upon; otherwise all callbacks are candidates
        matches = if(multiple_callbacks? || respond_to?(:orphan_callback))
          callbacks.select do |cb|
            callback_supervisor[callback_name(cb)].valid?(msg)
          end
        else
          callbacks
        end
        if(matches.empty?)
          warn "Received message was not processed through any callbacks on this source: #{msg}"
          orphan_callback(msg) if respond_to?(:orphan_callback)
        elsif(matches.size > 1 && !multiple_callbacks?)
          error "Received message is valid for multiple callbacks but multiple callbacks are disabled: #{msg}"
          multiple_callback(msg) if respond_to?(:multiple_callback)
        else
          matches.each do |cb|
            debug "Dispatching message<#{msg[:message].object_id}> to callback<#{cb} (#{callback_name(cb)})>"
            callback_supervisor[callback_name(cb)].async.call(msg)
          end
        end
      end
    end
  ensure
    @processing = false
  end
  true
end

#receive(n = 1) ⇒ Object, Array<Object>

This method is abstract.

Receive messages from source

Parameters:

  • n (Integer) (defaults to: 1)

    number of messages

Returns:

  • (Object, Array<Object>)

    payload or array of payloads

Raises:

  • (NotImplementedError)


274
275
276
# File 'lib/carnivore/source.rb', line 274

# Receive messages from source
#
# @abstract subclasses provide the implementation
# @param n [Integer] number of messages
# @return [Object, Array<Object>] payload or array of payloads
# @raise [NotImplementedError] always, in this base implementation
def receive(n=1)
  raise NotImplementedError.new('Abstract method not valid for runtime')
end

#receive_messagesTrueClass

Receive messages from source

Returns:

  • (TrueClass)


450
451
452
453
454
455
456
# File 'lib/carnivore/source.rb', line 450

# Receive messages from source
#
# Repeatedly pushes the result of #receive onto the remote queue and
# signals :messages_available after each one.
#
# @return [TrueClass] only reached if the loop is broken out of
def receive_messages
  loop do
    message_remote.push receive
    signal(:messages_available)
  end
  true
end

#remove_callback(name) ⇒ self

Remove the named callback from the source

Parameters:

  • name (String, Symbol)

Returns:

  • (self)


334
335
336
337
338
339
340
341
# File 'lib/carnivore/source.rb', line 334

# Remove the named callback from the source
#
# Callbacks are stored by #add_callback as "source_name:callback_name"
# and supervised under `callback_name(stored_name)`. The lookup here
# follows the same scheme, so the name originally given to
# #add_callback can be used. An already qualified name is accepted
# as well.
#
# @param name [String, Symbol] name of callback
# @return [self]
# @note aborts with a NameError when no such callback is registered
def remove_callback(name)
  registered = ["#{self.name}:#{name}", name].detect do |candidate|
    @callbacks.include?(candidate)
  end
  unless(registered)
    abort NameError.new("Failed to locate callback named: #{name}")
  end
  # Same supervisor lookup used when dispatching in #process
  callback_supervisor[callback_name(registered)].terminate
  @callbacks.delete(registered)
  self
end

#setup(args = {}) ⇒ Object

Setup hook for source requiring customized setup

Parameters:

  • args (Hash) (defaults to: {})

    initialization hash



258
259
260
# File 'lib/carnivore/source.rb', line 258

# Setup hook for source requiring customized setup
#
# Default implementation only logs at debug level; args is not used.
#
# @param args [Hash] initialization hash
def setup(args={})
  debug 'No custom setup declared'
end

#start!TrueClass, FalseClass

Start source if auto_process is enabled

Returns:

  • (TrueClass, FalseClass)


209
210
211
212
213
214
215
216
217
218
# File 'lib/carnivore/source.rb', line 209

# Start source if auto_process is enabled
#
# @return [TrueClass, FalseClass] true when asynchronous processing
#   was kicked off, false when auto processing is disabled
def start!
  unless(auto_process?)
    warn 'Message processing is disabled via auto start'
    return false
  end
  info 'Message processing started via auto start'
  async.process
  true
end

#teardown_cleanupObject

Ensure we cleanup our internal supervisor before bailing out



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/carnivore/source.rb', line 226

# Ensure we cleanup our internal supervisor before bailing out
#
# Terminates the callback supervisor when it is present and alive. A
# terminated task error raised during that teardown is logged and
# otherwise ignored.
def teardown_cleanup
  warn 'Termination request received. Tearing down!'
  unless(callback_supervisor && callback_supervisor.alive?)
    return warn('Callback supervisor is not alive. No teardown issued')
  end
  begin
    warn "Tearing down callback supervisor! (#{callback_supervisor})"
    callback_supervisor.terminate
  rescue Celluloid::Task::TerminatedError
    warn 'Terminated task error during callback supervisor teardown. Moving on.'
  end
end

#to_sString

Returns stringified instance.

Returns:

  • (String)

    stringified instance



251
252
253
# File 'lib/carnivore/source.rb', line 251

# @return [String] stringified instance (class name, object id and source name)
def to_s
  "<#{self.class.name}:#{object_id} @name=#{name}>"
end

#touch(message) ⇒ TrueClass, FalseClass

Touch message to reset timeout

Parameters:

Returns:

  • (TrueClass, FalseClass)


291
292
293
294
# File 'lib/carnivore/source.rb', line 291

# Touch message to reset timeout
#
# Default implementation only logs a warning that touch is not
# implemented; the message is not used.
#
# @param message [Object] message to touch (presumably a Carnivore::Message - confirm against callers)
# @return [TrueClass, FalseClass] always true in this base implementation
def touch(message)
  warn 'Source#touch was not implemented for this source!'
  true
end

#transmit(message, original_message = nil, args = {}) ⇒ Object

Send payload to source

Parameters:

  • message (Object)

    payload

  • original_message (Carnivore::Message) (defaults to: nil)

    original message if reply to extract optional metadata

  • args (Hash) (defaults to: {})

    optional extra arguments

Raises:

  • (NotImplemented)


283
284
285
# File 'lib/carnivore/source.rb', line 283

# Send payload to source
#
# @abstract subclasses provide the implementation
# @param message [Object] payload
# @param original_message [Carnivore::Message] original message if reply to extract optional metadata
# @param args [Hash] optional extra arguments
# @raise [NotImplementedError] always, in this base implementation
def transmit(message, original_message=nil, args={})
  # `NotImplemented` is not a Ruby constant; use the same error class as #receive
  raise NotImplementedError.new('Abstract method not valid for runtime')
end

#valid_message?(m) ⇒ TrueClass, FalseClass

Validate message is allowed before processing. This is currently only used when the message registry is enabled to prevent duplicate message processing.

Parameters:

Returns:

  • (TrueClass, FalseClass)


384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/carnivore/source.rb', line 384

# Validate message is allowed before processing. This is currently
# only used when the message registry is enabled to prevent duplicate
# message processing.
#
# @param m [Object] received payload
# @return [TrueClass, FalseClass] false (with a warning logged) only
#   when a registry exists and it rejects the payload
def valid_message?(m)
  return true unless message_registry
  return true if message_registry.valid?(m)
  warn "Message was already received. Discarding: #{m.inspect}"
  false
end