Module: TreasureData::Command

Includes:
Options, Helpers, Updater
Defined in:
lib/td/command/db.rb,
lib/td/command/job.rb,
lib/td/command/help.rb,
lib/td/command/list.rb,
lib/td/command/user.rb,
lib/td/command/query.rb,
lib/td/command/sched.rb,
lib/td/command/table.rb,
lib/td/command/apikey.rb,
lib/td/command/common.rb,
lib/td/command/export.rb,
lib/td/command/import.rb,
lib/td/command/result.rb,
lib/td/command/runner.rb,
lib/td/command/sample.rb,
lib/td/command/schema.rb,
lib/td/command/server.rb,
lib/td/command/status.rb,
lib/td/command/update.rb,
lib/td/command/account.rb,
lib/td/command/options.rb,
lib/td/command/password.rb,
lib/td/command/workflow.rb,
lib/td/command/connector.rb,
lib/td/command/bulk_import.rb

Defined Under Namespace

Modules: List, Options
Classes: BulkImportExecutionError, CommandExecutor, ImportError, JsonParser, MessagePackParser, ParameterConfigurationError, Runner, SizeBasedDownloadProgressIndicator, StructuredParser, TextParser, TimeBasedDownloadProgressIndicator, UpdateError, WorkflowError

Constant Summary

JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR =

TODO

10
# Display label for each numeric job priority, from -2 (lowest) to 2 (highest).
PRIORITY_FORMAT_MAP = {
  -2 => 'VERY LOW',
  -1 => 'LOW',
  0 => 'NORMAL',
  1 => 'HIGH',
  2 => 'VERY HIGH',
}
# Accepted spellings of a priority, mapped to the numeric priority.
# Each level accepts a case-insensitive name ("very low" / "very_low" /
# "very-low" / "verylow", "low", "norm"/"normal", "high", "very high") and
# its signed integer form ("+1", "1", "-0", ...).
PRIORITY_PARSE_MAP = {
  # -2
  /\Avery[ _\-]?low\z/i => -2,
  /\A-2\z/ => -2,
  # -1
  /\Alow\z/i => -1,
  /\A-1\z/ => -1,
  # 0
  /\Anorm(?:al)?\z/i => 0,
  /\A[\-\+]?0\z/ => 0,
  # 1
  /\Ahigh\z/i => 1,
  /\A[\+]?1\z/ => 1,
  # 2
  /\Avery[ _\-]?high\z/i => 2,
  /\A[\+]?2\z/ => 2,
}
# Upper-case Hive keywords treated as reserved. Each keyword is listed once.
# RENAME was previously misspelled "REANME" and so never matched, and
# TBLPROPERTIES appeared twice. The list is frozen because it is a
# shared constant.
HIVE_RESERVED_KEYWORDS = %w[
  TRUE FALSE ALL AND OR NOT LIKE ASC DESC ORDER BY GROUP WHERE FROM AS SELECT DISTINCT INSERT OVERWRITE
  OUTER JOIN LEFT RIGHT FULL ON PARTITION PARTITIONS TABLE TABLES TBLPROPERTIES SHOW MSCK DIRECTORY LOCAL
  TRANSFORM USING CLUSTER DISTRIBUTE SORT UNION LOAD DATA INPATH IS NULL CREATE EXTERNAL ALTER DESCRIBE
  DROP RENAME TO COMMENT BOOLEAN TINYINT SMALLINT INT BIGINT FLOAT DOUBLE DATE DATETIME TIMESTAMP STRING
  BINARY ARRAY MAP REDUCE PARTITIONED CLUSTERED SORTED INTO BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED
  COLLECTION ITEMS KEYS LINES STORED SEQUENCEFILE TEXTFILE INPUTFORMAT OUTPUTFORMAT LOCATION TABLESAMPLE BUCKET OUT
  OF CAST ADD REPLACE COLUMNS RLIKE REGEXP TEMPORARY FUNCTION EXPLAIN EXTENDED SERDE WITH SERDEPROPERTIES LIMIT SET
].freeze
# Numeric limit; its use is not visible here. Presumably a cap on the
# number of keys/columns — TODO confirm at the call sites.
KEY_NUM_LIMIT =
512
# Built-in line-parsing templates, keyed by template name. Each value is
# [regexp matched against one input line,
#  field names for the regexp's capture groups (in order),
#  time format string — presumably the strptime format for the 'time' field].
IMPORT_TEMPLATES =
{
  # Apache-style access log: host, user, [time], "method path ...",
  # status code, size, and an optional quoted referer/agent pair.
  'apache' => [
                /^([^ ]*) [^ ]* ([^ ]*) \[([^\]]*)\] "(\S+)(?: +([^ ]*) +\S*)?" ([^ ]*) ([^ ]*)(?: "([^\"]*)" "([^\"]*)")?$/,
                ['host', 'user', 'time', 'method', 'path', 'code', 'size', 'referer', 'agent'],
                "%d/%b/%Y:%H:%M:%S %z"],
  # Syslog-style line: "<3-token time> host ident[pid]: message"
  # (the [pid] part is optional).
  'syslog' => [
                /^([^ ]* [^ ]* [^ ]*) ([^ ]*) ([a-zA-Z0-9_\/\.\-]*)(?:\[([0-9]+)\])?[^\:]*\: *(.*)$/,
                ['time', 'host', 'ident', 'pid', 'message'],
                "%b %d %H:%M:%S"],
}
# Accepted file formats (gzip-compressed JSON/TSV variants). Presumably
# validated against user input elsewhere — confirm at call sites.
SUPPORTED_FORMATS =
%W[json.gz line-json.gz tsv.gz jsonl.gz]
# Accepted encryption methods; only 's3' is listed.
SUPPORTED_ENCRYPT_METHOD =
%W[s3]
# Executable and main class used to launch the Java bulk-import tool.
JAVA_COMMAND =
"java"
JAVA_MAIN_CLASS =
"com.treasure_data.td_import.BulkImportCommand"
JVM_OPTS =

TODO

["-Xmx1024m"]

Instance Method Summary

Methods included from Helpers

format_with_delimiter, home_directory, on_64bit_os?, on_mac?, on_windows?

Methods included from Updater

#maven_repo_root

Methods included from Updater::ModuleDefinition

#client_version_from_path, #compare_versions, #disable, #disable?, #disable_message, #endpoint_root, #fetch, #get_client_version_file, #home_directory, #inject_libpath, #installed_client_path, #jarfile_dest_path, #latest_local_version, #on_mac?, #on_windows?, #package_category, #raise_error, #stream_fetch, #update_package_endpoint, #updated_client_path, #updating_lock_path, #version_endpoint, #wait_for_lock

Methods included from Options

#job_show_options, write_format_option

Instance Method Details

#account(op) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/td/command/account.rb', line 6

# Interactively authenticates a Treasure Data user and stores the resulting
# API key in the local config.
#
# The user name comes from the command line when given; otherwise it is read
# from STDIN. The password is read with get_password, with up to three
# attempts. An endpoint reported by Config.endpoint is used for the login and
# is saved together with the user name and API key.
#
# @param op [Object] td command option parser (assumed to provide #banner,
#   #on and #cmd_parse — confirm against the caller)
#
# Exits with status 0 when an API key is already configured and -f was not
# given, or when the user enters an empty user name or password. Exits with
# status 1 on Ctrl-C. Returns without saving anything if every password
# attempt fails.
def account(op)
  op.banner << "\noptions:\n"

  force = false
  op.on('-f', '--force', 'overwrite current account setting', TrueClass) {|_b|
    force = true
  }

  user_name = op.cmd_parse

  endpoint = nil
  # user may be calling 'td account' with the -e / --endpoint
  # option, which we want to preserve and save
  begin
    endpoint = Config.endpoint
  rescue ConfigNotFoundError
    # the endpoint is neither stored in the config file
    # nor passed as option on the command line
  end

  conf = nil
  begin
    conf = Config.read
  rescue ConfigError
    # no readable config yet; a fresh Config is created before saving below
  end

  if conf && conf['account.apikey']
    unless force
      if conf['account.user']
        $stderr.puts "Account is already configured with '#{conf['account.user']}' account."
      else
        $stderr.puts "Account is already configured."
      end
      $stderr.puts "Add '-f' option to overwrite."
      exit 0
    end
  end

  $stdout.puts "Enter your Treasure Data credentials. For Google SSO user, please see https://docs.treasuredata.com/display/public/PD/Configuring+Authentication+for+TD+Using+the+TD+Toolbelt#ConfiguringAuthenticationforTDUsingtheTDToolbelt-SettingUpGoogleSSOUsers"
  unless user_name
    begin
      $stdout.print "Email: "
      line = STDIN.gets || ""
      user_name = line.strip
    rescue Interrupt
      $stderr.puts "\ncanceled."
      exit 1
    end
  end

  if user_name.empty?
    $stderr.puts "canceled."
    exit 0
  end

  client = nil

  3.times do
    begin
      $stdout.print "Password (typing will be hidden): "
      password = get_password
    rescue Interrupt
      $stderr.print "\ncanceled."
      exit 1
    ensure
      # always turn terminal echo back on (presumably disabled while the
      # password is typed), even when interrupted
      system "stty echo"   # TODO termios
      $stdout.print "\n"
    end

    if password.empty?
      $stderr.puts "canceled."
      exit 0
    end

    begin
      # enable SSL for the authentication
      opts = {}
      opts[:ssl] = true
      opts[:endpoint] = endpoint if endpoint
      client = Client.authenticate(user_name, password, opts)
    rescue TreasureData::AuthError
      $stderr.puts "User name or password mismatched."
    end

    break if client
  end
  # every attempt failed: nothing is saved
  return unless client

  $stdout.puts "Authenticated successfully."

  conf ||= Config.new
  conf["account.user"] = user_name
  conf["account.apikey"] = client.apikey
  conf['account.endpoint'] = endpoint if endpoint
  conf.save

  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create <db_name>' to create a database."
end

#account_usage(op) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/td/command/account.rb', line 105

# Prints the storage size of the current account to stderr.
#
# @param op [Object] td command option parser; only #cmd_parse is used
#   (this command takes no arguments).
def account_usage(op)
  op.cmd_parse

  client = get_client
  account_info = client.account

  $stderr.puts "Storage:  #{account_info.storage_size_string}"
end

#apikey_set(op) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/td/command/apikey.rb', line 26

# Stores the API key given on the command line in the local config and
# removes any saved user name.
#
# If a config already holds an API key, the command refuses (exit 0) unless
# -f/--force is given.
def apikey_set(op)
  op.banner << "\noptions:\n"

  force = false
  op.on('-f', '--force', 'overwrite current account setting', TrueClass) { |_b| force = true }

  apikey = op.cmd_parse

  # A missing or unreadable config is treated like "no config yet".
  conf = begin
    Config.read
  rescue ConfigError
    nil
  end

  if conf && conf['account.apikey'] && !force
    configured_user = conf['account.user']
    $stderr.puts(configured_user ? "Account is already configured with '#{configured_user}' account." : "Account is already configured.")
    $stderr.puts "Add '-f' option to overwrite."
    exit 0
  end

  conf ||= Config.new
  conf.delete("account.user")
  conf["account.apikey"] = apikey
  conf.save

  $stdout.puts "API key is set."
  $stdout.puts "Use '#{$prog} db:create <db_name>' to create a database."
end

#apikey_show(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/td/command/apikey.rb', line 5

# Prints the API key in effect.
#
# Config.apikey takes precedence when it is set. Otherwise the key stored in
# the config file is printed. Exits with status 1 if no key is configured.
def apikey_show(op)
  return $stdout.puts(Config.apikey) if Config.apikey

  stored = begin
    Config.read
  rescue ConfigError
    nil
  end

  unless stored && stored['account.apikey']
    $stderr.puts "Account is not configured yet."
    $stderr.puts "Use '#{$prog} apikey:set' or '#{$prog} account' first."
    exit 1
  end

  $stdout.puts stored['account.apikey']
end

#bulk_import_commit(op) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/td/command/bulk_import.rb', line 279

# Starts committing a bulk import session and, with -w/--wait, blocks in
# wait_commit until the commit finishes.
def bulk_import_commit(op)
  wait = false
  op.on('-w', '--wait', 'wait for finishing the commit', TrueClass) { |b| wait = b }

  session_name = op.cmd_parse

  get_client.commit_bulk_import(session_name)
  $stderr.puts "Bulk import session '#{session_name}' started to commit."

  wait_commit(session_name) if wait # wait the commit
end

#bulk_import_create(op) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/td/command/bulk_import.rb', line 28

# Creates a bulk import session that loads into the given database and table.
# Arguments: <session name> <database> <table>.
def bulk_import_create(op)
  session_name, db_name, table_name = op.cmd_parse

  get_client.create_bulk_import(session_name, db_name, table_name, {})

  $stderr.puts "Bulk import session '#{session_name}' is created."
end

#bulk_import_delete(op) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/td/command/bulk_import.rb', line 37

# Deletes a bulk import session. Exits with status 1 if the session does not
# exist; the underlying error goes to cmd_debug_error.
def bulk_import_delete(op)
  session_name = op.cmd_parse
  client = get_client

  begin
    client.delete_bulk_import(session_name)
  rescue NotFoundError => e
    cmd_debug_error e
    $stderr.puts "Bulk import session '#{session_name}' does not exist."
    exit 1
  end

  $stderr.puts "Bulk import session '#{session_name}' is deleted."
end

#bulk_import_delete_part(op) ⇒ Object

Obsolete.



199
200
201
202
203
204
205
206
207
# File 'lib/td/command/bulk_import.rb', line 199

# Deletes one uploaded part from a bulk import session (obsolete command).
# Arguments: <session name> <part name>.
def bulk_import_delete_part(op)
  session_name, part = op.cmd_parse
  get_client.bulk_import_delete_part(session_name, part)
  $stderr.puts "Part '#{part}' is deleted."
end

#bulk_import_delete_parts(op) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/td/command/bulk_import.rb', line 209

# Deletes several uploaded parts from a bulk import session, one at a time.
# With -P/--prefix, the prefix is prepended to every part name given.
def bulk_import_delete_parts(op)
  prefix = ""
  op.on('-P', '--prefix NAME', 'add prefix to parts name') { |s| prefix = s }

  session_name, *parts = op.cmd_parse
  client = get_client

  parts.each do |part|
    full_name = prefix + part
    $stderr.puts "Deleting '#{full_name}'..."
    client.bulk_import_delete_part(session_name, full_name)
  end

  $stderr.puts "done."
end

#bulk_import_list(op) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/td/command/bulk_import.rb', line 6

# Renders a table of all bulk import sessions to stdout.
# When there are none, prints a hint to stderr on how to create one.
def bulk_import_list(op)
  set_render_format_option(op)
  op.cmd_parse

  columns = [:Name, :Table, :Status, :Frozen, :JobID, :"Valid Parts", :"Error Parts", :"Valid Records", :"Error Records"]
  sessions = get_client.bulk_imports

  rows = sessions.map { |bi|
    {
      :Name => bi.name,
      :Table => "#{bi.database}.#{bi.table}",
      :Status => bi.status.to_s.capitalize,
      :Frozen => (bi.upload_frozen? ? 'Frozen' : ''),
      :JobID => bi.job_id,
      :"Valid Parts" => bi.valid_parts,
      :"Error Parts" => bi.error_parts,
      :"Valid Records" => bi.valid_records,
      :"Error Records" => bi.error_records,
    }
  }

  $stdout.puts cmd_render_table(rows, :fields => columns, :max_width => 200, :render_format => op.render_format)

  return unless rows.empty?
  $stderr.puts "There are no bulk import sessions."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "bulk_import:create <name> <db> <table>' to create a session."
