Class: Embulk::Input::Splunk
- Inherits:
-
InputPlugin
- Object
- InputPlugin
- Embulk::Input::Splunk
- Defined in:
- lib/embulk/input/splunk.rb
Constant Summary collapse
- SPLUNK_UNLIMITED_RESULTS =
Zero means unlimited results. Splunk’s default is 100.
0
- SPLUNK_TIME_FORMAT =
"%Y-%m-%dT%H:%M:%S.%L%:z"
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.resume(task, columns, count, &control) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/embulk/input/splunk.rb', line 38

# Runs the import by yielding to the caller's block, then builds the config
# diff that seeds the next run.
#
# @param task [Hash] task definition; only "incremental" is read here
# @param columns [Array] output schema, handed to the block unchanged
# @param count [Integer] task count, handed to the block unchanged
# @yield [task, columns, count] must return the list of task reports
# @return [Hash] empty unless the task is incremental and the first task
#   report carries a usable latest event time; otherwise
#   `{ earliest_time: <time formatted with SPLUNK_TIME_FORMAT> }`
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  next_config_diff = {}
  return next_config_diff unless task["incremental"]

  report = task_reports && task_reports.first
  # Accept both key styles: #run emits a symbol key, but the report may come
  # back string-keyed (NOTE(review): presumably after serialization by the
  # framework — confirm against the Embulk version in use).
  latest = report && (report["latest_time_in_results"] || report[:latest_time_in_results])
  return next_config_diff unless latest

  latest = Time.parse(latest.to_s) unless latest.is_a?(Time)

  # #run seeds its running maximum with Time.at(0); seeing that value back
  # means no event was read, so the previous earliest_time must not be rewound.
  if latest > Time.at(0)
    next_config_diff[:earliest_time] = latest.strftime(SPLUNK_TIME_FORMAT)
  end

  next_config_diff
end
.transaction(config, &control) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/embulk/input/splunk.rb', line 16

# Reads the plugin configuration, declares the output schema and starts a
# single-task run through `resume`.
#
# @param config [#param] plugin configuration; each option is read with
#   `config.param(name, type, default: ...)`
# @param control [Proc] block forwarded to `resume`
# @return [Object] whatever `resume` returns
def self.transaction(config, &control)
  # Every option is looked up by its string name. "earliest_time" used to be
  # looked up with a symbol, unlike all other options, so a string-keyed
  # config would always fall back to the default.
  task = {
    "host" => config.param("host", :string),
    "port" => config.param("port", :integer, default: 8089),
    "username" => config.param("username", :string),
    "password" => config.param("password", :string),
    "query" => config.param("query", :string),
    "incremental" => config.param("incremental", :bool, default: false),
    "time_format" => config.param("time_format", :string, default: SPLUNK_TIME_FORMAT),
    "earliest_time" => config.param("earliest_time", :string, default: "2010-01-01T00:00:00.000"),
  }

  # Two output columns: the event time and the whole result as JSON.
  columns = [
    Column.new(0, "time", :timestamp),
    Column.new(1, "result", :json),
  ]

  resume(task, columns, 1, &control)
end
Instance Method Details
#init ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/embulk/input/splunk.rb', line 50

# Connects to Splunk over HTTPS and caches the query settings for #run.
#
# Sets @service (the connection returned by Splunk.connect), @query and
# @earliest_time from the task.
def init
  # The task is built with string keys in .transaction, so all fields are read
  # by string key here (previously only "query" was; the symbol lookups did
  # not match the keys the task was built with).
  splunk_config = {
    :scheme => :https,
    :host => task["host"],
    :port => task["port"],
    :username => task["username"],
    :password => task["password"]
  }

  @service = ::Splunk.connect(splunk_config)

  @query = task["query"]
  @earliest_time = task["earliest_time"]
end
#run ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/embulk/input/splunk.rb', line 65

# Runs the query as a oneshot search, adds one [time, json] row per result to
# the page builder, and reports the latest event time seen.
#
# @return [Hash] `{ latest_time_in_results: Time }`; the value stays at
#   `Time.at(0)` when the search returned no results
def run
  # SPLUNK_UNLIMITED_RESULTS is 0, which means unlimited results.
  stream = @service.create_oneshot(
    @query,
    count: SPLUNK_UNLIMITED_RESULTS,
    earliest_time: @earliest_time
  )
  reader = ::Splunk::ResultsReader.new(stream)

  # The task is built with the string key "time_format"; the former symbol
  # lookup did not match it. Looked up once instead of once per result.
  time_format = task["time_format"]

  latest_time_in_results = Time.at(0)
  reader.each do |result|
    event_time = Time.strptime(result["_time"], time_format)
    latest_time_in_results = [latest_time_in_results, event_time].max
    page_builder.add([event_time, result.to_json])
  end

  page_builder.finish

  { latest_time_in_results: latest_time_in_results }
end