Class: Nfcollector::PayloadProcessor

Inherits: Object
Defined in: lib/nfcollector/payload_processor.rb
Instance Method Summary

#initialize(account_id, tz, attributes_string) ⇒ PayloadProcessor (constructor)
    Returns a new instance of PayloadProcessor.
#process_payload(io) ⇒ Object
Constructor Details
#initialize(account_id, tz, attributes_string) ⇒ PayloadProcessor
Returns a new instance of PayloadProcessor.
```ruby
# File 'lib/nfcollector/payload_processor.rb', line 4

def initialize(account_id, tz, attributes_string)
  @account_id = account_id
  @tz = tz
  input_definition = Attributes.parse(attributes_string)

  # TODO: Will need to specify the Output class to use later
  @transpiler = Mapping::DefaultOutput.build_transpiler(input_definition, tz: @tz)
  @categoriser = Categoriser.new(account_id)
  @indexer = Mapping::Indexer.new(account_id)

  # TODO: Do the category lookups inside the categoriser and pass it the categories string
  # (Only do this if the input definition has the categories - actually ALL the required inputs)
  @transpiler.after_row(@categoriser, using: [:domain, :host, :created_at])
  @transpiler.after_row(@indexer, using: [:username])
end
```
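The after_row calls in the constructor register per-row hooks that each receive a selected subset of columns from every mapped row. A minimal self-contained sketch of that hook pattern (the MiniTranspiler class and its emit method are illustrative stand-ins, not the real Transpiler API):

```ruby
# Minimal sketch of the after_row hook pattern used above.
# MiniTranspiler and #emit are hypothetical stand-ins for illustration.
class MiniTranspiler
  def initialize(headers)
    @headers = headers
    @hooks = [] # pairs of [callable, column indices]
  end

  # Register a hook that will be fed the named columns of each row.
  def after_row(hook, using:)
    @hooks << [hook, using.map { |col| @headers.index(col) }]
  end

  # Pass each finished row's selected columns to every registered hook.
  def emit(row)
    @hooks.each { |hook, idxs| hook.call(row.values_at(*idxs)) }
    row
  end
end

seen = []
t = MiniTranspiler.new([:username, :domain, :host, :created_at])
t.after_row(->(cols) { seen << cols }, using: [:domain, :created_at])
t.emit(["alice", "example.com", "web01", "2024-01-01"])
# seen now holds [["example.com", "2024-01-01"]]
```

This keeps the categoriser and indexer decoupled from row mapping: each sees only the columns it asked for.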
Instance Method Details
#process_payload(io) ⇒ Object
```ruby
# File 'lib/nfcollector/payload_processor.rb', line 18

def process_payload(io)
  # TODO: Look at streaming with GzipReader#read_partial later (not the lowest hanging fruit right now)
  gz = Zlib::GzipReader.new(io)
  strio = StringIO.new(gz.read)
  gz.close

  weblog_partitioner = WeblogPartition.get_partitioner(@account_id,
    @transpiler.index_of(:created_at), @transpiler.headers)

  # Improvements for the future
  # - Switched from excelsior to CSV because it was segfaulting under the latest Ruby
  # - Ideally we would use ccsv here, but it cannot read from an IO, only a file.
  #   We could look at using sidekiq with pure Ruby,
  #   or have a job unzip to a second file first (not very memory efficient, and error prone),
  #   or look at rewriting the collector in Go and use http://www.goworker.org/ (ideal but lots of work)
  csv = CSV.new(strio)
  while !csv.eof?
    mapped_row = @transpiler.transpile(csv.readline) # TODO: rename the transpile method
    weblog_partitioner.add_row(mapped_row)
  end

  # TODO: These could write on the fly in time
  cfws = weblog_partitioner.commit!(@account_id)
  @categoriser.commit!
  @indexer.commit!
  return cfws
end
```
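The decompress-then-parse pattern in #process_payload can be exercised in isolation with just the Ruby standard library. This sketch builds a gzipped CSV payload in memory and reads it back the same way the method does (the sample CSV data is illustrative, not from the collector):

```ruby
require 'zlib'
require 'csv'
require 'stringio'

# Build a gzipped CSV payload in memory, as a client might upload it.
raw_csv = "username,domain,created_at\nalice,example.com,2024-01-01\n"
gzipped = StringIO.new
gz_writer = Zlib::GzipWriter.new(gzipped)
gz_writer.write(raw_csv)
gz_writer.close

# Read it back the way #process_payload does: decompress the whole
# payload, buffer it in a StringIO, then consume it row by row via CSV.
io = StringIO.new(gzipped.string)
gz = Zlib::GzipReader.new(io)
strio = StringIO.new(gz.read)
gz.close

csv = CSV.new(strio)
rows = []
while !csv.eof?
  rows << csv.readline # each row comes back as an Array of Strings
end

p rows
```

Note that buffering the full decompressed payload is what the TODO about GzipReader#read_partial is aimed at: for large payloads a streaming read would avoid holding everything in memory at once.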