end

#bulk_import_perform(op) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/td/command/bulk_import.rb', line 230

# Queues the perform job for a bulk import session.
#
# Unless -f/--force is given, exits with status 1 when the session is already
# 'performing' or 'ready'. With -w/--wait, waits for the job via wait_job.
# With -O/--pool-name, the resource pool name is passed as 'pool_name'.
def bulk_import_perform(op)
  wait = false
  force = false
  pool_name = nil

  op.on('-w', '--wait', 'wait for finishing the job', TrueClass) { |b| wait = b }
  op.on('-f', '--force', 'force start performing', TrueClass) { |b| force = b }
  op.on('-O', '--pool-name NAME', 'specify resource pool by name') { |s| pool_name = s }

  name = op.cmd_parse
  client = get_client

  unless force
    existing = client.bulk_imports.find { |candidate| name == candidate.name }
    case existing && existing.status
    when 'performing'
      $stderr.puts "Bulk import session '#{name}' is already performing."
      $stderr.puts "Add '-f' option to force start."
      $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:kill #{existing.job_id}' to cancel the last trial."
      exit 1
    when 'ready'
      $stderr.puts "Bulk import session '#{name}' is already ready to commit."
      $stderr.puts "Add '-f' option to force start."
      exit 1
    end
  end

  perform_opts = {}
  perform_opts['pool_name'] = pool_name if pool_name
  job = client.perform_bulk_import(name, perform_opts)

  $stderr.puts "Job #{job.job_id} is queued."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show [-w] #{job.job_id}' to show the status."

  return unless wait
  require 'td/command/job'  # wait_job
  wait_job(job)
end

#bulk_import_show(op) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/td/command/bulk_import.rb', line 53

# Prints the details of one bulk import session to stderr, then the names of
# its uploaded parts to stdout (one per line, indented by two spaces).
# Exits with status 1 if the session does not exist.
def bulk_import_show(op)
  session_name = op.cmd_parse
  client = get_client

  session = client.bulk_import(session_name)
  unless session
    $stderr.puts "Bulk import session '#{session_name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "bulk_import:create <name> <db> <table>' to create a session."
    exit 1
  end

  $stderr.puts "Name         : #{session.name}"
  $stderr.puts "Database     : #{session.database}"
  $stderr.puts "Table        : #{session.table}"
  $stderr.puts "Status       : #{session.status.to_s.capitalize}"
  $stderr.puts "Frozen       : #{session.upload_frozen?}"
  $stderr.puts "JobID        : #{session.job_id}"
  $stderr.puts "Valid Records: #{session.valid_records}"
  $stderr.puts "Error Records: #{session.error_records}"
  $stderr.puts "Valid Parts  : #{session.valid_parts}"
  $stderr.puts "Error Parts  : #{session.error_parts}"
  $stderr.puts "Uploaded Parts :"

  client.list_bulk_import_parts(session_name).each { |part| $stdout.puts "  #{part}" }
end

#bulk_import_upload_part(op) ⇒ Object

Obsolete.



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/td/command/bulk_import.rb', line 84

# Uploads a single local file as a named part of a bulk import session
# (obsolete command). Arguments: <session name> <part name> <path>.
# The upload is done by bulk_import_upload_impl with a retry limit of 10 and
# a retry wait of 1.
def bulk_import_upload_part(op)
  session_name, part_name, path = op.cmd_parse
  max_retries, retry_delay = 10, 1

  File.open(path, "rb") do |io|
    bulk_import_upload_impl(session_name, part_name, io, io.size, max_retries, retry_delay)
  end

  $stderr.puts "Part '#{part_name}' is uploaded."
end

#bulk_import_upload_parts(op) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/td/command/bulk_import.rb', line 97

# Uploads local files as parts of an existing bulk import session.
#
# Part names come from each file's basename. The -P prefix is prepended, and
# only the first COUNT+1 dot-separated components are kept (-s COUNT,
# default 0, i.e. everything before the first dot). Uploads run on
# --parallel threads (clamped to 1..8, default 2). Each file goes through
# bulk_import_upload_impl with retry_limit 10 and retry_wait 1. With
# --auto-perform, a perform job is queued once every upload has succeeded.
#
# Exits with status 1 if the session does not exist or any upload fails.
# On failure it prints the errors and a retry command line that repeats the
# naming options.
def bulk_import_upload_parts(op)
  retry_limit = 10
  retry_wait = 1
  suffix_count = 0
  part_prefix = ""
  auto_perform = false
  parallel = 2
  pool_name = nil

  op.on('-P', '--prefix NAME', 'add prefix to parts name') {|s|
    part_prefix = s
  }
  op.on('-s', '--use-suffix COUNT', 'use COUNT number of . (dots) in the source file name to the parts name', Integer) {|i|
    suffix_count = i
  }
  op.on('--auto-perform', 'perform bulk import job automatically', TrueClass) {|b|
    auto_perform = b
  }
  op.on('--parallel NUM', 'perform uploading in parallel (default: 2; max 8)', Integer) {|i|
    parallel = i
  }
  op.on('-O', '--pool-name NAME', 'specify resource pool by name') {|s|
    pool_name = s
  }

  name, *files = op.cmd_parse

  # validate the session
  bi = get_client.bulk_imports.find {|session| name == session.name }
  unless bi
    $stderr.puts "Bulk import session '#{name}' does not exist. Please check the first argument of the command."
    exit 1
  end

  parallel = 1 if parallel <= 1
  parallel = 8 if parallel >= 8

  # Worker threads share `files`. Checking `files.empty?` and then calling
  # `files.shift` is not atomic: another thread can take the last file in
  # between, so `shift` returns nil and File.basename(nil) raises outside the
  # per-file rescue. Popping under a lock hands each file to exactly one thread.
  files_lock = Mutex.new

  threads = (1..parallel).map {
    Thread.new do
      errors = []
      while (ifname = files_lock.synchronize { files.shift })
        begin
          part_name = part_prefix + File.basename(ifname).split('.')[0..suffix_count].join('.')

          File.open(ifname, "rb") {|io|
            size = io.size
            $stderr.write "Uploading '#{ifname}' -> '#{part_name}'... (#{size} bytes)\n"

            bulk_import_upload_impl(name, part_name, io, size, retry_limit, retry_wait)
          }
        rescue
          errors << [ifname, $!]
        end
      end
      errors
    end
  }

  # Thread#value joins each worker and returns its [file, exception] list.
  errors = threads.flat_map(&:value)

  unless errors.empty?
    $stderr.puts "failed to upload #{errors.size} files."
    $stderr.puts "backtraces:"
    errors.each {|(ifname, ex)|
      $stderr.puts "  #{ifname}: #{ex}"
      (ex.backtrace || []).each {|bt|
        $stderr.puts "      #{ifname}: #{bt}"
      }
    }
    $stderr.puts "files:"
    ifnames = errors.map {|(ifname, _ex)| ifname }
    ifnames.each {|ifname|
      $stderr.puts "  #{ifname}"
    }
    # Repeat the part-naming options so retried files get the same part names.
    retry_args = []
    retry_args << "-P '#{part_prefix}'" unless part_prefix.empty?
    retry_args << "-s #{suffix_count}" unless suffix_count == 0
    retry_args << name
    retry_args.concat(ifnames.map {|ifname| "'#{ifname}'" })
    $stderr.puts "You can retry uploading by following command:"
    $stderr.puts "#{$prog} " + Config.cl_options_string + "bulk_import:upload_parts #{retry_args.join(' ')}"
    exit 1
  end

  $stderr.puts "done."

  if auto_perform
    opts = {}
    opts['pool_name'] = pool_name if pool_name
    client = get_client
    job = client.perform_bulk_import(name, opts)

    $stderr.puts "Job #{job.job_id} is queued."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show [-w] #{job.job_id}' to show the status."
  end
end

#bulk_import_upload_parts2(op) ⇒ Object



194
195
196
# File 'lib/td/command/bulk_import.rb', line 194

# Obsolete command. It only prints a notice saying the functionality now
# lives in `td import:upload`, and uploads nothing.
def bulk_import_upload_parts2(op)
  moved_notice = "This command is moved to 'td import:upload' since 0.10.85."
  $stdout.puts moved_notice
end

#connector_create(op) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/td/command/connector.rb', line 175

# Creates a scheduled connector (bulk load) session.
# Arguments: <name> <cron> <database> <table> <config file>.
#
# The timezone defaults to 'UTC'. --time-column and -D/--delay are passed
# through when given. get_table is called first with the destination
# database and table.
def connector_create(op)
  # TODO it's a must parameter at this moment but API should be fixed
  opts = {:timezone => 'UTC'}

  op.on('--time-column COLUMN_NAME', "data partitioning key") { |s| opts[:time_column] = s }
  op.on('-t', '--timezone TZ', "name of the timezone.",
                               "  Only extended timezones like 'Asia/Tokyo', 'America/Los_Angeles' are supported,",
                               "  (no 'PST', 'PDT', etc...).",
                               "  When a timezone is specified, the cron schedule is referred to that timezone.",
                               "  Otherwise, the cron schedule is referred to the UTC timezone.",
                               "  E.g. cron schedule '0 12 * * *' will execute daily at 5 AM without timezone option",
                               "  and at 12PM with the -t / --timezone 'America/Los_Angeles' timezone option") { |s| opts[:timezone] = s }
  op.on('-D', '--delay SECONDS', 'delay time of the schedule', Integer) { |i| opts[:delay] = i }

  name, cron, database, table, config_file = op.cmd_parse

  job_config = prepare_bulkload_job_config(config_file)
  opts[:cron] = cron

  client = get_client
  get_table(client, database, table)

  created = client.bulk_load_create(name, database, table, opts.merge(config: job_config))
  dump_connector_session(created)
end

#connector_delete(op) ⇒ Object



259
260
261
262
263
264
265
266
267
# File 'lib/td/command/connector.rb', line 259

# Deletes a connector session by name and prints the deleted session.
def connector_delete(op)
  name = op.cmd_parse

  deleted = get_client.bulk_load_delete(name)
  $stdout.puts 'Deleted session'
  $stdout.puts '--'
  dump_connector_session(deleted)
end

#connector_guess(op) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/td/command/connector.rb', line 18

# Runs the server-side connector "guess" and writes the guessed configuration
# to the output file (default 'config.yml'). The file is written as JSON when
# its name ends in .json and as YAML otherwise; create_file_backup is called
# on it first.
#
# The input is normally a configuration file. The obsoleted --type,
# --access-id, --access-secret and --source options build an S3 config
# instead. -g/--guess adds 'guess_plugins' to the config's 'exec' section.
def connector_guess(op)
  type = 's3'
  id = secret = source = nil
  out = 'config.yml'
  guess_plugins = {}

  op.on('--type[=TYPE]', "(obsoleted)") { |s| type = s }
  op.on('--access-id ID', "(obsoleted)") { |s| id = s }
  op.on('--access-secret SECRET', "(obsoleted)") { |s| secret = s }
  op.on('--source SOURCE', "(obsoleted)") { |s| source = s }
  op.on('-o', '--out FILE_NAME', "output file name for connector:preview") { |s| out = s }
  op.on('-g', '--guess NAME,NAME,...', 'specify list of guess plugins that users want to use') { |s|
    guess_plugins['guess_plugins'] = s.split(',')
  }

  config_file = op.cmd_parse
  if config_file
    config = prepare_bulkload_job_config(config_file)
    # NOTE(review): `out` always holds a value at this point ('config.yml'
    # unless -o was given), so an `out ||= config_file` fallback could never
    # fire. Confirm whether writing back to config_file was ever intended.
  else
    begin
      $stdout.puts 'Command line option is obsoleted. You should use configuration file.'
      required('--access-id', id)
      required('--access-secret', secret)
      required('--source', source)
      required('--out', out)
    rescue ParameterConfigurationError
      # If any legacy option was given, surface the original error. If none
      # were, show usage and ask for the configuration file instead.
      raise unless id.nil? && secret.nil? && source.nil?
      $stdout.puts op.to_s
      $stdout.puts ""
      raise ParameterConfigurationError, "path to configuration file is required"
    end

    uri = URI.parse(source)
    path_components = uri.path.scan(/\/[^\/]*/)
    # The first path segment is the bucket; the rest is the path prefix.
    config = {
      :type => type,
      :access_key_id => id,
      :secret_access_key => secret,
      :endpoint => uri.host,
      :bucket => path_components.shift.sub(/\//, ''),
      :path_prefix => path_components.join.sub(/\//, ''),
    }
  end

  config = TreasureData::ConnectorConfigNormalizer.new(config).normalized_config
  config['exec'].merge!(guess_plugins)

  guessed = get_client.bulk_load_guess(config: config)['config']

  create_file_backup(out)
  config_str = (/\.json\z/ =~ out) ? JSON.pretty_generate(guessed) : config_to_yaml(guessed)
  File.open(out, 'w') { |f| f << config_str }

  $stdout.puts "Guessed configuration:"
  $stdout.puts
  $stdout.puts config_str
  $stdout.puts
  $stdout.puts "Created #{out} file."
  $stdout.puts "Use '#{$prog} " + Config.cl_options_string + "connector:preview #{out}' to see bulk load preview."
end

#connector_history(op) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/td/command/connector.rb', line 269

# Renders the job history of a connector session as a table on stdout.
# :Started is Time.at(start_at). :Duration is end_at - start_at, using the
# current time when end_at is nil. Both are blank when start_at is missing.
def connector_history(op)
  set_render_format_option(op)
  name = op.cmd_parse

  fields = [:JobID, :Status, :Records, :Database, :Table, :Priority, :Started, :Duration]
  history = get_client.bulk_load_history(name)

  rows = history.map do |entry|
    started_at = entry['start_at']
    timing =
      if started_at
        # no end_at (presumably a job still running): measure up to now
        finished_at = entry['end_at'].nil? ? Time.now.to_i : entry['end_at']
        { :Started => Time.at(started_at), :Duration => finished_at - started_at }
      else
        { :Started => '', :Duration => '' }
      end

    {
      :JobID => entry['job_id'],
      :Status => entry['status'],
      :Records => entry['records'],
      # TODO: td-client-ruby should retuan only name
      :Database => entry['database'] ? entry['database']['name'] : '',
      :Table    => entry['table']    ? entry['table']['name']    : '',
      :Priority => entry['priority'],
    }.merge(timing)
  end

  $stdout.puts cmd_render_table(rows, :fields => fields, :render_format => op.render_format)
end

#connector_issue(op) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/td/command/connector.rb', line 120

# Issues a one-off connector (bulk load) job from a configuration file into
# --database/--table, both required.
#
# --time-column is written into the config's 'out' section.
# --auto-create-table calls create_database_and_table_if_not_exist first.
# -w/--wait waits for the job via wait_connector_job.
def connector_issue(op)
  database = table = time_column = nil
  wait = auto_create = false

  op.on('--database DB_NAME', "destination database") { |s| database = s }
  op.on('--table TABLE_NAME', "destination table") { |s| table = s }
  # unnecessary but for backward compatibility
  op.on('--time-column COLUMN_NAME', "data partitioning key") { |s| time_column = s }
  op.on('-w', '--wait', 'wait for finishing the job', TrueClass) { |b| wait = b }
  op.on('--auto-create-table', "Create table and database if doesn't exist", TrueClass) { |b| auto_create = b }

  config_file = op.cmd_parse

  required('--database', database)
  required('--table', table)

  config = prepare_bulkload_job_config(config_file)
  if time_column
    # TODO will not work once embulk implements multi-job
    config['out'] ||= {}
    config['out']['time_column'] = time_column
  end

  client = get_client
  create_database_and_table_if_not_exist(client, database, table) if auto_create

  job_id = client.bulk_load_issue(database, table, config: config)

  $stdout.puts "Job #{job_id} is queued."
  $stdout.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job_id}' to show the status."

  wait_connector_job(client, job_id) if wait
end

#connector_list(op) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/td/command/connector.rb', line 158

# Renders all connector sessions, sorted by name, as a table with the
# Name/Cron/Timezone/Delay/Database/Table columns.
def connector_list(op)
  set_render_format_option(op)
  op.cmd_parse

  client = get_client
  # TODO database and table is empty at present. Fix API or Client.
  keys = %w[name cron timezone delay database table]
  fields = keys.map { |key| key.capitalize.to_sym }

  sessions = client.bulk_load_list.sort_by { |session| session['name'] }
  rows = sessions.map { |session| fields.zip(session.values_at(*keys)).to_h }

  $stdout.puts cmd_render_table(rows, :fields => fields, :render_format => op.render_format, resize: false)
end

#connector_preview(op) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/td/command/connector.rb', line 94

# Runs a server-side preview of a bulk load configuration and renders the
# sample records as a table. Column headers are "name:type", ordered by each
# schema column's 'index', and each cell is the value's #inspect string.
def connector_preview(op)
  set_render_format_option(op)
  config_file = op.cmd_parse
  config = prepare_bulkload_job_config(config_file)
  preview = get_client.bulk_load_preview(config: config)

  ordered_columns = preview['schema'].sort_by { |column| column['index'] }
  fields = ordered_columns.map { |column| column['name'] + ':' + column['type'] }

  # Records are positional arrays; key each value by its header.
  rows = preview['records'].map do |record|
    row = {}
    record.each_with_index { |value, position| row[fields[position]] = value.inspect }
    row
  end

  $stdout.puts cmd_render_table(rows, :fields => fields, :render_format => op.render_format, :resize => false)

  $stdout.puts "Update #{config_file} and use '#{$prog} " + Config.cl_options_string + "connector:preview #{config_file}' to preview again."
  $stdout.puts "Use '#{$prog} " + Config.cl_options_string + "connector:issue #{config_file}' to run Server-side bulk load."
end

#connector_run(op) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/td/command/connector.rb', line 298

# Runs a connector session immediately. The optional second argument is a
# scheduled time string, parsed with Time.parse; without it current_time is
# used. The job is queued with that time as epoch seconds. -w/--wait waits
# via wait_connector_job.
def connector_run(op)
  wait = false
  op.on('-w', '--wait', 'wait for finishing the job', TrueClass) { |b| wait = b }

  name, scheduled_time = op.cmd_parse
  run_at = scheduled_time ? Time.parse(scheduled_time) : current_time

  client = get_client
  job_id = client.bulk_load_run(name, run_at.to_i)

  $stdout.puts "Job #{job_id} is queued."
  $stdout.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job_id}' to show the status."

  wait_connector_job(client, job_id) if wait
