Class: Outhad::Integrations::Destination::Sftp::Client

Inherits:
DestinationConnector
  • Object
show all
Includes:
Core::Fullrefresher, Core::RateLimiter
Defined in:
lib/outhad/integrations/destination/sftp/client.rb

Instance Method Summary collapse

Instance Method Details

#check_connection(connection_config) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 10

def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  with_sftp_client(connection_config) do |sftp|
    stream = SecureRandom.uuid
    test_path = "#{connection_config[:destination_path]}/#{stream}"
    test_file_operations(sftp, test_path)
    return success_status
  end
rescue StandardError => e
  handle_exception(e, {
                     context: "SFTP:CHECK_CONNECTION:EXCEPTION",
                     type: "error"
                   })
  failure_status(e)
end

#clear_all_records(sync_config) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 112

def clear_all_records(sync_config)
  connection_specification = sync_config.destination.connection_specification.with_indifferent_access
  with_sftp_client(connection_specification) do |sftp|
    files = sftp.dir.glob(connection_specification[:destination_path], "*")
    files.each do |file|
      sftp.remove!(File.join(connection_specification[:destination_path], file.name))
    end
    return control_message("Successfully cleared data.", "succeeded") if sftp.dir.entries(connection_specification[:destination_path]).size <= 2

    return control_message("Failed to clear data.", "failed")
  end
rescue StandardError => e
  control_message(e.message, "failed")
end

#discover(_connection_config = nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 26

def discover(_connection_config = nil)
  catalog_json = read_json(CATALOG_SPEC_PATH)

  catalog = build_catalog(catalog_json)

  catalog.to_outhad_message
rescue StandardError => e
  handle_exception(e, {
                     context: "SFTP:DISCOVER:EXCEPTION",
                     type: "error"
                   })
end

#write(sync_config, records, _action = "destination_insert") ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 39

def write(sync_config, records, _action = "destination_insert")
  @sync_config = sync_config
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  file_path = generate_file_path(sync_config)
  local_file_name = generate_local_file_name(sync_config)
  csv_content = generate_csv_content(records)
  records_size = records.size
  write_success = 0

  case connection_config[:format][:compression_type]
  when CompressionType.enum("zip")
    write_success = write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  when CompressionType.enum("un_compressed")
    write_success = write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  else
    raise ArgumentError, "Unsupported compression type: #{connection_config[:format][:compression_type]}"
  end
  write_failure = records.size - write_success
  tracking_message(write_success, write_failure)
rescue StandardError => e
  handle_exception(e, {
                     context: "SFTP:WRITE:EXCEPTION",
                     type: "error",
                     sync_id: @sync_config.sync_id,
                     sync_run_id: @sync_config.sync_run_id
                   })
end

#write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 67

def write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  write_success = 0
  Tempfile.create([local_file_name, ".zip"]) do |tempfile|
    Zip::File.open(tempfile.path, Zip::File::CREATE) do |zipfile|
      zipfile.get_output_stream("#{local_file_name}.csv") { |f| f.write(csv_content) }
    end
    with_sftp_client(connection_config) do |sftp|
      sftp.upload!(tempfile.path, file_path)
      write_success = records_size
    rescue StandardError => e
      # TODO: add sync_id and sync_run_id to the log
      handle_exception(e, {
                         context: "SFTP:RECORD:WRITE:EXCEPTION",
                         type: "error",
                         sync_id: @sync_config.sync_id,
                         sync_run_id: @sync_config.sync_run_id
                       })
      write_success = 0
    end
  end
  write_success
end

#write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 90

def write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  write_success = 0
  Tempfile.create([local_file_name, ".csv"]) do |tempfile|
    tempfile.write(csv_content)
    tempfile.close
    with_sftp_client(connection_config) do |sftp|
      sftp.upload!(tempfile.path, file_path)
      write_success = records_size
    rescue StandardError => e
      # TODO: add sync_id and sync_run_id to the log
      handle_exception(e, {
                         context: "SFTP:RECORD:WRITE:EXCEPTION",
                         type: "error",
                         sync_id: @sync_config.sync_id,
                         sync_run_id: @sync_config.sync_run_id
                       })
      write_success = 0
    end
  end
  write_success
end