Class: Pants::Readers::BaseReader

Inherits:
Object
Includes:
LogSwitch::Mixin
Defined in:
lib/pants/readers/base_reader.rb

Direct Known Subclasses

FileReader, UDPReader, Seam

Constructor Details

#initialize(core_stopper_callback) ⇒ BaseReader

Returns a new instance of BaseReader.

Parameters:

  • core_stopper_callback (EventMachine::Callback)

    This gets called when all reading is done and the writers have written out all their data. It signals to the caller that the job of the reader is all done. For first level readers (readers that are not Tees), this lets Pants check all existing Readers to see if they're done, so it can know when to stop the reactor.


# File 'lib/pants/readers/base_reader.rb', line 27

def initialize(core_stopper_callback)
  @writers = []
  @write_to_channel = EM::Channel.new
  @core_stopper_callback = core_stopper_callback
  @read_object ||= nil
  @starter = nil
  @stopper = nil
  @running = false
end

Instance Attribute Details

#core_stopper_callback ⇒ EventMachine::Callback (readonly)

Returns the callback from Core that should be called when the Reader is done reading.

Returns:

  • (EventMachine::Callback)

    The callback from Core that should be called when the Reader is done reading.


# File 'lib/pants/readers/base_reader.rb', line 18

def core_stopper_callback
  @core_stopper_callback
end

#write_to_channel ⇒ EventMachine::Channel (readonly)

Returns the channel that Writers should subscribe to.

Returns:

  • (EventMachine::Channel)

    The channel that Writers should subscribe to.


# File 'lib/pants/readers/base_reader.rb', line 14

def write_to_channel
  @write_to_channel
end

#writers ⇒ Array (readonly)

Returns the list of Writers attached to the Reader.

Returns:

  • (Array)

    The list of Writers attached to the Reader.


# File 'lib/pants/readers/base_reader.rb', line 10

def writers
  @writers
end

Instance Method Details

#add_seam(klass, *args) ⇒ Pants::Seam

Allows for adding a Pants::Seam (or child) object to the reader's list of internal writers. For more info on Seams, see the docs for Pants::Seam.

Parameters:

  • klass (Pants::Seam)

    The class of the Pants::Seam object to create.

Returns:

  • (Pants::Seam)

    The newly created Seam.

See Also:

  • Pants::Seam


# File 'lib/pants/readers/base_reader.rb', line 205

def add_seam(klass, *args)
  @writers << klass.new(@core_stopper_callback, @write_to_channel, *args)

  @writers.last
end

#add_writer(obj, *args) ⇒ Object

One way of adding a Writer to the Reader. Use this method to add either a) an already instantiated Writer object, or b) a Writer created from a Writer class and its initialization arguments.

Note that adding an already instantiated Writer requires you to pass in the channel that the reader is pushing data to. This is probably one reason to avoid that approach, though it remains available for flexibility.

Examples:

Add using class and init variables

core = Pants::Core.new
core.read 'udp://10.2.3.4:9000'
core.add_writer(Pants::Writers::UDPWriter, '10.5.6.7', 9000)

Add using an already instantiated Writer object

core = Pants::Core.new
reader = core.read 'udp://10.2.3.4:9000'
writer = Pants::Writers::UDPWriter.new('10.5.6.7', 9000, reader.write_to_channel)
core.add_writer(writer)

Parameters:

  • obj (Class, Pants::Writers::BaseWriter)

    Either the class of a Writer to create, or an already created Writer object.

  • args (*)

    Any arguments that need to be used for creating the Writer.


# File 'lib/pants/readers/base_reader.rb', line 130

def add_writer(obj, *args)
  if obj.is_a? Class
    @writers << obj.new(*args, @write_to_channel)
  elsif obj.kind_of? Pants::Writers::BaseWriter
    @writers << obj
  else
    raise Pants::Error, "Don't know how to add a writer of type #{obj}"
  end

  @writers.last
end
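
The dispatch in #add_writer can be tried outside of Pants with stand-in classes. The following is a minimal sketch, not the library's code: SketchReader, SketchWriter, and the :channel placeholder are hypothetical names, and a plain ArgumentError stands in for Pants::Error.

```ruby
# Hypothetical stand-in for a Pants writer; not part of the Pants API.
class SketchWriter
  attr_reader :host, :port, :channel

  def initialize(host, port, channel)
    @host = host
    @port = port
    @channel = channel
  end
end

# Hypothetical stand-in for a reader that mirrors the add_writer dispatch.
class SketchReader
  attr_reader :writers, :write_to_channel

  def initialize
    @writers = []
    @write_to_channel = :channel # placeholder for an EventMachine::Channel
  end

  def add_writer(obj, *args)
    if obj.is_a? Class
      # A class: instantiate it, appending the reader's channel as the last argument.
      @writers << obj.new(*args, @write_to_channel)
    elsif obj.kind_of? SketchWriter
      # An existing writer: the caller has already wired it to the channel.
      @writers << obj
    else
      raise ArgumentError, "Don't know how to add a writer of type #{obj}"
    end

    @writers.last
  end
end

reader = SketchReader.new
from_class = reader.add_writer(SketchWriter, '10.5.6.7', 9000)
from_object = reader.add_writer(SketchWriter.new('10.5.6.8', 9001, reader.write_to_channel))
```

In the class form the reader supplies the channel itself, which is why the caller only passes the writer's own arguments.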

#read_object ⇒ String

Allows for adding “about me” info, depending on the reader type. This info is printed out when Pants starts, so you get confirmation of what you're about to do. If you don't define this in your reader, a warning is emitted instead.

Returns:

  • (String)

    A String that identifies what the reader is reading from.


# File 'lib/pants/readers/base_reader.rb', line 76

def read_object
  if @read_object
    @read_object
  else
    warn "No read_object info has been defined for this reader."
  end
end
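
One way a child reader might supply this value is to assign @read_object before calling super, which the `@read_object ||= nil` line in the constructor would preserve. The sketch below assumes that pattern; SketchBaseReader and SketchFileReader are hypothetical stand-ins, not Pants classes.

```ruby
# Hypothetical stand-in that copies only the read_object behavior.
class SketchBaseReader
  def initialize
    @read_object ||= nil # keeps a value a subclass assigned before calling super
  end

  def read_object
    if @read_object
      @read_object
    else
      warn "No read_object info has been defined for this reader."
    end
  end
end

# Hypothetical child reader that describes what it reads from.
class SketchFileReader < SketchBaseReader
  def initialize(file_path)
    @read_object = file_path
    super()
  end
end

described = SketchFileReader.new('./testfile').read_object
undescribed = SketchBaseReader.new.read_object # warns on stderr, returns nil
```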

#remove_writer(obj, key_value_pairs = nil) ⇒ Object

Removes a writer object from the internal list of writers.

Examples:

Using URI

reader.writers    # => [<Pants::Writers::FileWriter @file_path='./testfile'...>]
reader.remove_writer('./testfile')
reader.writers    # => []

Using class and args as key/value pairs

reader.writers    # => [<Pants::Writers::FileWriter @file_path='./testfile'...>]
reader.remove_writer(Pants::Writers::FileWriter, file_path: './testfile')
reader.writers    # => []

Parameters:

  • obj (Class, String)

    Class of the writer to remove, or a URI String that identifies it.

  • key_value_pairs (Hash) (defaults to: nil)

    Keys are methods to call on each writer. A writer matches when every method's return value equals the given value.


# File 'lib/pants/readers/base_reader.rb', line 159

def remove_writer(obj, key_value_pairs=nil)
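  if obj.is_a? Class
    # With no key/value pairs given, match on class alone instead of calling nil.all?.
    key_value_pairs ||= {}

    @writers.delete_if do |writer|
      writer.is_a?(obj) &&
        key_value_pairs.all? { |k, v| writer.send(k) == v }
    end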
  elsif obj.is_a? String
    writer = begin
      uri = obj.is_a?(URI) ? obj : URI(obj)
    rescue URI::InvalidURIError
      find_writer_from_uri(nil)
    else
      find_writer_from_uri(uri)
    end

    unless writer
      raise ArgumentError, "No writer found with URI scheme: #{uri.scheme}"
    end

    key_value_pairs = if writer[:args]
      writer[:args].inject({}) do |result, arg|
        result[arg] = uri.send(arg)

        result
      end
    else
      {}
    end

    @writers.delete_if do |w|
      w.is_a?(writer[:klass]) &&
        key_value_pairs.all? { |k, v| w.send(k) == v }
    end
  end
end
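
The key/value matching used in both branches can be illustrated in isolation. This is a minimal sketch with a hypothetical SketchFileWriter Struct standing in for a real writer; only the delete_if / all? / send combination is taken from the method above.

```ruby
# Hypothetical stand-in for a file writer; only file_path matters here.
SketchFileWriter = Struct.new(:file_path)

writers = [
  SketchFileWriter.new('./testfile'),
  SketchFileWriter.new('./otherfile')
]

# Each key names a method to call on the writer; each value is the expected result.
criteria = { file_path: './testfile' }

writers.delete_if do |writer|
  writer.is_a?(SketchFileWriter) &&
    criteria.all? { |method_name, expected| writer.send(method_name) == expected }
end

remaining = writers.map(&:file_path)
```

Because all? returns true for an empty collection, an empty criteria Hash matches every writer of the given class.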

#running? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/pants/readers/base_reader.rb', line 85

def running?
  @running
end

#start(callback) ⇒ Object

Starts all of the writers, then starts the reader. Child readers must call this to make sure the writers are all running and ready for data before the reader starts pushing data onto its Channel.

Parameters:

  • callback (EventMachine::Callback)

    Called once all Writers are up and running. This should contain all code that the child Reader wants to execute on start.


# File 'lib/pants/readers/base_reader.rb', line 45

def start(callback)
  start_loop = EM.tick_loop do
    if @writers.empty? || @writers.all?(&:running?)
      :stop
    end
  end
  start_loop.on_stop { callback.call }

  log "Starting writers for reader #{self.__id__}..."
  EM::Iterator.new(@writers).each do |writer, iter|
    writer.start
    iter.next
  end
end
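
The "wait until every writer is running, then fire the callback" idea can be sketched without EventMachine. The example below is an illustration under stated assumptions: SketchWriter and poll_until_stop are hypothetical, and the polling loop is a synchronous stand-in for the reactor's tick loop, so the writers are started before polling rather than after registering the loop.

```ruby
# Hypothetical writer whose #start flips it to running.
class SketchWriter
  def initialize
    @running = false
  end

  def start
    @running = true
  end

  def running?
    @running
  end
end

# Plain-Ruby stand-in for a tick loop: run the block until it returns :stop,
# then fire the callback. A reactor would run the block once per tick instead.
def poll_until_stop(on_stop, max_ticks = 1_000)
  max_ticks.times do
    if yield == :stop
      on_stop.call
      return true
    end
  end

  false
end

writers = [SketchWriter.new, SketchWriter.new]
started = false
callback = proc { started = true }

writers.each(&:start)

finished = poll_until_stop(callback) do
  # Same stop condition as #start: no writers at all, or all of them running.
  :stop if writers.empty? || writers.all?(&:running?)
end
```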

#stop! ⇒ Object

Calls the reader's #stopper, thus forcing the reader to shut down. Readers that intend to read a finite amount of data should call the #stopper themselves when they're done. For readers that read a non-stop stream (e.g. an open socket), this gets called in response to OS signals (e.g. when you press Ctrl-C).


# File 'lib/pants/readers/base_reader.rb', line 65

def stop!
  stopper.call
end

#write_to(uri) ⇒ Pants::Writers::BaseWriter

Returns the newly created writer.

Parameters:

  • uri (String, URI)

    The URI of the object to write to. Can be of any URI type that's defined in Pants.writers.

Returns:

  • (Pants::Writers::BaseWriter)

    The newly created writer.


# File 'lib/pants/readers/base_reader.rb', line 93

def write_to(uri)
  begin
    uri = uri.is_a?(URI) ? uri : URI(uri)
  rescue URI::InvalidURIError
    @writers << new_writer_from_uri(nil, @write_to_channel)
  else
    @writers << new_writer_from_uri(uri, @write_to_channel)
  end

  @writers.last
end
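
The URI handling at the top of #write_to relies only on Ruby's standard uri library, so its behavior can be checked on its own. In this sketch, sketch_parse is a hypothetical helper; how Pants maps the parsed parts to a writer (new_writer_from_uri) is not shown here.

```ruby
require 'uri'

# Returns [scheme, host, port, path] for a parsable target,
# or nil when Kernel#URI rejects the string.
def sketch_parse(target)
  uri = target.is_a?(URI) ? target : URI(target)
  [uri.scheme, uri.host, uri.port, uri.path]
rescue URI::InvalidURIError
  nil
end

udp_parts = sketch_parse('udp://10.5.6.7:9000')
file_parts = sketch_parse('./testfile')
invalid = sketch_parse('not a uri')
```

A relative path such as './testfile' parses as a URI with no scheme, while a string containing spaces raises URI::InvalidURIError and takes the rescue branch.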