end

#connector_show(op) ⇒ Object



206
207
208
209
210
211
212
# File 'lib/td/command/connector.rb', line 206

# Shows one connector session by name.
def connector_show(op)
  # Parse arguments before building the client, as before.
  session_name = op.cmd_parse
  session = get_client.bulk_load_show(session_name)
  dump_connector_session(session)
end

#connector_update(op) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/td/command/connector.rb', line 214

# Updates settings of an existing connector session.
#
# Each option sets one key in the settings hash passed to bulk_load_update.
# A config file given as the second positional argument overrides
# -c/--config. Calls op.cmd_usage when nothing would be updated. 'config' and
# 'config_diff' are loaded through prepare_bulkload_job_config before sending.
#
# In this command -t is --table and the timezone option is -z. The timezone
# help text therefore refers to "-z / --timezone"; it used to say "-t".
def connector_update(op)
  settings = {}

  op.on('-n', '--newname NAME', 'change the schedule\'s name', String) {|n|
    settings['name'] = n
  }
  op.on('-d', '--database DB_NAME', 'change the database', String) {|s|
    settings['database'] = s
  }
  op.on('-t', '--table TABLE_NAME', 'change the table', String) {|s|
    settings['table'] = s
  }
  op.on('-s', '--schedule [CRON]', 'change the schedule or leave blank to remove the schedule', String) {|s|
    settings['cron'] = s || ''
  }
  op.on('-z', '--timezone TZ', "name of the timezone.",
                               "  Only extended timezones like 'Asia/Tokyo', 'America/Los_Angeles' are supported,",
                               "  (no 'PST', 'PDT', etc...).",
                               "  When a timezone is specified, the cron schedule is referred to that timezone.",
                               "  Otherwise, the cron schedule is referred to the UTC timezone.",
                               "  E.g. cron schedule '0 12 * * *' will execute daily at 5 AM without timezone option",
                               "  and at 12PM with the -z / --timezone 'America/Los_Angeles' timezone option", String) {|s|
    settings['timezone'] = s
  }
  op.on('-D', '--delay SECONDS', 'change the delay time of the schedule', Integer) {|i|
    settings['delay'] = i
  }
  op.on('-T', '--time-column COLUMN_NAME', 'change the name of the time column', String) {|s|
    settings['time_column'] = s
  }
  op.on('-c', '--config CONFIG_FILE', 'update the connector configuration', String) {|s|
    settings['config'] = s
  }
  op.on('--config-diff CONFIG_DIFF_FILE', "update the connector config_diff", String) { |s| settings['config_diff'] = s }

  name, config_file = op.cmd_parse
  settings['config'] = config_file if config_file
  op.cmd_usage 'nothing to update' if settings.empty?
  settings['config'] = prepare_bulkload_job_config(settings['config']) if settings.key?('config')
  settings['config_diff'] = prepare_bulkload_job_config(settings['config_diff']) if settings.key?('config_diff')
  client = get_client
  session = client.bulk_load_update(name, settings)
  dump_connector_session(session)
end

#db_create(op) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/td/command/db.rb', line 48

# Creates a database after validating its name with
# API.validate_database_name. Exits with status 1 if it already exists.
def db_create(op)
  db_name = op.cmd_parse
  API.validate_database_name(db_name)

  client = get_client
  begin
    client.create_database(db_name, {})
  rescue AlreadyExistsError
    $stderr.puts "Database '#{db_name}' already exists."
    exit 1
  end

  $stderr.puts "Database '#{db_name}' is created."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:create #{db_name} <table_name>' to create a table."
end

#db_delete(op) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/td/command/db.rb', line 67

# Deletes a database. A database that still has tables is only deleted with
# -f/--force; otherwise the command exits with status 1. It also exits with
# status 1 if the database does not exist.
def db_delete(op)
  force = false
  op.on('-f', '--force', 'clear tables and delete the database', TrueClass) { |_b| force = true }

  db_name = op.cmd_parse
  client = get_client

  begin
    database = client.database(db_name)

    unless force || database.tables.empty?
      $stderr.puts "Database '#{db_name}' is not empty. Use '-f' option or drop tables first."
      exit 1
    end

    database.delete
  rescue NotFoundError
    $stderr.puts "Database '#{db_name}' does not exist."
    exit 1
  end

  $stderr.puts "Database '#{db_name}' is deleted."
end

#db_list(op) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/td/command/db.rb', line 28

# Renders all databases (name and count) as a table on stdout. When there are
# none, prints a hint to stderr on how to create one.
def db_list(op)
  set_render_format_option(op)
  op.cmd_parse

  databases = get_client.databases
  rows = databases.map { |database| { :Name => database.name, :Count => database.count } }
  $stdout.puts cmd_render_table(rows, :fields => [:Name, :Count], :render_format => op.render_format)

  return unless databases.empty?
  $stderr.puts "There are no databases."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create <db_name>' to create a database."
end

#db_show(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/td/command/db.rb', line 5

# Renders the tables of one database (`td db:show <db_name>`) with type,
# record count and a "name:type, ..." schema summary. Rows are ordered by the
# length of the type name, then by table name.
def db_show(op)
  set_render_format_option(op)
  db_name = op.cmd_parse

  client = get_client
  database = get_database(client, db_name)

  rows = database.tables.map { |table|
    schema_text = table.schema.fields.map { |field| "#{field.name}:#{field.type}" }.join(', ')
    {:Table => table.name, :Type => table.type.to_s, :Count => table.count.to_s, :Schema => schema_text.to_s}
  }.sort_by { |row| [row[:Type].size, row[:Table]] }

  $stdout.puts cmd_render_table(rows, :fields => [:Table, :Type, :Count, :Schema], :render_format => op.render_format)
end

#export_result(op) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/td/command/export.rb', line 8

# Queues a result-export job for an existing job (`td export:result <job_id> <result>`).
#
# With -w / --wait it first waits for the target job if it has not finished,
# then waits for the export job and prints its final status.
# -P / --priority sets the priority (unknown names raise); -R / --retry sets
# the retry limit.
def export_result(op)
  wait = false
  priority = nil
  retry_limit = nil

  op.on('-w', '--wait', 'wait until the job is completed', TrueClass) { |flag|
    wait = flag
  }
  op.on('-P', '--priority PRIORITY', 'set priority') { |text|
    priority = job_priority_id_of(text)
    raise "unknown priority #{text.inspect} should be -2 (very-low), -1 (low), 0 (normal), 1 (high) or 2 (very-high)" unless priority
  }
  op.on('-R', '--retry COUNT', 'automatic retrying count', Integer) { |count|
    retry_limit = count
  }

  target_job_id, result = op.cmd_parse
  client = get_ssl_client

  export_options = {
    result: result,
    retry_limit: retry_limit,
    priority: priority,
  }

  if wait
    target_job = client.job(target_job_id)
    unless target_job.finished?
      $stderr.puts "target job #{target_job_id} is still running..."
      wait_job(target_job)
    end
  end

  export_job = client.result_export(target_job_id, export_options)
  $stderr.puts "result export job #{export_job.job_id} is queued."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{export_job.job_id}' to show the status."

  return unless wait

  wait_job(export_job)
  $stdout.puts "status     : #{export_job.status}"
end

#export_table(op) ⇒ Object



53
54
55
# File 'lib/td/command/export.rb', line 53

# `export:table` shares its implementation with `table:export`; all option
# parsing and work happens in table_export.
def export_table(op)
  table_export(op)
end

#get_history(client, name, from, to) ⇒ Object



408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/td/command/sched.rb', line 408

# Returns client.history(name, from, to) for the schedule `name`.
# When the schedule does not exist, prints an error with a sched:list hint
# and exits with status 1.
def get_history(client, name, from, to)
  client.history(name, from, to)
rescue NotFoundError
  cmd_debug_error $!
  $stderr.puts "Schedule '#{name}' does not exist."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "sched:list' to show list of the schedules."
  exit 1
end

#help(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/td/command/help.rb', line 5

# Shows help for one command (`td help <command>`).
#
# Unknown commands print a suggestion and exit 1. A command-group name
# prints the group usage and exits 1. Otherwise the command's own handler is
# invoked with ['--help'].
def help(op)
  cmd = op.cmd_parse
  option = List.get_option(cmd)

  if option.nil?
    $stderr.puts "'#{cmd}' is not a td command. Run '#{$prog}' to show the list."
    List.show_guess(cmd)
    exit 1
  end

  if option.name != cmd && option.group == cmd
    # `cmd` names a group of commands rather than a single command.
    $stdout.puts List.cmd_usage(cmd)
    exit 1
  end

  handler, _requires_connectivity = List.get_method(cmd)
  handler.call(['--help'])
end

#help_all(op) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/td/command/help.rb', line 25

# Prints the help listing for all commands, followed by a pointer to
# `help COMMAND`. No positional value from the command line is used.
def help_all(op)
  op.cmd_parse

  TreasureData::Command::List.show_help(op.summary_indent)
  $stdout.puts ""
  $stdout.puts "Type '#{$prog} help COMMAND' for more information on a specific command."
end

#import_auto(op) ⇒ Object



47
48
49
# File 'lib/td/command/import.rb', line 47

# `import:auto` is handed over unchanged to import_by_java.
def import_auto(op)
  import_by_java(op)
end

#import_commit(op) ⇒ Object



61
62
63
64
# File 'lib/td/command/import.rb', line 61

# `import:commit` delegates to bulk_import_commit; the bulk-import commands
# are loaded only when one of them is actually run.
def import_commit(op)
  require('td/command/bulk_import')
  bulk_import_commit(op)
end

#import_config(op) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/td/command/import.rb', line 81

# Generates a seed bulk-load config file (`td import:config <url>`), by
# default `td-bulkload.yml`, for use with connector:guess.
#
# The source format (-f) must be csv, tsv or mysql, otherwise
# ParameterConfigurationError is raised. The legacy options --columns,
# --column-header, --time-column and --time-format are accepted but only
# reported as not migrated. An existing output file is backed up before it
# is overwritten. A follow-up message is printed depending on whether the
# generated config's output type is 'td'.
def import_config(op)
  out = 'td-bulkload.yml'
  options = {'format' => 'csv'}
  unsupported_flags = []

  op.on('-o', '--out FILE_NAME', "output file name for connector:guess") { |s| out = s }
  op.on('-f', '--format FORMAT', "source file format [csv, tsv, mysql]; default=csv") { |s| options['format'] = s }

  op.on('--db-url URL',           "Database Connection URL") { |s| options['db_url']      = s }
  op.on('--db-user NAME',         "user name for database")  { |s| options['db_user']     = s }
  op.on('--db-password PASSWORD', "password for database")   { |s| options['db_password'] = s }
  %w(--columns --column-header --time-column --time-format).each do |flag|
    # e.g. "--time-column TIME-COLUMN"; the value itself is ignored.
    op.on("#{flag} #{flag.gsub('--', '').upcase}", 'not supported') { |_value| unsupported_flags << flag }
  end

  arg = op.cmd_parse

  source_format = options['format']
  unless %w(mysql csv tsv).include?(source_format)
    raise ParameterConfigurationError, "#{source_format} is unknown format. Support format is csv, tsv and mysql."
  end

  unless unsupported_flags.empty?
    verb = unsupported_flags.size == 1 ? 'is' : 'are'
    $stderr.puts "`#{unsupported_flags.join(', ')}` #{verb} not migrate. Please, edit config file after execute guess commands."
  end

  $stdout.puts "Generating #{out}..."

  config = generate_seed_confing(source_format, arg, options)
  yaml_text = YAML.dump(config)

  create_file_backup(out)
  File.open(out, 'w') { |file| file << yaml_text }

  if config['out']['type'] == 'td'
    show_message_for_td_output_plugin(out)
  else
    show_message_for_td_data_connector(out)
  end
end

#import_create(op) ⇒ Object



23
24
25
26
# File 'lib/td/command/import.rb', line 23

# `import:create` delegates to bulk_import_create; the bulk-import commands
# are loaded only when one of them is actually run.
def import_create(op)
  require('td/command/bulk_import')
  bulk_import_create(op)
end

#import_delete(op) ⇒ Object



