Class: Nfcollector::PayloadProcessor

Inherits:
Object
Defined in:
lib/nfcollector/payload_processor.rb

Instance Method Summary

Constructor Details

#initialize(account_id, tz, attributes_string) ⇒ PayloadProcessor

Returns a new instance of PayloadProcessor.



Source:
# File 'lib/nfcollector/payload_processor.rb', line 4

# Builds a payload processor for a single account.
#
# Parses the attribute definition, builds a transpiler for it, and registers
# a Categoriser and a Mapping::Indexer with the transpiler via #after_row.
#
# @param account_id [Object] account the payloads belong to. It is stored and
#   later passed to WeblogPartition (exact type not visible here — TODO confirm).
# @param tz timezone, forwarded to the transpiler as the `tz:` option.
# @param attributes_string [String] attribute definition, parsed by Attributes.parse.
def initialize(account_id, tz, attributes_string)
  @account_id = account_id
  @tz         = tz
  input_definition = Attributes.parse(attributes_string)
  # TODO: Will need to specify the Output class to use later
  @transpiler  = Mapping::DefaultOutput.build_transpiler(input_definition, tz: @tz)
  @categoriser = Categoriser.new
  @indexer     = Mapping::Indexer.new
  # TODO: Do the category lookups inside the categoriser and pass it the categories string
  # (only do this if the input definition has the categories - actually ALL the required inputs)
  # Presumably each hook is invoked per transpiled row with the listed columns — confirm in Transpiler#after_row.
  @transpiler.after_row(@categoriser, using: %i[domain host created_at])
  @transpiler.after_row(@indexer, using: %i[username])
end

Instance Method Details

#process_payload(io) ⇒ Object



Source:
# File 'lib/nfcollector/payload_processor.rb', line 18

def process_payload(io)
  # TODO: Look at streaming with GzipReader#read_partial later (not the lowest hanging fruit right now)
  gz = Zlib::GzipReader.new(io)
  strio = StringIO.new(gz.read)
  gz.close

  weblog_partitioner = WeblogPartition.get_partitioner(@account_id, @transpiler.index_of(:created_at), @transpiler.headers)

  # Improvements for the future
  # - Switched from excelsior to CSV because it was segfaulting under the latest Ruby
  # - Ideally we would use ccsv here but it cannot read from an IO, only a file
  # We could look at using sidekiq with Pure Ruby
  # or have a job unzip to a second file first (not very memory efficient, and error prone)
  # Or look at rewriting the collector in Go and use http://www.goworker.org/ (Ideal but lots of work)
  #
  csv = CSV.new(strio)
  while !csv.eof?
    mapped_row = @transpiler.transpile(csv.readline) # TODO: rename the transpile method
    weblog_partitioner.add_row(mapped_row)
  end

  # TODO: These could write on the fly in time
  cfws = weblog_partitioner.commit!(@account_id)
  @categoriser.commit!
  @indexer.commit!
  return cfws
end