Class: Outhad::Integrations::Destination::Sftp::Client
- Inherits:
-
DestinationConnector
- Object
- DestinationConnector
- Outhad::Integrations::Destination::Sftp::Client
- Includes:
- Core::Fullrefresher, Core::RateLimiter
- Defined in:
- lib/outhad/integrations/destination/sftp/client.rb
Instance Method Summary collapse
- #check_connection(connection_config) ⇒ Object
- #clear_all_records(sync_config) ⇒ Object
- #discover(_connection_config = nil) ⇒ Object
- #write(sync_config, records, _action = "destination_insert") ⇒ Object
- #write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
- #write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
Instance Method Details
#check_connection(connection_config) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 10 def check_connection(connection_config) connection_config = connection_config.with_indifferent_access with_sftp_client(connection_config) do |sftp| stream = SecureRandom.uuid test_path = "#{connection_config[:destination_path]}/#{stream}" test_file_operations(sftp, test_path) return success_status end rescue StandardError => e handle_exception(e, { context: "SFTP:CHECK_CONNECTION:EXCEPTION", type: "error" }) failure_status(e) end |
#clear_all_records(sync_config) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 112 def clear_all_records(sync_config) connection_specification = sync_config.destination.connection_specification.with_indifferent_access with_sftp_client(connection_specification) do |sftp| files = sftp.dir.glob(connection_specification[:destination_path], "*") files.each do |file| sftp.remove!(File.join(connection_specification[:destination_path], file.name)) end return ("Successfully cleared data.", "succeeded") if sftp.dir.entries(connection_specification[:destination_path]).size <= 2 return ("Failed to clear data.", "failed") end rescue StandardError => e (e., "failed") end |
#discover(_connection_config = nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 26 def discover(_connection_config = nil) catalog_json = read_json(CATALOG_SPEC_PATH) catalog = build_catalog(catalog_json) catalog. rescue StandardError => e handle_exception(e, { context: "SFTP:DISCOVER:EXCEPTION", type: "error" }) end |
#write(sync_config, records, _action = "destination_insert") ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 39 def write(sync_config, records, _action = "destination_insert") @sync_config = sync_config connection_config = sync_config.destination.connection_specification.with_indifferent_access file_path = generate_file_path(sync_config) local_file_name = generate_local_file_name(sync_config) csv_content = generate_csv_content(records) records_size = records.size write_success = 0 case connection_config[:format][:compression_type] when CompressionType.enum("zip") write_success = write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) when CompressionType.enum("un_compressed") write_success = write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) else raise ArgumentError, "Unsupported compression type: #{connection_config[:format][:compression_type]}" end write_failure = records.size - write_success (write_success, write_failure) rescue StandardError => e handle_exception(e, { context: "SFTP:WRITE:EXCEPTION", type: "error", sync_id: @sync_config.sync_id, sync_run_id: @sync_config.sync_run_id }) end |
#write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 67 def write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) write_success = 0 Tempfile.create([local_file_name, ".zip"]) do |tempfile| Zip::File.open(tempfile.path, Zip::File::CREATE) do |zipfile| zipfile.get_output_stream("#{local_file_name}.csv") { |f| f.write(csv_content) } end with_sftp_client(connection_config) do |sftp| sftp.upload!(tempfile.path, file_path) write_success = records_size rescue StandardError => e # TODO: add sync_id and sync_run_id to the log handle_exception(e, { context: "SFTP:RECORD:WRITE:EXCEPTION", type: "error", sync_id: @sync_config.sync_id, sync_run_id: @sync_config.sync_run_id }) write_success = 0 end end write_success end |
#write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/outhad/integrations/destination/sftp/client.rb', line 90 def write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) write_success = 0 Tempfile.create([local_file_name, ".csv"]) do |tempfile| tempfile.write(csv_content) tempfile.close with_sftp_client(connection_config) do |sftp| sftp.upload!(tempfile.path, file_path) write_success = records_size rescue StandardError => e # TODO: add sync_id and sync_run_id to the log handle_exception(e, { context: "SFTP:RECORD:WRITE:EXCEPTION", type: "error", sync_id: @sync_config.sync_id, sync_run_id: @sync_config.sync_run_id }) write_success = 0 end end write_success end |