66
67
68
69
# File 'lib/td/command/import.rb', line 66

# `import:delete` delegates to bulk_import_delete; the bulk-import commands
# are loaded only when one of them is actually run.
def import_delete(op)
  require('td/command/bulk_import')
  bulk_import_delete(op)
end

#import_error_records(op) ⇒ Object



56
57
58
59
# File 'lib/td/command/import.rb', line 56

# `import:error_records` delegates to bulk_import_error_records; the
# bulk-import commands are loaded only when one of them is actually run.
def import_error_records(op)
  require('td/command/bulk_import')
  bulk_import_error_records(op)
end

#import_freeze(op) ⇒ Object



71
72
73
74
# File 'lib/td/command/import.rb', line 71

# `import:freeze` delegates to bulk_import_freeze; the bulk-import commands
# are loaded only when one of them is actually run.
def import_freeze(op)
  require('td/command/bulk_import')
  bulk_import_freeze(op)
end

#import_jar_update(op) ⇒ Object



34
35
36
37
# File 'lib/td/command/import.rb', line 34

# `import:jar_update` takes no arguments and runs check_n_update_jar.
# NOTE(review): the meaning of the `false` flag is defined by
# check_n_update_jar, which is not visible here.
def import_jar_update(op)
  op.cmd_parse
  check_n_update_jar(false)
end

#import_jar_version(op) ⇒ Object



28
29
30
31
32
# File 'lib/td/command/import.rb', line 28

# Prints the installed td-import-java version (`td import:jar_version`).
# The version text is the raw content of the file located by find_version_file.
def import_jar_version(op)
  op.cmd_parse
  version_file = find_version_file
  # File.read opens and closes the file; File.open(...).read left the handle open.
  $stdout.puts "td-import-java #{File.read(version_file)}"
end

#import_list(op) ⇒ Object



13
14
15
16
# File 'lib/td/command/import.rb', line 13

# `import:list` delegates to bulk_import_list; the bulk-import commands are
# loaded only when one of them is actually run.
def import_list(op)
  require('td/command/bulk_import')
  bulk_import_list(op)
end

#import_perform(op) ⇒ Object



51
52
53
54
# File 'lib/td/command/import.rb', line 51

# `import:perform` delegates to bulk_import_perform; the bulk-import commands
# are loaded only when one of them is actually run.
def import_perform(op)
  require('td/command/bulk_import')
  bulk_import_perform(op)
end

#import_prepare(op) ⇒ Object



39
40
41
# File 'lib/td/command/import.rb', line 39

# `import:prepare` is handed over unchanged to import_by_java.
def import_prepare(op)
  import_by_java(op)
end

#import_show(op) ⇒ Object



18
19
20
21
# File 'lib/td/command/import.rb', line 18

# `import:show` delegates to bulk_import_show; the bulk-import commands are
# loaded only when one of them is actually run.
def import_show(op)
  require('td/command/bulk_import')
  bulk_import_show(op)
end

#import_unfreeze(op) ⇒ Object



76
77
78
79
# File 'lib/td/command/import.rb', line 76

# `import:unfreeze` delegates to bulk_import_unfreeze; the bulk-import
# commands are loaded only when one of them is actually run.
def import_unfreeze(op)
  require('td/command/bulk_import')
  bulk_import_unfreeze(op)
end

#import_upload(op) ⇒ Object



43
44
45
# File 'lib/td/command/import.rb', line 43

# `import:upload` is handed over unchanged to import_by_java.
def import_upload(op)
  import_by_java(op)
end

#job_kill(op) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/td/command/job.rb', line 147

# Kills or cancels a job (`td job:kill <job_id>`).
#
# The status the job had before the kill request decides the message. A
# finished job is reported and the command exits 0. A running job is
# reported as "killed", anything else as "canceled".
def job_kill(op)
  job_id = op.cmd_parse
  client = get_client

  previous_status = client.kill(job_id)

  if TreasureData::Job::FINISHED_STATUS.include?(previous_status)
    $stderr.puts "Job #{job_id} is already finished (#{previous_status})"
    exit 0
  end

  outcome = previous_status == TreasureData::Job::STATUS_RUNNING ? 'killed' : 'canceled'
  $stderr.puts "Job #{job_id} is #{outcome}."
end

#job_list(op) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/td/command/job.rb', line 31

# Lists jobs (`td job:list [max]`) through client.jobs.
#
# Positional `max` (default 20) is the page size; -p / --page and -s / --skip
# shift the window. -R / -S / -E filter by status (the last one given wins).
# --slow [SECONDS] keeps only jobs slower than the threshold (3600 when no
# value is given). In table format, or when no format was chosen, the query
# text is cut to its first 50 characters followed by " ...".
def job_list(op)
  page = 0
  skip = 0
  status = nil
  slower_than = nil

  op.on('-p', '--page PAGE', 'skip N pages', Integer) { |n| page = n }
  op.on('-s', '--skip N', 'skip N jobs', Integer) { |n| skip = n }
  op.on('-R', '--running', 'show only running jobs', TrueClass) { |_flag| status = 'running' }
  op.on('-S', '--success', 'show only succeeded jobs', TrueClass) { |_flag| status = 'success' }
  op.on('-E', '--error', 'show only failed jobs', TrueClass) { |_flag| status = 'error' }
  op.on('--slow [SECONDS]', 'show slow queries (default threshold: 3600 seconds)', Integer) { |seconds|
    slower_than = seconds || 3600
  }

  set_render_format_option(op)

  max = (op.cmd_parse || 20).to_i
  client = get_client

  skip += max * page if page
  conditions = slower_than ? {:slower_than => slower_than} : nil

  jobs = client.jobs(skip, skip + max - 1, status, conditions)

  # Machine-readable formats keep the full query text.
  truncate_query = op.render_format == 'table' || op.render_format.nil?

  rows = []
  jobs.each do |job|
    job.auto_update_status = false
    started = job.start_at
    elapsed = Command.humanize_elapsed_time(started, job.end_at)
    cpu_time = Command.humanize_time(job.cpu_time, true)
    priority = job_priority_name_of(job.priority)
    query_text = truncate_query ? job.query.to_s[0, 50] + " ..." : job.query
    result_size = job.result_size ? Command.humanize_bytesize(job.result_size, 2) : ""
    duration = job.duration ? Time.at(job.duration).utc.strftime("%X") : nil

    rows << {
      :JobID => job.job_id,
      :Database => job.db_name,
      :Status => job.status,
      :Type => job.type,
      :Query => query_text,
      :Start => (started ? started.localtime : ''),
      :Elapsed => elapsed.rjust(11),
      :CPUTime => cpu_time.rjust(17),
      :ResultSize => result_size,
      :Priority => priority,
      :Result => job.result_url,
      :Duration => duration
    }
  end

  $stdout.puts cmd_render_table(rows,
    :fields => [:JobID, :Status, :Start, :Elapsed, :CPUTime, :ResultSize, :Priority, :Result, :Type, :Database, :Query, :Duration],
    :max_width => 1000,
    :render_format => op.render_format
  )
end

#job_show(op) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/td/command/job.rb', line 106

# Shows a job's details and result (`td job:show <job_id>`).
#
# Option combinations are checked before get_and_show_result is called, and
# violations raise ParameterConfigurationError:
# - without -o / --output, only the tsv, csv and json formats are allowed;
# - -c / --column-header requires one of those formats;
# - -l / --limit cannot be combined with -o / --output.
def job_show(op)
  options = job_show_options(op)
  job_id = op.cmd_parse

  output      = options[:output]
  format      = options[:format]
  render_opts = options[:render_opts]
  limit       = options[:limit]

  if output.nil? && format && !%w[tsv csv json].include?(format)
    raise ParameterConfigurationError,
          "Supported formats are only tsv, csv and json without -o / --output option"
  end

  if render_opts[:header] && !%w[json tsv csv].include?(format)
    raise ParameterConfigurationError,
          "Option -c / --column-header is only supported with json, tsv and csv formats"
  end

  unless output.nil? || limit.nil?
    raise ParameterConfigurationError,
          "Option -l / --limit is only valid when not outputting to file (no -o / --output option provided)"
  end

  get_and_show_result(job_id, options[:wait], options[:exclude], output, limit, format, render_opts, options[:verbose])
end

#job_show_option_argv(argv_saved, name, back_number) ⇒ Object



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/td/command/sched.rb', line 378

# Builds the argv used to re-run `job:show` from the argv that sched:result received.
#
# argv_saved  - command-line tokens captured before OptionParser consumed them
# name        - the schedule name; every token equal to it is dropped
# back_number - the N parsed for sched:result's `--last [N]`
#
# job:show does not understand `--last`, so that option is stripped. It can
# appear as:
#   1. absent
#   2. `--last` (no number)
#   3. `--last N` (two tokens)
#   4. `--last=N` (one token)
# In case 3 the following token is removed only when it equals back_number,
# which is presumably the value OptionParser consumed for the option.
#
# Returns a new array whose first element is 'job:show'.
def job_show_option_argv(argv_saved, name, back_number)
  argv = ['job:show']
  argv += (argv_saved - [name]) if argv_saved.length > 0

  # Case 4: the value is attached to the option token itself.
  argv = argv.reject {|arg| arg.start_with?('--last=') }

  index_of_last = argv.index("--last")

  return argv unless index_of_last

  index_of_next_of_last = index_of_last + 1

  # the arg value following "--last"
  next_of_last = argv[index_of_next_of_last]

  indexes_of_options_for_sched_result = [index_of_last]
  indexes_of_options_for_sched_result << index_of_next_of_last if next_of_last == back_number.to_s

  indexes_of_options_for_sched_result.each do |index|
    argv[index] = nil
  end

  argv.compact
end

#job_status(op) ⇒ Object



140
141
142
143
144
145
# File 'lib/td/command/job.rb', line 140

# Prints the status string the client reports for one job (`td job:status <job_id>`).
def job_status(op)
  job_id = op.cmd_parse
  status = get_client.job_status(job_id)
  $stdout.puts status
end

#password_change(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/td/command/password.rb', line 5

# Interactively changes the current user's password (`td password:change`).
#
# Prompts for the old password, then up to three times for a new password
# and its confirmation. While typing, terminal echo is disabled with `stty`
# when stdin is a terminal.
# - An empty old or new password cancels the command (exit 0).
# - Ctrl-C cancels it (exit 1).
# - Three mismatched confirmations abort without changing anything (exit 1).
def password_change(op)
  op.cmd_parse

  # Prompts and reads one line with echo disabled, returning it without the
  # line terminator. Reads "" at EOF.
  read_hidden = lambda do |prompt|
    # Only touch terminal settings when there is a terminal; `stty` would
    # just fail when input is piped.
    tty = $stdin.tty?
    begin
      system "stty -echo" if tty  # TODO termios
      $stdout.print prompt
      ($stdin.gets || "").chomp
    rescue Interrupt
      $stderr.print "\ncanceled."
      exit 1
    ensure
      system "stty echo" if tty   # TODO termios
      $stdout.print "\n"
    end
  end

  old_password = read_hidden.call("Old password (typing will be hidden): ")
  if old_password.empty?
    $stderr.puts "canceled."
    exit 0
  end

  password = nil
  confirmed = false
  3.times do
    password = read_hidden.call("New password (typing will be hidden): ")
    if password.empty?
      $stderr.puts "canceled."
      exit 0
    end

    if password == read_hidden.call("Retype new password: ")
      confirmed = true
      break
    end

    $stdout.puts "Doesn't match."
  end

  # Never submit a new password the user has not confirmed.
  unless confirmed
    $stderr.puts "Passwords did not match. Password is not changed."
    exit 1
  end

  client = get_client(:ssl => true)

  client.change_my_password(old_password, password)

  $stderr.puts "Password changed."
end

#query(op) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/td/command/query.rb', line 7

# Submits a query job (`td query`) against the database given by -d / --database.
#
# The SQL comes from the positional argument, from stdin when that argument
# is '-', or from the file given by -q / --query when no argument is passed.
# With -w / --wait the command blocks until the job finishes. On success, and
# unless -x / --exclude is set, it then shows the result or writes it to the
# -o / --output file.
#
# Raises ParameterConfigurationError when the database or SQL is missing, or
# when -o / --format / -c are combined in an unsupported way.
# NOTE: option registration order below is also the order of `--help` output.
def query(op)
  db_name = nil
  wait = false
  output = nil
  format = nil
  render_opts = {:header => false}
  result_url = nil
  result_user = nil
  result_ask_password = false
  priority = nil
  retry_limit = nil
  query = nil
  type = nil
  limit = nil
  exclude = false
  pool_name = nil
  domain_key = nil
  engine_version = nil

  op.on('-d', '--database DB_NAME', 'use the database (required)') {|s|
    db_name = s
  }
  # The argument is optional: a bare -w leaves `wait` nil (see the
  # `wait != false` check below), -w=N passes N seconds to wait_job.
  op.on('-w', '--wait[=SECONDS]', 'wait for finishing the job (for seconds)', Integer) {|b|
    wait = b
  }
  op.on('-G', '--vertical', 'use vertical table to show results', TrueClass) {|b|
    render_opts[:vertical] = b
  }
  # When the path's directory does not exist as written, the path is expanded
  # to an absolute one. NOTE(review): the reason is not visible here.
  # Writing to a file defaults the format to tsv.
  op.on('-o', '--output PATH', 'write result to the file') {|s|
    unless Dir.exist?(File.dirname(s))
      s = File.expand_path(s)
    end
    output = s
    format = 'tsv' if format.nil?
  }

  TreasureData::Command::Options.write_format_option(op) {|s| format = s }

  op.on('-r', '--result RESULT_URL', 'write result to the URL (see also result:create subcommand)',
                                     ' It is suggested for this option to be used with the -x / --exclude option to suppress printing',
                                     ' of the query result to stdout or -o / --output to dump the query result into a file.') {|s|
    result_url = s
  }
  op.on('-u', '--user NAME', 'set user name for the result URL') {|s|
    result_user = s
  }
  op.on('-p', '--password', 'ask password for the result URL') {|s|
    result_ask_password = true
  }
  op.on('-P', '--priority PRIORITY', 'set priority') {|s|
    priority = job_priority_id_of(s)
    unless priority
      raise "unknown priority #{s.inspect} should be -2 (very-low), -1 (low), 0 (normal), 1 (high) or 2 (very-high)"
    end
  }
  op.on('-R', '--retry COUNT', 'automatic retrying count', Integer) {|i|
    retry_limit = i
  }
  op.on('-q', '--query PATH', 'use file instead of inline query') {|s|
    query = File.open(s) { |f| f.read.strip }
  }
  op.on('-T', '--type TYPE', 'set query type (hive, presto)') {|s|
    type = s.to_sym
  }
  # Kept only so old scripts passing --sampling still parse; the value is ignored.
  op.on('--sampling DENOMINATOR', 'OBSOLETE - enable random sampling to reduce records 1/DENOMINATOR', Integer) {|i|
    $stdout.puts "WARNING: the random sampling feature enabled through the '--sampling' option was removed and does no longer"
    $stdout.puts "         have any effect. It is left for backwards compatibility with older scripts using 'td'."
    $stdout.puts
  }
  op.on('-l', '--limit ROWS', 'limit the number of result rows shown when not outputting to file') {|s|
    unless s.to_i > 0
      raise "Invalid limit number. Must be a positive integer"
    end
    limit = s.to_i
  }
  op.on('-c', '--column-header', 'output of the columns\' header when the schema is available for the table (only applies to json, tsv and csv formats)', TrueClass) {|b|
    render_opts[:header] = b
  }
  op.on('-x', '--exclude', 'do not automatically retrieve the job result', TrueClass) {|b|
    exclude = b
  }
  op.on('-O', '--pool-name NAME', 'specify resource pool by name') {|s|
    pool_name = s
  }
  op.on('--domain-key DOMAIN_KEY', 'optional user-provided unique ID. You can include this ID with your `create` request to ensure idempotence') {|s|
    domain_key = s
  }
  op.on('--engine-version ENGINE_VERSION', 'EXPERIMENTAL: specify query engine version by name') {|s|
    engine_version = s
  }

  sql = op.cmd_parse

  # required parameters

  unless db_name
    raise ParameterConfigurationError,
          "-d / --database DB_NAME option is required."
  end

  # '-' reads the SQL from stdin; no positional SQL falls back to -q / --query.
  if sql == '-'
    sql = STDIN.read
  elsif sql.nil?
    sql = query
  end
  unless sql
    raise ParameterConfigurationError,
          "<sql> argument or -q / --query PATH option is required."
  end

  # parameter combination validation

  if output.nil? && format
    unless ['tsv', 'csv', 'json'].include?(format)
      raise ParameterConfigurationError,
            "Supported formats are only tsv, csv and json without --output option"
    end
  end

  if render_opts[:header]
    unless ['tsv', 'csv', 'json'].include?(format)
      raise ParameterConfigurationError,
            "Option -c / --column-header is only supported with json, tsv and csv formats"
    end
  end

  if result_url
    require 'td/command/result'
    result_url = build_result_url(result_url, result_user, result_ask_password)
  end

  client = get_client

  # local existence check
  get_database(client, db_name)

  # Optional settings are only sent when given.
  opts = {}
  opts['type'] = type if type
  opts['pool_name'] = pool_name if pool_name
  opts['domain_key'] = domain_key if domain_key
  opts['engine_version'] = engine_version if engine_version
  job = client.query(db_name, sql, result_url, priority, retry_limit, opts)

  $stdout.puts "Job #{job.job_id} is queued."
  $stdout.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job.job_id}' to show the status."
  #puts "See #{job.url} to see the progress."

  if wait != false # `wait==nil` means `--wait` is specified
    wait_job(job, true, wait)
    $stdout.puts "Status      : #{job.status}"
    if job.success? && !exclude
      begin
        show_result_with_retry(job, output, limit, format, render_opts)
      # NOTE(review): a NotFoundError while fetching the result is silently
      # ignored (no message, normal exit) — confirm this is intentional.
      rescue TreasureData::NotFoundError => e
      end
    end
  elsif output
    $stdout.puts "The output file won't be generated without additional `-w` or `--wait` option."
  end
