Class: Embulk::Input::Spotx

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input/spotx.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.resume(task, columns, count, &control) ⇒ Object



25
26
27
28
29
30
# File 'lib/embulk/input/spotx.rb', line 25

def self.resume(task, columns, count, &control)
  _task_reports = yield(task, columns, count)

  next_config_diff = {}
  return next_config_diff
end

.transaction(config, &control) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/embulk/input/spotx.rb', line 9

def self.transaction(config, &control)
  task = {
    "endpoint" => config.param("endpoint", :string),
    "client_id" => config.param("client_id", :string),
    "client_secret" => config.param("client_secret", :string),
    "refresh_token" => config.param("refresh_token", :string),
    "headers" => config.param("headers", :hash, default: {}),
  }

  columns = [
    Column.new(0, "record", :json),
  ]

  resume(task, columns, 1, &control)
end

Instance Method Details

#initObject

TODO def self.guess(config)

sample_records = [
  {"example"=>"a", "column"=>1, "value"=>0.1},
  {"example"=>"a", "column"=>2, "value"=>0.2},
]
columns = Guess::SchemaGuess.from_hash_records(sample_records)
return {"columns" => columns}

end



42
43
44
45
46
47
48
# File 'lib/embulk/input/spotx.rb', line 42

def init
  @endpoint = task["endpoint"]
  @client_id = task["client_id"]
  @client_secret = task["client_secret"]
  @refresh_token = task["refresh_token"]
  @headers = task["headers"]
end

#refresh_access_tokenObject



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/embulk/input/spotx.rb', line 50

def refresh_access_token
  uri = 'https://publisher-api.spotxchange.com/1.0/token'
  request_body = {
    client_id: @client_id,
    client_secret: @client_secret,
    refresh_token: @refresh_token,
    grant_type: 'refresh_token',
  }
  response = ::HTTP.post(uri, form: request_body)
  response_body = JSON.parse(response.body.to_s)
  response_body['value']['data']['access_token']
end

#request_dataObject



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/embulk/input/spotx.rb', line 63

def request_data
  auth = {"Authorization" => "Bearer #{refresh_access_token}"}
  response = ::HTTP.get(@endpoint, headers: @headers.merge(auth))
  body = ''
  response.body.each do |chunk|
    chunk.gsub!('{"value":{"data":[', '')
    chunk.gsub!(']}}', '')
    chunk.gsub!('},{', "}\n{")
    body += chunk
  end
  return body
end

#runObject



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/embulk/input/spotx.rb', line 76

def run
  content = request_data
  content.split("\n").map do |line|
    row = JSON.parse(line)
    page_builder.add([row])
    row
  end
  page_builder.finish

  task_report = {}
  return task_report
end