Class: Embulk::Input::Splunk

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input/splunk.rb

Constant Summary collapse

SPLUNK_UNLIMITED_RESULTS =

Zero means unlimited results. Splunk’s default is 100.

0
SPLUNK_TIME_FORMAT =
"%Y-%m-%dT%H:%M:%S.%L%:z"

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.resume(task, columns, count, &control) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/embulk/input/splunk.rb', line 38

def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)
  
  next_config_diff = {}
  
  if task["incremental"]
    next_config_diff[:earliest_time] = Time.parse( task_reports.first[:latest_time_in_results] ).strftime(SPLUNK_TIME_FORMAT)
  end
  
  return next_config_diff
end

.transaction(config, &control) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/embulk/input/splunk.rb', line 16

def self.transaction(config, &control)
  # configuration code:
  task = {
    "host" => config.param("host", :string),
    "port" => config.param("port", :integer, default: 8089),
    "username" => config.param("username", :string),
    "password" => config.param("password", :string),
    "query" => config.param("query", :string),
    "incremental" => config.param("incremental", :bool, default: false),
    "time_format" => config.param("time_format", :string, default: SPLUNK_TIME_FORMAT),
    
    "earliest_time" => config.param(:earliest_time, :string, default: "2010-01-01T00:00:00.000"),
  }

  columns = [
    Column.new(0, "time", :timestamp),
    Column.new(1, "result", :json),
  ]

  resume(task, columns, 1, &control)
end

Instance Method Details

#initObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/embulk/input/splunk.rb', line 50

def init
  # initialization code:
  splunk_config = {
    :scheme => :https,
    :host => task[:host],
    :port => task[:port],
    :username => task[:username],
    :password => task[:password]
  }
  
  @service = ::Splunk::connect(splunk_config)
  @query = task["query"]
  @earliest_time = task[:earliest_time]
end

#runObject



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/embulk/input/splunk.rb', line 65

def run
  stream = @service.create_oneshot(@query,
                                   count: SPLUNK_UNLIMITED_RESULTS,
                                   earliest_time: @earliest_time)

  reader = ::Splunk::ResultsReader.new(stream)
  
  latest_time_in_results = Time.at(0)

  reader.each do |result|
    event_time = Time.strptime( result["_time"], task[:time_format] )
    latest_time_in_results = [latest_time_in_results, event_time].max
    
    page_builder.add( [
      event_time,
      result.to_json
    ] )
  end
  
  page_builder.finish
  
  task_result = {
    latest_time_in_results: latest_time_in_results
  }

  return task_result
end