Class: Pants::Seam

Inherits:
Readers::BaseReader show all
Includes:
LogSwitch::Mixin
Defined in:
lib/pants/seam.rb

Overview

A Seam is a core Pants object type (like Readers and Writers) that lets you attach to a Reader, work with the read data, and pass it on to attached Writers. It implements buffering by using EventMachine Queues: pop data off the @read_queue, work with it, then push it onto the @write_queue. Once on the @write_queue, the Seam will pass on to all Writers that have been added to it.

The @read_queue is wrapped by #read_items, which yields data chunks from the Reader in, allowing easy access to each bit of data as it was when it was read in. The @write_queue is wrapped by #write, which lets you just give it the data you want to pass on to the attached Writers.

Seams are particularly useful for working with network data, where if you're redirecting traffic from one place to another, you may need to alter data in those packets to make it useful to the receiving ends.

Instance Attribute Summary

Attributes inherited from Readers::BaseReader

#core_stopper_callback, #write_to_channel, #writers

Instance Method Summary collapse

Methods inherited from Readers::BaseReader

#add_seam, #add_writer, #read_object, #remove_writer, #running?, #stop!, #write_to

Constructor Details

#initialize(core_stopper_callback, reader_channel) ⇒ Seam

Returns a new instance of Seam.

Parameters:

  • core_stopper_callback (EventMachine::Callback)

    The callback that's provided by Core.

  • reader_channel (EventMachine::Channel)

    The channel from the Reader that the Seam is attached to.


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/pants/seam.rb', line 29

def initialize(core_stopper_callback, reader_channel)
  @read_queue = EM::Queue.new
  @write_queue = EM::Queue.new
  @write_object ||= nil

  @receives = 0
  @reads = 0
  @writes = 0
  @sends = 0

  reader_channel.subscribe do |data|
    log "Got data on reader channel"
    @read_queue << data
    @receives += data.size
  end

  super(core_stopper_callback)
  send_data
end

Instance Method Details

#read_items(&block) {|item| ... } ⇒ Object

Call this to read data that was put into the read queue. It yields one “item” (however the data was put onto the queue) at a time. It will continually yield as there is data that comes in on the queue.

Parameters:

  • block (Proc)

    The block to yield items from the reader to.

Yields:

  • (item)

    Gives one item off the read queue.


89
90
91
92
93
94
95
96
97
# File 'lib/pants/seam.rb', line 89

def read_items(&block)
  processor = proc do |item|
    block.call(item)
    @reads += item.size
    @read_queue.pop(&processor)
  end

  @read_queue.pop(&processor)
end

#start(callback) ⇒ Object


49
50
51
52
53
# File 'lib/pants/seam.rb', line 49

def start(callback)
  super(callback)

  starter.call
end

#stopObject

Make sure you call this (with super()) in your child to ensure read and write queues are flushed.


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pants/seam.rb', line 57

def stop
  log "Stopping..."
  log "receives #{@receives}"
  log "reads #{@reads}"
  log "writes #{@writes}"
  log "sends #{@sends}"

  finish_loop = EM.tick_loop do
    if @read_queue.empty? && @write_queue.empty?
      :stop
    end
  end

  finish_loop.on_stop { stopper.call }
end

#write(data) ⇒ Object

Call this after your Seam child has processed data and is ready to send it to its writers.

Parameters:

  • data (Object)

103
104
105
106
# File 'lib/pants/seam.rb', line 103

def write(data)
  @write_queue << data
  @writes += data.size
end

#write_objectString

Returns A String that identifies what the writer is writing to. This is simply used for displaying info to the user.

Returns:

  • (String)

    A String that identifies what the writer is writing to. This is simply used for displaying info to the user.


75
76
77
78
79
80
81
# File 'lib/pants/seam.rb', line 75

def write_object
  if @write_object
    @write_object
  else
    warn "No write_object info has been defined for this writer."
  end
end