end

#required(opt, value) ⇒ Object



12
13
14
15
16
# File 'lib/td/command/connector.rb', line 12

# Guards a mandatory option: raises ParameterConfigurationError naming `opt`
# when `value` is nil. Any non-nil value, including false, passes.
def required(opt, value)
  return unless value.nil?

  raise ParameterConfigurationError, "#{opt} option required"
end

#result_create(op) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/td/command/result.rb', line 47

# Registers a named result URL (`td result:create <name> <url>`).
#
# -u / --user sets the user name and -p / --password asks for a password.
# Both are folded into the URL by build_result_url. Exits with status 1 when
# a result URL with that name already exists.
def result_create(op)
  user = nil
  ask_password = false

  op.on('-u', '--user NAME', 'set user name for authentication') { |s| user = s }
  op.on('-p', '--password', 'ask password for authentication') { |_flag| ask_password = true }

  name, url = op.cmd_parse
  API.validate_result_set_name(name)

  client = get_client
  url = build_result_url(url, user, ask_password)

  begin
    client.create_result(name, url, {})
  rescue AlreadyExistsError
    $stderr.puts "Result URL '#{name}' already exists."
    exit 1
  end

  $stderr.puts "Result URL '#{name}' is created."
end

#result_delete(op) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/td/command/result.rb', line 76

# Deletes a named result URL (`td result:delete <name>`).
# Exits with status 1 when no result URL with that name exists.
def result_delete(op)
  name = op.cmd_parse
  client = get_client

  deleted = begin
    client.delete_result(name)
    true
  rescue NotFoundError
    false
  end

  unless deleted
    $stderr.puts "Result URL '#{name}' does not exist."
    exit 1
  end

  $stderr.puts "Result URL '#{name}' is deleted."
end

#result_list(op) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/td/command/result.rb', line 22

# Renders all registered result URLs sorted by name (`td result:list`).
# When there are none, prints a hint about result:create to stderr.
def result_list(op)
  set_render_format_option(op)
  op.cmd_parse

  results = get_client.results
  rows = results.map { |res| {:Name => res.name, :URL => res.url} }.sort_by { |row| row[:Name] }

  $stdout.puts cmd_render_table(rows, :fields => [:Name, :URL], :render_format => op.render_format)

  return unless results.empty?

  $stderr.puts "There are no result URLs."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "result:create <name> <url>' to create a result URL."
end

#result_show(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/td/command/result.rb', line 5

# Prints the name and URL of one registered result URL (`td result:show <name>`).
# Exits with status 1 and a result:create hint when the name is unknown.
def result_show(op)
  name = op.cmd_parse
  client = get_client

  match = client.results.find { |candidate| name == candidate.name }

  unless match
    $stderr.puts "Result URL '#{name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "result:create #{name} <URL>' to create the URL."
    exit 1
  end

  $stdout.puts "Name : #{match.name}"
  $stdout.puts "URL  : #{match.url}"
end

#sample_apache(op) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/td/command/sample.rb', line 4

# Writes a sample Apache log dataset to the given file (`td sample:apache <file>`).
#
# Each JSON line of the bundled data/sample_apache.json is rewritten with a
# 'time' field that moves further into the past as the record index grows:
# now - index**1.3, truncated to an integer. The time range is reported on
# stderr.
def sample_apache(op)
  fname = op.cmd_parse

  require 'json'

  now = Time.now.to_i
  count = 0
  last_time = nil

  source_path = File.join(File.dirname(__FILE__), '../../../data/sample_apache.json')
  File.open(source_path) do |source|
    File.open(fname, 'w') do |sink|
      source.each_line do |line|
        record = JSON.parse(line)
        last_time = (now - (count**1.3)).to_i
        record['time'] = last_time
        sink.puts record.to_json
        count += 1
      end
    end
  end

  $stderr.print "Created #{fname} with #{count} records whose time is "
  $stderr.puts "in the [#{Time.at(last_time)}, #{Time.at(now)}] range."

  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:import <db> <table> --json #{fname}' to import this file."
end

#sched_create(op) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/td/command/sched.rb', line 29

# Creates a scheduled query (`td sched:create <name> <cron> [sql]`).
#
# The SQL comes from the positional argument, from stdin when it is '-', or
# from the file given by -q / --query. -d / --database is required. Missing
# requirements print an error and exit 1 (unlike `td query`, which raises
# ParameterConfigurationError). Exits 1 when a schedule with the same name
# already exists. On success, reports the first run time returned by
# create_schedule, or that the schedule never runs when none is returned.
# NOTE: option registration order below is also the order of `--help` output.
def sched_create(op)
  require 'td/command/job'  # job_priority_id_of

  db_name = nil
  timezone = nil
  delay = 0
  result_url = nil
  result_user = nil
  result_ask_password = false
  priority = nil
  query = nil
  retry_limit = nil
  type = nil
  engine_version = nil

  op.on('-d', '--database DB_NAME', 'use the database (required)') {|s|
    db_name = s
  }
  # NOTE(review): in the cron example below, "5 AM" is presumably
  # America/Los_Angeles local time for a UTC schedule; consider clarifying.
  op.on('-t', '--timezone TZ', "name of the timezone.",
                               "  Only extended timezones like 'Asia/Tokyo', 'America/Los_Angeles' are supported,",
                               "  (no 'PST', 'PDT', etc...).",
                               "  When a timezone is specified, the cron schedule is referred to that timezone.",
                               "  Otherwise, the cron schedule is referred to the UTC timezone.",
                               "  E.g. cron schedule '0 12 * * *' will execute daily at 5 AM without timezone option",
                               "  and at 12PM with the -t / --timezone 'America/Los_Angeles' timezone option") {|s|
    timezone = s
  }
  op.on('-D', '--delay SECONDS', 'delay time of the schedule', Integer) {|i|
    delay = i
  }
  op.on('-r', '--result RESULT_URL', 'write result to the URL (see also result:create subcommand)') {|s|
    result_url = s
  }
  op.on('-u', '--user NAME', 'set user name for the result URL') {|s|
    result_user = s
  }
  op.on('-p', '--password', 'ask password for the result URL') {|s|
    result_ask_password = true
  }
  op.on('-P', '--priority PRIORITY', 'set priority') {|s|
    priority = job_priority_id_of(s)
    unless priority
      raise "unknown priority #{s.inspect} should be -2 (very-low), -1 (low), 0 (normal), 1 (high) or 2 (very-high)"
    end
  }
  op.on('-q', '--query PATH', 'use file instead of inline query') {|s|
    query = File.open(s) { |f| f.read.strip }
  }
  op.on('-R', '--retry COUNT', 'automatic retrying count', Integer) {|i|
    retry_limit = i
  }
  # NOTE(review): the help text lists only hive, and the value is kept as a
  # String, whereas `td query -T` lists (hive, presto) and converts to a
  # Symbol — confirm which is intended.
  op.on('-T', '--type TYPE', 'set query type (hive)') {|s|
    type = s
  }
  op.on('--engine-version ENGINE_VERSION', 'EXPERIMENTAL: specify query engine version by name') {|s|
    engine_version = s
  }

  name, cron, sql = op.cmd_parse

  unless db_name
    $stderr.puts "-d, --database DB_NAME option is required."
    exit 1
  end

  # '-' reads the SQL from stdin; no positional SQL falls back to -q / --query.
  if sql == '-'
    sql = STDIN.read
  elsif sql.nil?
    sql = query
  end

  unless sql
    $stderr.puts "<sql> argument or -q,--query PATH option is required."
    exit 1
  end

  if result_url
    require 'td/command/result'
    result_url = build_result_url(result_url, result_user, result_ask_password)
  end

  client = get_client

  # local existence check
  get_database(client, db_name)

  begin
    first_time = client.create_schedule(name, :cron=>cron, :query=>sql, :database=>db_name, :result=>result_url, :timezone=>timezone, :delay=>delay, :priority=>priority, :retry_limit=>retry_limit, :type=>type, :engine_version=>engine_version)
  rescue AlreadyExistsError
    cmd_debug_error $!
    $stderr.puts "Schedule '#{name}' already exists."
    exit 1
  end

  # A nil first_time means the schedule has no upcoming run.
  if first_time
    $stderr.puts "Schedule '#{name}' is created. It starts at #{first_time.localtime}."
  else
    $stderr.puts "Schedule '#{name}' is created, which never runs."
  end
end

#sched_delete(op) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/td/command/sched.rb', line 130

# Deletes a schedule (`td sched:delete <name>`).
# Exits with status 1 and a sched:list hint when the schedule does not exist.
def sched_delete(op)
  name = op.cmd_parse
  client = get_client

  begin
    client.delete_schedule(name)
  rescue NotFoundError => e
    cmd_debug_error e
    $stderr.puts "Schedule '#{name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "sched:list' to show list of the schedules."
    exit 1
  end

  $stderr.puts "Schedule '#{name}' is deleted."
end

#sched_history(op) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/td/command/sched.rb', line 243

# Shows a schedule's settings followed by its past jobs (`td sched:history <name> [max]`).
#
# Positional `max` (default 20) is the page size; -p / --page and -s / --skip
# shift the window passed to get_history. get_history prints an error and
# exits 1 when the schedule does not exist.
def sched_history(op)
  require 'td/command/job'  # job_priority_name_of

  page = 0
  skip = 0

  op.on('-p', '--page PAGE', 'skip N pages', Integer) {|i|
    page = i
  }
  op.on('-s', '--skip N', 'skip N schedules', Integer) {|i|
    skip = i
  }
  set_render_format_option(op)

  name, max = op.cmd_parse

  max = (max || 20).to_i

  if page
    skip += max * page
  end

  client = get_client

  # Shared lookup and not-found handling (same messages and exit status as before).
  history = get_history(client, name, skip, skip + max)

  # Named block parameter avoids shadowing the outer local.
  schedule = client.schedules.find {|sched| sched.name == name }
  if schedule
    $stdout.puts "Name        : #{schedule.name}"
    $stdout.puts "Cron        : #{schedule.cron}"
    $stdout.puts "Timezone    : #{schedule.timezone}"
    $stdout.puts "Delay       : #{schedule.delay} sec"
    $stdout.puts "Next        : #{schedule.next_time}"
    $stdout.puts "Result      : #{schedule.result_url}"
    $stdout.puts "Priority    : #{job_priority_name_of(schedule.priority)}"
    $stdout.puts "Retry limit : #{schedule.retry_limit}"
    $stdout.puts "Database    : #{schedule.database}"
    $stdout.puts "Query       : #{schedule.query}"
  end

  rows = []
  history.each {|j|
    scheduled_at = j.scheduled_at ? j.scheduled_at.localtime : nil
    rows << {:Time => scheduled_at, :JobID => j.job_id, :Status => j.status, :Priority => job_priority_name_of(j.priority), :Result=>j.result_url}
  }

  $stdout.puts cmd_render_table(rows, :fields => [:JobID, :Time, :Status, :Priority, :Result], :render_format => op.render_format)
end

#sched_list(op) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/td/command/sched.rb', line 7

# Renders every schedule sorted by name (`td sched:list`), including its
# next run time in local time.
def sched_list(op)
  require 'td/command/job'  # job_priority_name_of

  set_render_format_option(op)
  op.cmd_parse

  schedules = get_client.schedules

  rows = schedules.map { |sched|
    next_run = sched.next_time ? sched.next_time.localtime : nil
    {
      :Name => sched.name,
      :Cron => sched.cron,
      :Timezone => sched.timezone,
      :Delay => sched.delay,
      :Priority => job_priority_name_of(sched.priority),
      :Result => sched.result_url,
      :Database => sched.database,
      :Query => sched.query,
      :"Next schedule" => next_run
    }
  }.sort_by { |row| row[:Name] }

  fields = [:Name, :Cron, :Timezone, :"Next schedule", :Delay, :Priority, :Result, :Database, :Query]
  $stdout.puts cmd_render_table(rows, :fields => fields, :max_width => 500, :render_format => op.render_format)
end

#sched_result(op) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/td/command/sched.rb', line 342

# Shows the result of a past run of schedule NAME by delegating to job:show.
#
# --last N selects the job N positions back from the latest (default 1,
# i.e. the most recent one). Every job:show option registered by
# job_show_options is forwarded to job:show through the saved argv.
# Exits with status 1 when the schedule has no matching job.
def sched_result(op)
  # Registers the job:show options on op. Their values reach job:show through
  # argv_saved below, so the returned option hash is not needed here.
  job_show_options(op)
  back_number = 1
  op.on('--last [Number]', Integer, "show the result before N from the last. default: 1") do |n|
    back_number = n || 1
  end

  # save argv before calling cmd_parse, which removes flags from the argv array
  argv_saved = op.argv.dup
  name = op.cmd_parse

  client = get_client
  history = get_history(client, name, back_number - 1, back_number)

  job = history.first
  if job.nil?
    $stderr.puts "No jobs available for this query. Refer to 'sched:history'."
    exit 1
  end

  # build the job:show command now
  argv = job_show_option_argv(argv_saved, name, back_number)
  argv << job.job_id

  Runner.new.run(argv)
