Class: Pants::Core

Inherits:
Object
Includes:
LogSwitch::Mixin
Defined in:
lib/pants/core.rb

Overview

A single Core object is necessary for Pants to run. Root-level readers are attached to the core, then writers are attached to those readers. Its main job, other than giving a home to readers, is to handle the order in which the readers and writers start up and shut down so that no data is lost.

Constructor Details

#initialize(&block) ⇒ Core

Returns a new instance of Core.


# File 'lib/pants/core.rb', line 23

def initialize(&block)
  setup_signals
  @readers = []

  @convenience_block = block
end
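
The block given to the constructor is stored and later called with each reader as it is added (see #add_reader and #read). The sketch below illustrates that pattern in isolation. SketchCore and the string "readers" are hypothetical stand-ins so the example runs without Pants or EventMachine; it is not the real class.

```ruby
# Hypothetical stand-in that mirrors how the block given to #initialize
# is stored and then invoked for every added reader. Not Pants::Core.
class SketchCore
  attr_reader :readers

  def initialize(&block)
    @readers = []
    @convenience_block = block # nil when no block is given
  end

  # Simplified: accepts any object as a "reader".
  def add_reader(reader)
    @readers << reader
    @convenience_block.call(@readers.last) if @convenience_block
    @readers.last
  end
end

seen = []
core = SketchCore.new { |reader| seen << reader }
core.add_reader('reader-a')
core.add_reader('reader-b')
# seen now holds both readers, in the order they were added
```

Because the block runs once per reader, it is a convenient place for setup that every reader needs, assuming the same setup applies to all of them.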

Instance Attribute Details

#readers ⇒ Array (readonly)

Returns the list of readers that are reading data.

Returns:

  • (Array)

    The list of readers that are reading data.


# File 'lib/pants/core.rb', line 21

def readers
  @readers
end

Instance Method Details

#add_reader(obj, *args) ⇒ Object

One way of adding a Reader to the Core. Use this method to add either a) an already instantiated Reader object, or b) a Reader created from a Reader class and its constructor arguments.

Note that adding an already instantiated Reader requires you to pass the core's callback to the Reader yourself. That is one reason to avoid adding a reader this way, though the option remains available for flexibility.

Examples:

Add using class and init variables

core = Pants::Core.new
core.add_reader(Pants::Readers::UDPReader, '10.2.3.4', 9000)

Add using an already instantiated Reader object

core = Pants::Core.new
reader = Pants::Readers::UDPReader.new('10.2.3.4', 9000, core.callback)
core.add_reader(reader)

Parameters:

  • obj (Class, Pants::Reader)

    Either the class of a Reader to create, or an already created Reader object.

  • args (*)

    Any arguments that need to be used for creating the Reader.


# File 'lib/pants/core.rb', line 76

def add_reader(obj, *args)
  if obj.is_a? Class
    @readers << obj.new(*args, callback)
  elsif obj.kind_of? Pants::Readers::BaseReader
    @readers << obj
  else
    raise Pants::Error, "Don't know how to add a reader of type #{obj}"
  end

  @convenience_block.call(@readers.last) if @convenience_block

  @readers.last
end
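
The listing above dispatches on whether it was handed a class or an instance. The following sketch isolates that dispatch. SketchBaseReader, SketchUDPReader, and build_reader are hypothetical names introduced for illustration, and ArgumentError is used in place of Pants::Error so the example is self-contained.

```ruby
# Hypothetical stand-ins; only the dispatch logic mirrors #add_reader.
class SketchBaseReader
  attr_reader :args, :callback

  # The callback is expected as the last positional argument.
  def initialize(*args, callback)
    @args = args
    @callback = callback
  end
end

class SketchUDPReader < SketchBaseReader; end

def build_reader(obj, *args, callback:)
  if obj.is_a?(Class)
    obj.new(*args, callback)     # callback is appended for the caller
  elsif obj.kind_of?(SketchBaseReader)
    obj                          # used as-is; it already has a callback
  else
    raise ArgumentError, "Don't know how to add a reader of type #{obj}"
  end
end

done = proc { :done }

# a) From a class plus constructor arguments.
from_class = build_reader(SketchUDPReader, '10.2.3.4', 9000, callback: done)

# b) From an instance the caller built, callback included.
prebuilt = SketchUDPReader.new('10.2.3.4', 9000, done)
from_instance = build_reader(prebuilt, callback: done)
```

In the class form the callback is supplied automatically, which is why that form needs less wiring from the caller.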

#callback ⇒ EventMachine::Callback

Creates an EventMachine::Callback that Readers, Writers, and other objects can use to let the Core know when it can shut down. Those Readers, Writers, etc. are responsible for calling this callback when they're done doing what they need to do.

Returns:

  • (EventMachine::Callback)

# File 'lib/pants/core.rb', line 96

def callback
  EM.Callback do
    if @readers.none?(&:running?)
      EM.stop_event_loop
    end
  end
end
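
The shutdown check can be sketched without EventMachine. In this hedged example a plain lambda stands in for EM.Callback and a boolean flag stands in for EM.stop_event_loop; SketchReader is a hypothetical class, not a Pants reader.

```ruby
# Hypothetical stand-in reader that reports when it has finished.
class SketchReader
  def initialize(on_finish)
    @on_finish = on_finish
    @running = true
  end

  def running?
    @running
  end

  # Mark this reader as finished, then notify the core.
  def stop!
    @running = false
    @on_finish.call
  end
end

readers = []
loop_stopped = false # stands in for EM.stop_event_loop having been called

# Same condition as the listing above: stop only once no reader is running.
callback = lambda do
  loop_stopped = true if readers.none?(&:running?)
end

readers << SketchReader.new(callback) << SketchReader.new(callback)

readers[0].stop!
after_first = loop_stopped  # false: one reader is still running
readers[1].stop!
after_second = loop_stopped # true: the last reader has finished
```

The point of the check is that any reader may call the callback at any time; only the call made after the last reader finishes has an effect.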

#read(uri) ⇒ Pants::Reader

One way of adding a Reader to the Core. Use this method to make code read more nicely when the thing being read is expressed as a URI.

Examples:

core = Pants::Core.new
core.read 'udp://10.2.3.4:9000'

Parameters:

  • uri (String, URI)

    The URI to the object to read. Can be a file:///, udp://, or a string with the path to a file.

Returns:

  • (Pants::Reader)

    The newly created reader.


# File 'lib/pants/core.rb', line 41

def read(uri)
  begin
    uri = uri.is_a?(URI) ? uri : URI(uri)
  rescue URI::InvalidURIError
    @readers << new_reader_from_uri(nil, callback)
  else
    @readers << new_reader_from_uri(uri, callback)
  end

  @convenience_block.call(@readers.last) if @convenience_block

  @readers.last
end
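
Which branch of the listing above runs depends on how the argument parses. The sketch below uses only Ruby's standard URI library to show the three input forms named in the parameter description, plus one string that fails to parse. The file names are made up for illustration.

```ruby
require 'uri'

# A udp:// URI exposes the pieces a UDP reader would need.
udp = URI('udp://10.2.3.4:9000')
udp.scheme # "udp"
udp.host   # "10.2.3.4"
udp.port   # 9000

# A file:/// URI carries the file's location in its path.
file = URI('file:///tmp/capture.ts')
file.scheme # "file"
file.path   # "/tmp/capture.ts"

# A bare path is still a valid URI reference; it simply has no scheme.
bare = URI('/tmp/capture.ts')
bare.scheme # nil
bare.path   # "/tmp/capture.ts"

# A string that is not a valid URI (here, one containing spaces) raises
# URI::InvalidURIError, which is the case the rescue branch handles.
invalid = begin
  URI('my capture file.ts')
  false
rescue URI::InvalidURIError
  true
end
```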

#restart ⇒ Object

Stop, then run.


# File 'lib/pants/core.rb', line 154

def restart
  stop!
  puts "Restarting..."
  run
end

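#run ⇒ Object

Starts the readers and their writers. If the EventMachine reactor is not already running, it is started first; otherwise the readers join the running reactor.

Raises:

  • (Pants::Error)

    If no readers have been added yet.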


# File 'lib/pants/core.rb', line 105

def run
  raise Pants::Error, "No readers added yet" if @readers.empty?

  starter = proc do
    puts "Pants v#{Pants::VERSION}"
    puts ">> Reading from #{@readers.size} readers"

    @readers.each_with_index do |reader, i|
      puts ">> reader#{i}:  Starting read on: #{reader.read_object}"
      puts ">> reader#{i}:  Writing to #{reader.writers.size} writers"

      reader.writers.each_with_index do |writer, j|
        puts ">> reader#{i}writer#{j}:  #{writer.write_object}"
      end
    end

    EM::Iterator.new(@readers).each do |reader, iter|
      reader.start
      iter.next
    end
  end

  if EM.reactor_running?
    log "Joining reactor..."
    starter.call
  else
    log "Starting reactor..."
    EM.run(&starter)
  end
end

#stop! ⇒ Object

Tells each reader to signal to its writers that it's time to finish. Does nothing if no readers are running.


# File 'lib/pants/core.rb', line 137

def stop!
  puts "Stop called.  Closing readers and writers..."

  if @readers.none?(&:running?)
    puts "No readers are running; nothing to do."
  else
    puts "Stopping readers:"

    @readers.each do |reader|
      puts "\t#{reader}" if reader.running?
    end

    @readers.each(&:stop!)
  end
end