Class: QueueUtil
- Inherits:
-
Object
- Object
- QueueUtil
- Defined in:
- lib/logstash/inputs/sfdc_elf/queue_util.rb
Overview
Handel parsing data into event objects and then enqueue all of the events to the queue.
Defined Under Namespace
Classes: EventLogFile
Constant Summary collapse
- LOG_KEY =
Constants
'SFDC - QueueUtil'
- SEPARATOR =
','
- QUOTE_CHAR =
'"'
Instance Method Summary collapse
- #enqueue_events(query_result_list, queue, client) ⇒ Object
- #get_event_log_file_records(query_result_list, client) ⇒ Object
-
#initialize ⇒ QueueUtil
constructor
A new instance of QueueUtil.
Constructor Details
#initialize ⇒ QueueUtil
Returns a new instance of QueueUtil.
17 18 19 |
# File 'lib/logstash/inputs/sfdc_elf/queue_util.rb', line 17 def initialize @logger = Cabin::Channel.get(LogStash) end |
Instance Method Details
#enqueue_events(query_result_list, queue, client) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/logstash/inputs/sfdc_elf/queue_util.rb', line 28 def enqueue_events(query_result_list, queue, client) @logger.info("#{LOG_KEY}: enqueue events") # Grab a list of Tempfiles that contains CSV file data. event_log_file_records = get_event_log_file_records(query_result_list, client) # Iterate though each record. event_log_file_records.each do |elf| begin # Create local variable to simplify & make code more readable. tmp = elf.temp_file # Get the schema from the first line in the tempfile. It will be in CSV format so we parse it, and it will # return an array. schema = CSV.parse_line(tmp.readline, col_sep: SEPARATOR, quote_char: QUOTE_CHAR) # Loop through tempfile, line by line. tmp.each_line do |line| # Parse the current line, it will return an string array. string_array = CSV.parse_line(line, col_sep: SEPARATOR, quote_char: QUOTE_CHAR) # Convert the string array into its corresponding type array. data = string_to_type_array(string_array, elf.field_types) # create_event will return a event object. queue << create_event(schema, data, elf.event_type) end ensure # Close tmp file and unlink it, doing this will delete the actual tempfile. tmp.close tmp.unlink end end # do loop, tempfile_list end |
#get_event_log_file_records(query_result_list, client) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/logstash/inputs/sfdc_elf/queue_util.rb', line 146 def get_event_log_file_records(query_result_list, client) @logger.info("#{LOG_KEY}: generating tempfile list") result = [] query_result_list.each do |event_log_file| # Get the path of the CSV file from the LogFile field, then stream the data to the .write method of the Tempfile tmp = Tempfile.new('sfdc_elf_tempfile') client.streaming_download(event_log_file.LogFile, tmp) # Flushing will write the buffer into the Tempfile itself. tmp.flush # Rewind will move the file pointer from the end to the beginning of the file, so that users can simple # call the Read method. tmp.rewind # Append the EventLogFile object into the result list field_types = event_log_file.LogFileFieldTypes.split(',') result << EventLogFile.new(field_types, tmp, event_log_file.EventType) # Log the info from event_log_file object. @logger.info(" #{LOG_KEY}: Id = #{event_log_file.Id}") @logger.info(" #{LOG_KEY}: EventType = #{event_log_file.EventType}") @logger.info(" #{LOG_KEY}: LogFile = #{event_log_file.LogFile}") @logger.info(" #{LOG_KEY}: LogDate = #{event_log_file.LogDate}") @logger.info(" #{LOG_KEY}: LogFileLength = #{event_log_file.LogFileLength}") @logger.info(" #{LOG_KEY}: LogFileFieldTypes = #{event_log_file.LogFileFieldTypes}") @logger.info(' ......................................') end result end |