end

#sched_run(op) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/td/command/sched.rb', line 299

# Triggers NUM runs (-n, default 1) of schedule NAME as of TIME and prints the
# created jobs.
#
# TIME is used as a UNIX epoch when it round-trips through to_i, otherwise it
# is parsed with Time.parse; an unparsable value aborts with exit status 1.
def sched_run(op)
  num = 1

  op.on('-n', '--num N', 'number of jobs to run', Integer) {|n| num = n }
  set_render_format_option(op)

  name, time = op.cmd_parse

  is_epoch = time.to_i.to_s == time.to_s
  t =
    if is_epoch
      Time.at(time.to_i)
    else
      require 'time'
      begin
        Time.parse(time)
      rescue
        $stderr.puts "invalid time format: #{time}"
        exit 1
      end
    end

  client = get_client

  begin
    jobs = client.run_schedule(name, t.to_i, num)
  rescue NotFoundError
    cmd_debug_error $!
    $stderr.puts "Schedule '#{name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "sched:list' to show list of the schedules."
    exit 1
  end

  rows = jobs.map {|job|
    scheduled = job.scheduled_at
    {:JobID => job.job_id, :Time => scheduled ? scheduled.localtime : nil}
  }

  $stderr.puts "Scheduled #{num} jobs from #{t}."
  $stdout.puts cmd_render_table(rows, :fields => [:JobID, :Time], :max_width => 500, :render_format => op.render_format)
end

#sched_update(op) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/td/command/sched.rb', line 147

# Updates the attributes of the existing schedule CURNAME.
#
# Only the attributes given on the command line are sent to update_schedule.
# Values are stored under their API parameter names. Integers (delay,
# priority id, retry limit) are sent as strings. With no options at all the
# usage text is printed and the command exits with status 1.
def sched_update(op)
  require 'td/command/job'  # job_priority_id_of

  # API parameter name => value collected from the command line.
  changes = {}

  op.on('-n', '--newname NAME', 'change the schedule\'s name') {|n|
    changes['name'] = n
  }
  op.on('-s', '--schedule CRON', 'change the schedule') {|s|
    changes['cron'] = s
  }
  op.on('-q', '--query SQL', 'change the query') {|s|
    changes['query'] = s
  }
  op.on('-d', '--database DB_NAME', 'change the database') {|s|
    changes['database'] = s
  }
  op.on('-r', '--result RESULT_URL', 'change the result target (see also result:create subcommand)') {|s|
    changes['result'] = s
  }
  op.on('-t', '--timezone TZ', "name of the timezone.",
                               "  Only extended timezones like 'Asia/Tokyo', 'America/Los_Angeles' are supported,",
                               "  (no 'PST', 'PDT', etc...).",
                               "  When a timezone is specified, the cron schedule is referred to that timezone.",
                               "  Otherwise, the cron schedule is referred to the UTC timezone.",
                               "  E.g. cron schedule '0 12 * * *' will execute daily at 5 AM without timezone option",
                               "  and at 12PM with the -t / --timezone 'America/Los_Angeles' timezone option") {|s|
    changes['timezone'] = s
  }
  op.on('-D', '--delay SECONDS', 'change the delay time of the schedule', Integer) {|i|
    changes['delay'] = i.to_s
  }
  op.on('-P', '--priority PRIORITY', 'set priority') {|s|
    prio = job_priority_id_of(s)
    raise "unknown priority #{s.inspect} should be -2 (very-low), -1 (low), 0 (normal), 1 (high) or 2 (very-high)" unless prio
    changes['priority'] = prio.to_s
  }
  op.on('-R', '--retry COUNT', 'automatic retrying count', Integer) {|i|
    changes['retry_limit'] = i.to_s
  }
  op.on('-T', '--type TYPE', 'set query type (hive)') {|s|
    changes['type'] = s.to_s
  }
  op.on('--engine-version ENGINE_VERSION', 'EXPERIMENTAL: specify query engine version by name') {|s|
    changes['engine_version'] = s
  }

  curname = op.cmd_parse

  # Build the request in a fixed key order, independent of the order in
  # which the options appeared on the command line.
  params = {}
  %w[name cron query database result timezone delay priority retry_limit type engine_version].each {|key|
    params[key] = changes[key] if changes.key?(key)
  }

  if params.empty?
    $stderr.puts op.to_s
    exit 1
  end

  client = get_client

  begin
    client.update_schedule(curname, params)
  rescue NotFoundError
    cmd_debug_error $!
    $stderr.puts "Schedule '#{curname}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "sched:list' to show list of the schedules."
    exit 1
  end

  newname = changes['name']
  if newname && curname != newname
    $stdout.puts "Schedule '#{curname}' is updated and its name changed to '#{newname}'."
  else
    $stdout.puts "Schedule '#{curname}' is updated."
  end
end

#schema_add(op) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/td/command/schema.rb', line 32

# Adds the given COLUMNS to the current schema of DB.TABLE. Each column is
# parsed by TreasureData::Schema.parse and merged into the existing schema.
def schema_add(op)
  db_name, table_name, *columns = op.cmd_parse

  client = get_client
  current_schema = get_table(client, db_name, table_name).schema
  additions = TreasureData::Schema.parse(columns)

  client.update_schema(db_name, table_name, current_schema.merge(additions))

  $stderr.puts "Schema is updated on #{db_name}.#{table_name} table."
end

#schema_remove(op) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/td/command/schema.rb', line 44

# Removes the named COLUMNS from the schema of DB.TABLE. Aborts with exit
# status 1 at the first column name that is not present, before anything is
# sent to the server.
def schema_remove(op)
  db_name, table_name, *columns = op.cmd_parse

  client = get_client
  schema = get_table(client, db_name, table_name).schema

  columns.each do |col|
    removed = schema.fields.reject! {|f| f.name == col }
    next if removed

    $stderr.puts "Column name '#{col}' does not exist."
    exit 1
  end

  client.update_schema(db_name, table_name, schema)

  $stderr.puts "Schema is updated on #{db_name}.#{table_name} table."
end

#schema_set(op) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/td/command/schema.rb', line 20

# Replaces the whole schema of DB.TABLE with the given COLUMNS.
def schema_set(op)
  db_name, table_name, *columns = op.cmd_parse

  client = get_client
  # Looked up only for its side effect (presumably failing when the table
  # does not exist); the current schema is discarded.
  get_table(client, db_name, table_name)

  new_schema = TreasureData::Schema.parse(columns)
  client.update_schema(db_name, table_name, new_schema)

  $stderr.puts "Schema is updated on #{db_name}.#{table_name} table."
end

#schema_show(op) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/td/command/schema.rb', line 3

# Prints the schema of DB.TABLE as "db.table (" followed by one
# "  name:type" line per column (with "@alias" appended when the column has
# an SQL alias) and a closing ")".
def schema_show(op)
  db_name, table_name = op.cmd_parse

  table = get_table(get_client, db_name, table_name)

  $stdout.puts "#{db_name}.#{table_name} ("
  table.schema.fields.each do |field|
    alias_suffix = field.sql_alias ? "@#{field.sql_alias}" : ""
    $stdout.puts "  #{field.name}:#{field.type}#{alias_suffix}"
  end
  $stdout.puts ")"
end

#server_endpoint(op) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/td/command/server.rb', line 12

# Persists ENDPOINT as "account.endpoint" in the configuration file, after
# validating it and testing that it answers.
#
# Raises ParameterConfigurationError when a different endpoint was also
# given on the command line via -e / --endpoint. An unreadable or missing
# configuration (ConfigError) is replaced by a fresh Config.
def server_endpoint(op)
  endpoint = op.cmd_parse

  if Config.cl_endpoint && endpoint != Config.endpoint
    raise ParameterConfigurationError,
          "You specified the API server endpoint in the command options as well (-e / --endpoint " \
          "option) but it does not match the value provided to the 'server:endpoint' command. " \
          "Please remove the option or ensure the endpoints URLs match each other."
  end

  Command.validate_api_endpoint(endpoint)
  Command.test_api_endpoint(endpoint)

  conf = begin
    Config.read
  rescue ConfigError
    Config.new
  end
  conf["account.endpoint"] = endpoint
  conf.save
end

#server_status(op) ⇒ Object



6
7
8
9
10
# File 'lib/td/command/server.rb', line 6

# Prints the server status string reported by Client.server_status.
def server_status(op)
  op.cmd_parse
  status_text = Client.server_status
  $stdout.puts status_text
end

#status(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/td/command/status.rb', line 5

# Prints a dashboard of four panels laid out by status_render:
# - schedules, sorted by name;
# - the jobs returned by client.jobs(0, 4);
# - every table of every database;
# - saved result targets, sorted by name.
def status(op)
  op.cmd_parse

  client = get_client

  # +----------------+
  # |     scheds     |
  # +----------------+
  # +----------------+
  # |      jobs      |
  # +----------------+
  # +------+ +-------+
  # |tables| |results|
  # +------+ +-------+

  sched_rows = client.schedules.map {|sched|
    {:Name => sched.name, :Cron => sched.cron, :Result => sched.result_url, :Query => sched.query}
  }.sort_by {|row| row[:Name] }
  status_render(0, 0, "[Schedules]", sched_rows, :fields => [:Name, :Cron, :Result, :Query])

  job_rows = client.jobs(0, 4).map {|job|
    started = job.start_at
    elapsed = Command.humanize_elapsed_time(started, job.end_at)
    {
      :JobID => job.job_id,
      :Status => job.status,
      :Query => job.query.to_s,
      :Start => (started ? started.localtime : ''),
      :Elapsed => elapsed,
      :Result => job.result_url,
    }
  }
  status_render(0, 0, "[Jobs]", job_rows, :fields => [:JobID, :Status, :Start, :Elapsed, :Result, :Query])

  table_rows = []
  client.databases.each {|db|
    db.tables.each {|table|
      table_rows << {:Database => db.name, :Table => table.name, :Count => table.count.to_s, :Size => table.estimated_storage_size_string}
    }
  }
  tables_x, tables_y = status_render(0, 0, "[Tables]", table_rows, :fields => [:Database, :Table, :Count, :Size])

  result_rows = client.results.map {|r| {:Name => r.name, :URL => r.url} }.sort_by {|row| row[:Name] }
  # The results panel is drawn to the right of the tables panel.
  _, results_y = status_render(tables_x + 2, tables_y, "[Results]", result_rows, :fields => [:Name, :URL])

  # Emit ESC D (index) / ESC E (next line) control sequences, presumably to
  # leave the cursor below both bottom panels.
  (tables_y - results_y - 1).times { $stdout.print "\eD" }
  $stdout.print "\eE"
end

#table_create(op) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/td/command/table.rb', line 18

# Creates the log table DB.TABLE.
#
# Options: -T/--type (only "log" is accepted), --expire-days, --include-v
# and --detect-schema, which are passed to create_log_table as params.
# Prints a warning (but proceeds) when the table name is a Hive reserved
# keyword. A missing database or an existing table exits with status 1.
def table_create(op)
  params = {}

  op.on('-T', '--type TYPE', 'set table type (log)') {|s|
    # "log" is the only supported type, so the value is validated but not kept.
    raise "Unknown table type #{s.dump}. Supported types: log" unless s == 'log'
  }
  op.on('--expire-days DAYS', Integer, 'set table expire days') do |days|
    unless days >= 0
      $stderr.puts "Table expiration days must be greater or equal to 0."
      # NOTE(review): this returns from table_create itself with a success status.
      return
    end
    params[:expire_days] = days
  end
  op.on('--include-v BOOLEAN', TrueClass, 'set include_v flag') {|flag| params[:include_v] = flag }
  op.on('--detect-schema BOOLEAN', TrueClass, 'set detect schema flag') {|flag| params[:detect_schema] = flag }
  db_name, table_name = op.cmd_parse

  API.validate_table_name(table_name)

  if HIVE_RESERVED_KEYWORDS.include?(table_name.upcase)
    [
      "* WARNING *",
      "  '#{table_name}' is a reserved keyword in Hive. We recommend renaming the table.",
      "  For a list of all reserved keywords, see our FAQ: https://docs.treasuredata.com/display/public/PD/Hive+Reserved+and+Non-Reserved+Keywords",
    ].each {|line| $stderr.puts line }
  end

  client = get_client

  begin
    client.create_log_table(db_name, table_name, params)
  rescue NotFoundError
    cmd_debug_error $!
    $stderr.puts "Database '#{db_name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create #{db_name}' to create the database."
    exit 1
  rescue AlreadyExistsError
    cmd_debug_error $!
    $stderr.puts "Table '#{db_name}.#{table_name}' already exists."
    exit 1
  end

  $stderr.puts "Table '#{db_name}.#{table_name}' is created."
end

#table_delete(op) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/td/command/table.rb', line 65

# Deletes DB.TABLE.
#
# Unless -f/--force is given, the user is asked to confirm on STDIN. Only
# "y"/"yes" (case-insensitive) confirms; an empty line, "n"/"no" or EOF
# cancels with exit status 1. A missing table also exits with status 1.
def table_delete(op)
  force = false
  op.on('-f', '--force', 'never prompt', TrueClass) { force = true }

  db_name, table_name = op.cmd_parse

  client = get_client

  begin
    unless force
      # Looked up before prompting, presumably so a missing table is reported
      # instead of asking for confirmation.
      get_table(client, db_name, table_name)
      $stderr.print "Do you really want to delete '#{table_name}' in '#{db_name}'? [y/N]: "

      confirmed = false
      while (answer = $stdin.gets)
        answer = answer.strip
        if answer =~ /^y(?:es)?$/i
          confirmed = true
          break
        end
        break if answer.empty? || answer =~ /^n(?:o)?$/i

        $stderr.print "Type 'Y' or 'N': "
      end

      unless confirmed
        $stderr.puts "canceled."
        exit 1
      end
    end
    client.delete_table(db_name, table_name)
  rescue NotFoundError
    cmd_debug_error $!
    $stderr.puts "Table '#{db_name}.#{table_name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:list #{db_name}' to show list of the tables."
    exit 1
  end

  $stderr.puts "Table '#{db_name}.#{table_name}' is deleted."
end

#table_expire(op) ⇒ Object



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/td/command/table.rb', line 377

# Sets the data-expiration period of DB.TABLE to EXPIRE_DAYS days; 0 disables
# expiration.
#
# EXPIRE_DAYS must be a decimal integer. It used to be converted with
# String#to_i, which turns any non-numeric input ("abc", "") into 0 and so
# silently disabled expiration. Such input is now rejected with exit
# status 1 before the API is called. A negative value prints an error and
# returns without changing anything.
def table_expire(op)
  db_name, table_name, expire_days_arg = op.cmd_parse

  begin
    # Explicit base 10 so a leading zero ("010") is not read as octal.
    expire_days = Integer(expire_days_arg.to_s, 10)
  rescue ArgumentError
    $stderr.puts "Table expiration days must be an integer: #{expire_days_arg.inspect}"
    exit 1
  end

  if expire_days < 0
    $stderr.puts "Table expiration days must be greater or equal to 0."
    return
  end

  client = get_client
  client.update_expire(db_name, table_name, expire_days)

  if expire_days == 0
    $stdout.puts "Data expiration disabled for this table."
  else
    $stdout.puts "Table set to expire data older than #{expire_days} days."
  end
