Class: LogStash::Inputs::SfdcElf
- Inherits: LogStash::Inputs::Base
- Hierarchy: Object → LogStash::Inputs::Base → LogStash::Inputs::SfdcElf
- Defined in:
- lib/logstash/inputs/sfdc_elf.rb
Overview
This plugin enables Salesforce customers to load EventLogFile (ELF) data from their Force.com orgs. The plugin handles downloading the ELF CSV files, parsing them, and handling any schema changes transparently.
Constant Summary collapse
- LOG_KEY =
'SFDC'
- RETRY_ATTEMPTS =
3
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
# File 'lib/logstash/inputs/sfdc_elf.rb', line 53

# Plugin lifecycle hook: configure and authenticate the Salesforce client,
# then initialize the helper objects (queue util, scheduler, state persistor)
# that #run relies on.
#
# Fixes: "tyring" typo in the authentication log message, "Handel" comment
# typos, and a stale comment that referenced @poll_interval_in_hours.
def register
  # Initialize the client.
  @client = ClientWithStreamingSupport.new
  @client.client_id = @client_id.value
  @client.client_secret = @client_secret.value
  @client.host = @host
  @client.version = '46.0'

  # Authenticate the client. Salesforce expects the security token appended
  # directly to the password.
  @logger.info("#{LOG_KEY}: trying to authenticate client")
  @client.retryable_authenticate(username: @username,
                                 password: @password.value + @security_token.value,
                                 retry_attempts: RETRY_ATTEMPTS)
  @logger.info("#{LOG_KEY}: authenticating succeeded")

  # Save org id to distinguish between multiple orgs.
  @org_id = @client.query('select id from Organization')[0]['Id']

  # Set up time interval for the forever polling loop.
  @poll_interval_in_seconds = @poll_interval_in_minutes * 60

  # Handle the @path config passed by the user. If path does not exist then set @path to home directory.
  verify_path

  # Handle parsing the data into event objects and enqueuing them to the queue.
  @queue_util = QueueUtil.new

  # Handle when to schedule the next process based on the @poll_interval_in_minutes config.
  @scheduler = Scheduler.new(@poll_interval_in_seconds)

  # Handle state of the plugin based on the reads and writes of LogDates to the .sdfc_info_logstash file.
  @state_persistor = StatePersistor.new(@path, @org_id)

  # Grab the last indexed log date.
  @last_indexed_log_date = @state_persistor.get_last_indexed_log_date
  @logger.info("#{LOG_KEY}: @last_indexed_log_date = #{@last_indexed_log_date}")
end
#run(queue) ⇒ Object
# File 'lib/logstash/inputs/sfdc_elf.rb', line 98

# Plugin lifecycle hook: on every scheduler tick, query Salesforce for
# EventLogFile records newer than @last_indexed_log_date, persist the new
# high-water mark, and enqueue the parsed events onto the Logstash queue.
def run(queue)
  @scheduler.schedule do
    # Separator line for readable log statements.
    @logger.info('---------------------------------------------------')

    # Grab a list of SObjects, specifically EventLogFiles, in ascending
    # LogDate order.
    soql_expr = "SELECT Id, EventType, Logfile, LogDate, LogFileLength, LogFileFieldTypes FROM EventLogFile WHERE LogDate > #{@last_indexed_log_date} and EventType in (#{@eventtypesstring}) and Sequence>0 and Interval='Hourly' ORDER BY LogDate ASC"

    results = @client.retryable_query(username: @username,
                                      password: @password.value + @security_token.value,
                                      retry_attempts: RETRY_ATTEMPTS,
                                      soql_expr: soql_expr)

    @logger.info("#{LOG_KEY}: query result size = #{results.size}")

    unless results.empty?
      # Results are sorted ascending by LogDate, so the last record carries the
      # newest LogDate; remember it as the new high-water mark for the
      # .sfdc_info_logstash file.
      @last_indexed_log_date = results.last.LogDate.strftime('%FT%T.%LZ')

      # TODO: grab tempfiles here!!
      # Overwrite the .sfdc_info_logstash file with the new high-water mark.
      # Note: we currently do not support deduplication, but will implement it soon.
      # TODO: need to implement deduplication
      # TODO: might have to move this after enqueue_events(), in case of a crash in between.
      # TODO: can do all @state_persistor calls after the if statement
      @state_persistor.update_last_indexed_log_date(@last_indexed_log_date)

      # Build events from the query results, then append them to the queue.
      @queue_util.enqueue_events(results, queue, @client)
    end
  end # do loop
end