end

#table_export(op) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/td/command/export.rb', line 57

# Exports DB.TABLE to an S3 bucket and, with -w/--wait, blocks until the
# export job finishes.
#
# -b/--s3-bucket, -k/--aws-key-id and -s/--aws-secret-key are mandatory; the
# first one missing aborts with exit status 1. -f/--from and -t/--to bound
# the exported time range (parsed by export_parse_time). The file format and
# encryption method are checked against SUPPORTED_FORMATS and
# SUPPORTED_ENCRYPT_METHOD.
def table_export(op)
  from = nil
  to = nil
  s3_bucket = nil
  wait = false
  aws_access_key_id = nil
  aws_secret_access_key = nil
  file_prefix = nil
  file_format = "json.gz" # default
  pool_name = nil
  encryption = nil
  assume_role = nil

  # Widen the option column from the default 32 to 36 so the help text lines up.
  op.summary_width = 36

  op.on('-w', '--wait', 'wait until the job is completed', TrueClass) {|b| wait = b }
  op.on('-f', '--from TIME', 'export data which is newer than or same with the TIME') {|s| from = export_parse_time(s) }
  op.on('-t', '--to TIME', 'export data which is older than the TIME') {|s| to = export_parse_time(s) }
  op.on('-b', '--s3-bucket NAME', 'name of the destination S3 bucket (required)') {|s| s3_bucket = s }
  op.on('-p', '--prefix PATH', 'path prefix of the file on S3') {|s| file_prefix = s }
  op.on('-k', '--aws-key-id KEY_ID', 'AWS access key id to export data (required)') {|s| aws_access_key_id = s }
  op.on('-s', '--aws-secret-key SECRET_KEY', 'AWS secret access key to export data (required)') {|s| aws_secret_access_key = s }
  op.on('-F', '--file-format FILE_FORMAT',
        'file format for exported data.',
        'Available formats are tsv.gz (tab-separated values per line) and jsonl.gz (JSON record per line).',
        'The json.gz and line-json.gz formats are default and still available but only for backward compatibility purpose;',
        '  use is discouraged because they have far lower performance.') {|s|
    unless SUPPORTED_FORMATS.include?(s)
      raise ArgumentError, "#{s} is not a supported file format"
    end
    file_format = s
  }
  op.on('-O', '--pool-name NAME', 'specify resource pool by name') {|s| pool_name = s }
  op.on('-e', '--encryption ENCRYPT_METHOD', 'export with server side encryption with the ENCRYPT_METHOD') {|s|
    unless SUPPORTED_ENCRYPT_METHOD.include?(s)
      raise ArgumentError, "#{s} is not a supported encryption method"
    end
    encryption = s
  }
  op.on('-a', '--assume-role ASSUME_ROLE_ARN', 'export with assume role with ASSUME_ROLE_ARN as role arn') {|s| assume_role = s }

  db_name, table_name = op.cmd_parse

  # Mandatory options, checked in this order.
  [
    [s3_bucket, "-b, --s3-bucket NAME option is required."],
    [aws_access_key_id, "-k, --aws-key-id KEY_ID option is required."],
    [aws_secret_access_key, "-s, --aws-secret-key SECRET_KEY option is required."],
  ].each do |value, message|
    next if value

    $stderr.puts message
    exit 1
  end

  # The table is looked up with the regular client; the export request itself
  # goes through the client returned by get_ssl_client.
  get_table(get_client, db_name, table_name)
  client = get_ssl_client

  s3_opts = {}
  s3_opts['from'] = from.to_s if from
  s3_opts['to'] = to.to_s if to
  s3_opts['file_prefix'] = file_prefix if file_prefix
  s3_opts.update(
    'file_format' => file_format,
    'bucket' => s3_bucket,
    'access_key_id' => aws_access_key_id,
    'secret_access_key' => aws_secret_access_key
  )
  { 'pool_name' => pool_name, 'encryption' => encryption, 'assume_role' => assume_role }.each {|key, value|
    s3_opts[key] = value if value
  }

  job = client.export(db_name, table_name, "s3", s3_opts)

  $stderr.puts "Export job #{job.job_id} is queued."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job.job_id}' to show the status."

  return unless wait

  wait_job(job)
  $stdout.puts "Status     : #{job.status}"
end

#table_import(op) ⇒ Object

TODO import-item TODO tail



411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/td/command/table.rb', line 411

# Imports local files (PATHS) into DB.TABLE through do_table_import.
#
# The input format is chosen with --format or one of the shortcut flags
# (--apache, --syslog, --msgpack, --json); --time-key names the time column
# for json/msgpack input (default "time"). --auto-create-table calls
# create_database_and_table_if_not_exist before importing.
def table_import(op)
  # [format name, help text of its shortcut flag], in banner/help order.
  shortcuts = [
    ['apache',  "same as --format apache; apache common log format"],
    ['syslog',  "same as --format syslog; syslog"],
    ['msgpack', "same as --format msgpack; msgpack stream format"],
    ['json',    "same as --format json; LF-separated json format"],
  ]

  op.banner << "\nsupported formats:\n"
  shortcuts.each {|fmt, _| op.banner << "  #{fmt}\n" }

  default_format = 'apache'
  auto_create = false

  import_params = {time_key: 'time'}

  op.on('--format FORMAT', "file format (default: #{default_format})") {|s|
    import_params[:format] = s
  }

  shortcuts.each do |fmt, description|
    op.on("--#{fmt}", description) { import_params[:format] = fmt }
  end

  op.on('-t', '--time-key COL_NAME', "time key name for json and msgpack format (e.g. 'created_at')") {|s|
    import_params[:time_key] = s
  }

  op.on('--auto-create-table', "Create table and database if doesn't exist", TrueClass) {|b|
    auto_create = b
  }

  db_name, table_name, *paths = op.cmd_parse
  import_params.update(db_name: db_name, table_name: table_name, paths: paths)

  api_client = get_client
  import_client = get_import_client

  create_database_and_table_if_not_exist(api_client, db_name, table_name) if auto_create

  do_table_import(api_client, import_client, import_params)
end

#table_list(op) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/td/command/table.rb', line 107

# Lists the tables of DB_NAME, or of every database when DB_NAME is omitted.
#
# Databases are scanned concurrently with Parallel.each using
# --num_threads threads (default 4). Databases with :import_only permission
# are skipped. APIErrors whose message matches /not authorized/ are ignored,
# because db:list can show databases whose tables the user cannot read.
# --show-bytes prints the estimated size in bytes instead of the
# human-readable size string.
def table_list(op)
  require 'parallel'

  num_threads = 4
  show_size_in_bytes = false

  op.on('-n', '--num_threads VAL', 'number of threads to get list in parallel') { |i|
    num_threads = Integer(i)
  }
  op.on('--show-bytes', 'show estimated table size in bytes') {
    show_size_in_bytes = true
  }
  set_render_format_option(op)

  db_name = op.cmd_parse

  client = get_client

  if db_name
    database = get_database(client, db_name)
    databases = [database]
  else
    databases = client.databases
  end

  # ref. https://github.com/treasure-data/td/issues/26
  should_number_format = [nil, "table"].include?(op.render_format)
  rows = []
  # rows is shared by the worker threads below, and Array gives no
  # thread-safety guarantee, so every append is serialized through this lock.
  rows_lock = Mutex.new
  ::Parallel.each(databases, :in_threads => num_threads) {|db|
    begin
      next if db.permission == :import_only

      # NOTE(review): kept from the original; this no-op iteration presumably
      # forces the table list to load before rows are built. Confirm before removing.
      db.tables.each {}
      db.tables.each {|table|
        pschema = table.schema.fields.map {|f|
          "#{f.name}:#{f.type}"
        }.join(', ')
        new_row = {
          :Database => db.name, :Table => table.name, :Type => table.type.to_s, :Count => (should_number_format ? TreasureData::Helpers.format_with_delimiter(table.count) : table.count),
          :Size => show_size_in_bytes ? TreasureData::Helpers.format_with_delimiter(table.estimated_storage_size) : table.estimated_storage_size_string,
          'Last import' => table.last_import ? table.last_import.localtime : nil,
          'Last log timestamp' => table.last_log_timestamp ? table.last_log_timestamp.localtime : nil,
          :Schema => pschema
        }

        rows_lock.synchronize { rows << new_row }
      }
    rescue APIError => e
      # ignores permission error because db:list shows all databases
      # even if the user can't access to tables in the database
      unless e.to_s =~ /not authorized/
        raise e
      end
    end
  }
  rows = rows.sort_by {|map|
    [map[:Database], map[:Type].size, map[:Table]]
  }

  fields = [:Database, :Table, :Type, :Count, :Size, 'Last import', 'Last log timestamp', :Schema]
  $stdout.puts cmd_render_table(rows, :fields => fields, :max_width => 500, :render_format => op.render_format)

  if rows.empty?
    if db_name
      if databases.first.permission == :import_only
        $stderr.puts "Database '#{db_name}' is import only, cannot list."
      else
        $stderr.puts "Database '#{db_name}' has no tables."
        $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:create <db> <table>' to create a table."
      end
    elsif databases.empty?
      $stderr.puts "There are no databases."
      $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create <db>' to create a database."
    else
      $stderr.puts "There are no tables."
      $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:create <db> <table>' to create a table."
    end
  end
end

#table_partial_delete(op) ⇒ Object



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/td/command/table.rb', line 292

# Queues a partial-delete job removing the rows of DB.TABLE whose time lies in
# the range given by -f/--from and -t/--to.
#
# Both bounds are required and must be whole hours (multiples of 3600
# seconds); otherwise the command exits with status 1. -w/--wait blocks until
# the job finishes; -O/--pool-name selects a resource pool.
def table_partial_delete(op)
  from = nil
  to = nil
  wait = false
  pool_name = nil

  # Integer strings are taken as UNIX time; anything else goes through Time.parse.
  parse_time = lambda {|s|
    if s.to_i.to_s == s
      s.to_i
    else
      require 'time'
      Time.parse(s).to_i
    end
  }

  op.on('-t', '--to TIME', 'end time of logs to delete in Unix time >0 and multiple of 3600 (1 hour)') {|s|
    to = parse_time.call(s)
  }
  op.on('-f', '--from TIME', 'start time of logs to delete in Unix time >0 and multiple of 3600 (1 hour)') {|s|
    from = parse_time.call(s)
  }
  op.on('-w', '--wait', 'wait for the job to finish', TrueClass) {|b| wait = b }
  op.on('-O', '--pool-name NAME', 'specify resource pool by name') {|s| pool_name = s }

  db_name, table_name = op.cmd_parse

  unless from
    $stderr.puts "-f, --from TIME option is required"
    exit 1
  end

  unless to
    $stderr.puts "-t, --to TIME option is required"
    exit 1
  end

  unless from % 3600 == 0 && to % 3600 == 0
    $stderr.puts "Time for the -f / --from and -t / --to options must either be a multiple of 3600 (1 hour)\n" \
                 "  or be expressed in Ruby time string format where the minutes and seconds are 0"
    exit 1
  end

  client = get_client
  get_table(client, db_name, table_name)

  opts = pool_name ? { 'pool_name' => pool_name } : {}
  job = client.partial_delete(db_name, table_name, to, from, opts)

  $stderr.puts "Partial delete job #{job.job_id} is queued."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job.job_id}' to show the status."

  return unless wait

  $stderr.puts "queued..."
  wait_job(job)
  $stdout.puts "Status     : #{job.status}"
end

#table_rename(op) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/td/command/table.rb', line 202

# Renames DB.FROM_TABLE to DB.DEST_TABLE.
#
# When the destination does not exist, an empty log table is created first.
# When it exists, --overwrite is required. The tables are then exchanged
# with swap_table and FROM_TABLE is deleted. Parameter problems
# (ParameterConfigurationError) are printed and exit with status 1.
def table_rename(op)
  overwrite = false
  op.on('--overwrite', 'replace existing dest table') { overwrite = true }
  database_name, from_table_name, dest_table_name = op.cmd_parse

  client = get_client
  database = get_database(client, database_name)

  unless table_exist?(database, from_table_name)
    raise ParameterConfigurationError, "From table `#{from_table_name}` isn't exist."
  end

  if table_exist?(database, dest_table_name)
    unless overwrite
      raise ParameterConfigurationError, "Dest table `#{dest_table_name}` is exist. If you want to overwrite dest table, you should set `overwrite` option."
    end
  else
    client.create_log_table(database_name, dest_table_name)
  end

  client.swap_table(database_name, from_table_name, dest_table_name)
  # After the swap, from_table_name presumably holds the destination's old
  # contents (an empty table if it was just created), so it is dropped.
  client.delete_table(database_name, from_table_name)

  # Fixed: the message used to begin with a stray "'" before "renamed".
  $stderr.puts "renamed from '#{database_name}.#{from_table_name}' to '#{database_name}.#{dest_table_name}'."
rescue ParameterConfigurationError => e
  $stderr.puts e.message
  exit 1
end

#table_show(op) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/td/command/table.rb', line 231

# Prints the name, type, row count and schema of DB.TABLE. With -v it also
# prints the expire days and include_v flag. Labels are left-aligned in a
# 12-character column followed by ": ".
def table_show(op)
  verbose = nil
  op.on('-v', 'show more attributes', TrueClass) {|b| verbose = b }
  db_name, table_name = op.cmd_parse

  table = get_table(get_client, db_name, table_name)

  emit = lambda {|label, value| $stdout.puts "#{label.ljust(12)}: #{value}" }

  emit.call("Name", "#{table.db_name}.#{table.name}")
  emit.call("Type", table.type)
  emit.call("Count", table.count)
  if verbose
    emit.call("Expire Days", table.expire_days)
    emit.call("Include v", table.include_v)
  end
  emit.call("Schema", "(")
  table.schema.fields.each {|field| $stdout.puts "    #{field.name}:#{field.type}" }
  $stdout.puts ")"
end

#table_swap(op) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/td/command/table.rb', line 189

# Swaps the contents of DB.TABLE1 and DB.TABLE2.
def table_swap(op)
  db_name, table_name1, table_name2 = op.cmd_parse

  client = get_client

  # Both tables are looked up before the swap (presumably to fail early when
  # either is missing); the returned objects are not needed.
  [table_name1, table_name2].each {|name| get_table(client, db_name, name) }

  client.swap_table(db_name, table_name1, table_name2)

  $stderr.puts "'#{db_name}.#{table_name1}' and '#{db_name}.#{table_name2}' are swapped."
end

#table_tail(op) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/td/command/table.rb', line 256

# Prints the last rows of DB.TABLE (-n/--count, default 10) as JSON, one
# document per row. -P/--pretty indents each document by two spaces.
def table_tail(op)
  count = 10
  pretty = nil

  op.on('-n', '--count N', 'number of logs to get', Integer) {|n| count = n }
  op.on('-P', '--pretty', 'pretty print', TrueClass) {|b| pretty = b }

  db_name, table_name = op.cmd_parse

  rows = get_table(get_client, db_name, table_name).tail(count)

  require 'json'
  json_opts = pretty ? { :indent => ' '*2, :object_nl => "\n", :space => ' ' } : nil
  rows.each {|row|
    $stdout.puts(json_opts ? row.to_json(json_opts) : row.to_json)
  }
end

#table_update(op) ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/td/command/table.rb', line 358

# Updates table attributes (--expire-days, --include-v, --detect-schema) of
# DB.TABLE and prints the inspected API response.
def table_update(op)
  params = {}
  op.on('--expire-days DAYS', Integer, 'set table expire days') do |days|
    unless days >= 0
      $stderr.puts "Table expiration days must be greater or equal to 0."
      # NOTE(review): this returns from table_update itself with a success status.
      return
    end
    params[:expire_days] = days
  end
  op.on('--include-v BOOLEAN', TrueClass, 'set include_v flag') {|flag| params[:include_v] = flag }
  op.on('--detect-schema BOOLEAN', TrueClass, 'set detect schema flag') {|flag| params[:detect_schema] = flag }
  db_name, table_name = op.cmd_parse

  response = get_client.update_table(db_name, table_name, params)
  $stdout.puts response.inspect
end

#update(op) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/td/command/update.rb', line 6

# Self-updates the toolbelt through Updater and reports the time taken.
# When Updater.disable? is true (gem installations), Updater.disable_message
# is printed and the process exits.
def update(op)
  # for gem installation, this command is disallowed -
  #   it only works for the toolbelt.
  if Updater.disable?
    $stderr.puts Updater.disable_message
    exit
  end

  started = Time.now
  $stdout.puts "Updating 'td' from #{TOOLBELT_VERSION}..."

  new_version = Updater.update
  unless new_version
    $stdout.puts "Nothing to update."
    return
  end

  elapsed_sec = (Time.now - started).to_i
  $stdout.puts "Successfully updated to #{new_version} in #{Command.humanize_time(elapsed_sec)}."
end

#user_apikey_add(op) ⇒ Object

TODO user:email:change <name> <email> def user_email_change(op) end



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/td/command/user.rb', line 140

# Adds a new API key to user NAME; an unknown user exits with status 1.
def user_apikey_add(op)
  name = op.cmd_parse
  client = get_client

  added = begin
    client.add_apikey(name)
    true
  rescue TreasureData::NotFoundError
    false
  end

  unless added
    $stderr.puts "User '#{name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "users' to show users."
    exit 1
  end

  $stderr.puts "Added an API key to user '#{name}'."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "user:apikeys #{name}' to show the API key"
end

#user_apikey_list(op) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/td/command/user.rb', line 174

# Renders the API keys of user NAME as a one-column (:Key) table.
def user_apikey_list(op)
  set_render_format_option(op)

  name = op.cmd_parse
  keys = get_client.list_apikeys(name)

  rows = keys.map {|key| {:Key => key} }
  $stdout.puts cmd_render_table(rows, :fields => [:Key], :render_format => op.render_format)
end

#user_apikey_remove(op) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/td/command/user.rb', line 157

# Removes API key KEY from user NAME.
#
# When the user or key is unknown, the command prints how to list users and
# the user's keys, then exits with status 1. The key-listing hint now passes
# the user name (user:apikeys takes a user, as user_apikey_list shows); it
# previously passed the key inside an unbalanced quote.
def user_apikey_remove(op)
  name, key = op.cmd_parse

  client = get_client

  begin
    client.remove_apikey(name, key)
  rescue TreasureData::NotFoundError
    $stderr.puts "User '#{name}' or API key '#{key}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "users' to show users."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "user:apikeys #{name}' to show API keys"
    exit 1
  end

  $stderr.puts "Removed an API key from user '#{name}'."
end

#user_create(op) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/td/command/user.rb', line 44

# Creates a new user whose name is the positional argument.
#
# Options:
#   -e, --email EMAIL      required; email address used to identify the user
#   -R, --random-password  generate a random password and print it on $stdout
#                          instead of prompting for one
#
# Without -R, the password is read twice from STDIN with terminal echo
# disabled. An empty first entry cancels with exit status 0. The user gets
# up to 3 attempts to type matching passwords. After that the command exits
# with status 1 instead of creating the user with an unconfirmed password.
def user_create(op)
  email = nil
  random_password = false
  password = nil

  op.on('-e', '--email EMAIL', "Use this email address to identify the user") {|s|
    email = s
  }

  op.on('-R', '--random-password', "Generate random password", TrueClass) {|b|
    random_password = b
  }

  name = op.cmd_parse

  unless email
    $stderr.puts "-e, --email EMAIL option is required."
    exit 1
  end

  if random_password
    # Loaded lazily: only this option needs it. SecureRandom instead of
    # Kernel#rand because a generated password must not be predictable.
    require 'securerandom'
    lower = ('a'..'z').to_a
    upper = ('A'..'Z').to_a
    digit = ('0'..'9').to_a
    symbol = %w[_ @ - + ;]

    # Draws +count+ characters (with replacement) from +pool+.
    pick = lambda { |pool, count| Array.new(count) { pool.sample(random: SecureRandom) } }
    chars = pick.call(lower, 3) + pick.call(upper, 3) + pick.call(digit, 2) + pick.call(symbol, 1)
    password = chars.shuffle(random: SecureRandom).join

    $stdout.puts "Password: #{password}"
  else
    # Prints +prompt+ and reads one line from STDIN with echo turned off.
    # Returns the line without its line terminator ("" at EOF). Ctrl-C exits 1.
    read_hidden = lambda do |prompt|
      begin
        system "stty -echo"  # TODO termios
        $stdout.print prompt
        (STDIN.gets || "").chomp
      rescue Interrupt
        $stderr.print "\ncanceled."
        exit 1
      ensure
        system "stty echo"   # TODO termios
        $stdout.print "\n"
      end
    end

    confirmed = false
    3.times do
      password = read_hidden.call("Password (typing will be hidden): ")
      if password.empty?
        $stderr.puts "canceled."
        exit 0
      end

      if password == read_hidden.call("Retype password: ")
        confirmed = true
        break
      end

      $stdout.puts "Doesn't match."
    end

    # Never fall through with a password the user failed to confirm.
    unless confirmed
      $stderr.puts "Passwords did not match; the user was not created."
      exit 1
    end
  end

  client = get_client(:ssl => true)
  client.add_user(name, nil, email, password)

  $stderr.puts "User '#{name}' is created."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "user:apikeys #{name}' to show the API key."
end

#user_delete(op) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/td/command/user.rb', line 126

# Deletes the user whose name is the single positional argument, then
# reports the deletion on $stderr.
def user_delete(op)
  user_name = op.cmd_parse
  get_client.remove_user(user_name)
  $stderr.puts "User '#{user_name}' is deleted."
end

#user_list(op) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/td/command/user.rb', line 22

# Lists all users as a Name/Email table rendered in op.render_format
# (the option is registered via set_render_format_option before parsing).
# When there are no users, prints a hint on $stderr on how to create one.
def user_list(op)
  set_render_format_option(op)

  op.cmd_parse

  client = get_client

  rows = client.users.map { |user| {:Name => user.name, :Email => user.email} }

  $stdout.puts cmd_render_table(rows, :fields => [:Name, :Email], :render_format => op.render_format)

  return unless rows.empty?

  $stderr.puts "There are no users."
  $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "user:create <name>' to create a user."
end

#user_password_change(op) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/td/command/user.rb', line 191

# Changes the password of the user whose name is the positional argument.
#
# The new password is read twice from STDIN with terminal echo disabled.
# An empty first entry cancels with exit status 0. Up to 3 attempts are
# allowed to type matching passwords. After that the command exits with
# status 1 instead of setting an unconfirmed password.
def user_password_change(op)
  name = op.cmd_parse

  password = nil

  # Prints +prompt+ and reads one line from STDIN with echo turned off.
  # Returns the line without its line terminator ("" at EOF). Ctrl-C exits 1.
  read_hidden = lambda do |prompt|
    begin
      system "stty -echo"  # TODO termios
      $stdout.print prompt
      (STDIN.gets || "").chomp
    rescue Interrupt
      $stderr.print "\ncanceled."
      exit 1
    ensure
      system "stty echo"   # TODO termios
      $stdout.print "\n"
    end
  end

  confirmed = false
  3.times do
    password = read_hidden.call("New password (typing will be hidden): ")
    if password.empty?
      $stderr.puts "canceled."
      exit 0
    end

    if password == read_hidden.call("Retype new password: ")
      confirmed = true
      break
    end

    $stdout.puts "Doesn't match."
  end

  # Never fall through with a password the user failed to confirm.
  unless confirmed
    $stderr.puts "Passwords did not match; the password was not changed."
    exit 1
  end

  client = get_client(:ssl => true)

  client.change_password(name, password)

  $stderr.puts "Password of user '#{name}' changed."
end

#user_show(op) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/td/command/user.rb', line 5

# Shows the name and email of the user whose name is the positional argument.
# Output is written to $stderr. Exits with status 1 (after printing a hint)
# when no user with that name exists.
def user_show(op)
  name = op.cmd_parse

  client = get_client

  # Block variable is `u` so it does not shadow the outer `user` local.
  user = client.users.find { |u| name == u.name }
  unless user
    $stderr.puts "User '#{name}' does not exist."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "user:create <name>' to create a user."
    exit 1
  end

  $stderr.puts "Name  : #{user.name}"
  $stderr.puts "Email : #{user.email}"
end

#workflow(op, capture_output = false, check_prereqs = true) ⇒ Object

The workflow entry-point command. Invokes the Digdag CLI, passing through any command-line arguments.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/td/command/workflow.rb', line 13

# The `td workflow` entry point: runs the Digdag CLI jar with java, forwarding
# op.argv, and returns the child process's exit status.
#
# op             - command parser; only op.argv (arguments forwarded to digdag) is read.
# capture_output - when true, runs the child via Open3.capture3 and re-emits its
#                  stdout/stderr after it exits. Otherwise the child runs via
#                  Kernel#system and writes to the terminal directly.
# check_prereqs  - when true, calls check_digdag_cli before building the command.
#
# Raises ConfigError when no API key is configured (Config.apikey is nil).
def workflow(op, capture_output=false, check_prereqs=true)
  if Config.apikey.nil?
    raise ConfigError
  end
  check_digdag_cli if check_prereqs
  # NOTE(review): the -XX/-Xverify flags presumably trade JIT peak performance
  # for faster JVM startup — confirm.
  cmd = [
      java_cmd,
      '-Dio.digdag.cli.programName=td workflow',
      '-XX:+TieredCompilation', '-XX:TieredStopAtLevel=1', '-Xverify:none'
  ]

  FileUtils.mkdir_p digdag_tmp_dir
  # Per-invocation scratch directory under digdag_tmp_dir. Dir.mktmpdir with a
  # block removes the directory when the block exits, including via the
  # `return`s below (which return from #workflow itself).
  Dir.mktmpdir(nil, digdag_tmp_dir) { |wd|
    # Environment overrides for the child; a nil value unsets that variable.
    env = {}
    digdag_config_path = File.join(wd, 'config')
    FileUtils.touch(digdag_config_path)
    workflow_endpoint = Config.workflow_endpoint

    # In the future passing config to digdag should use environment variables
    # This branch is taken when the apikey was given on the command line OR a
    # non-default workflow endpoint is configured.
    if Config.cl_apikey || workflow_endpoint != 'https://api-workflow.treasuredata.com'
      # If the user passes the apikey on the command line we cannot use the digdag td.conf plugin.
      # Instead, create a digdag configuration file with the endpoint and the specified apikey.
      apikey = TreasureData::Config.apikey
      env['TD_CONFIG_PATH'] = nil
      env['TREASURE_DATA_WORKFLOW_ENDPOINT'] = workflow_endpoint
      env['TD_API_KEY'] = apikey
      # The file holds the apikey in plain text. It lives inside the mktmpdir
      # directory, which is deleted when the block exits.
      File.write(digdag_config_path, [
          "client.http.endpoint = #{workflow_endpoint}",
          "client.http.headers.authorization = TD1 #{apikey}",
          "secrets.td.apikey = #{apikey}"
      ].join($/) + $/)
      cmd << '-Dio.digdag.standards.td.secrets.enabled=false'
      cmd << "-Dconfig.td.default_endpoint=#{Config.endpoint_domain}"
      # NOTE(review): endpoints containing ".connect." get direct download
      # disabled; the reason is not visible here.
      if workflow_endpoint.match(/\.connect\./i)
        cmd << '-Dclient.http.disable_direct_download=true'
      end
    else
      # Use the digdag td.conf plugin to configure wf api and apikey.
      env['TREASURE_DATA_CONFIG_PATH'] = Config.path
      cmd << '-Dio.digdag.standards.td.client-configurator.enabled=true'
    end

    cmd << '-jar' << digdag_cli_path
    # NOTE(review): --config is only added when arguments are given; presumably
    # a bare `td workflow` should just show digdag's usage — confirm.
    unless op.argv.empty?
      cmd << '--config' << digdag_config_path
    end
    cmd.concat(op.argv)

    # Setting TD_TOOLBELT_DEBUG (to any value) prints the full java command line.
    unless ENV['TD_TOOLBELT_DEBUG'].nil?
      $stderr.puts cmd.to_s
    end

    if capture_output
      # TODO: use popen3 instead?
      stdout_str, stderr_str, status = Open3.capture3(env, *cmd)
      $stdout.write(stdout_str)
      $stderr.write(stderr_str)
      return status.exitstatus
    else
      Kernel::system(env, *cmd)
      return $?.exitstatus
    end
  }
end

#workflow_reset(op) ⇒ Object

“Factory reset”: removes the locally installed workflow module.



87
88
89
90
91
92
93
94
# File 'lib/td/command/workflow.rb', line 87

# "Factory reset" for the workflow module: recursively deletes digdag_dir
# (FileUtils.rm_rf) and returns 0. op.cmd_parse is called so that the
# command's help can be shown.
def workflow_reset(op)
  op.cmd_parse # to show help

  $stdout << 'Removing workflow module...'
  FileUtils.rm_rf(digdag_dir)
  $stdout.puts(' Done.')

  0
end

#workflow_update(op) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/td/command/workflow.rb', line 78

# Downloads the workflow (Digdag) module and returns 0. The positional
# argument is passed to download_digdag as the version to fetch.
# NOTE(review): the version may be nil when no argument is given — confirm
# that download_digdag handles that.
def workflow_update(op)
  requested_version = op.cmd_parse
  $stdout << "Downloading workflow module #{requested_version}..."
  download_digdag(requested_version)
  $stdout.puts(' Done.')
  0
end

#workflow_version(op) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/td/command/workflow.rb', line 96

# Prints version information for the workflow module: whether the bundled
# Java is used, the combined output of `java -version`, and the Digdag version.
#
# Returns 1 when the Digdag CLI jar is not installed or java cannot be run
# successfully. Otherwise returns the exit status of `digdag --version`
# (as returned by #workflow).
def workflow_version(op)
  op.cmd_parse # to show help

  unless File.exist?(digdag_cli_path)
    $stderr.puts('Workflow module not yet installed.')
    return 1
  end

  $stdout.puts("Bundled Java: #{bundled_java?}")

  java_output, java_status = begin
    Open3.capture2e(java_cmd, '-version')
  rescue StandardError
    # e.g. the java executable cannot be found or started
    nil
  end
  unless java_status && java_status.success?
    $stderr.puts('Failed to run java')
    return 1
  end
  $stdout.puts(java_output)

  version_op = List::CommandParser.new("workflow", [], [], nil, ['--version'], true)
  $stdout.write('Digdag version: ')
  # Positional flags: capture_output = true, check_prereqs = false.
  workflow(version_op, true, false)
end