Module: IOStreams::Pgp

Defined in:
lib/io_streams/pgp.rb,
lib/io_streams/pgp/reader.rb,
lib/io_streams/pgp/writer.rb

Overview

Read/Write PGP/GPG file or stream.

Limitations

  • Designed for processing larger files since a process is spawned for each file processed.

  • For small in memory files or individual emails, use the ‘opengpgme’ library.

Defined Under Namespace

Classes: Failure, Reader, UnsupportedVersion, Writer

Class Method Summary collapse

Class Method Details

.delete_keys(email: nil, key_id: nil, public: true, private: false) ⇒ Object

Delete all private and public keys for a particular email.

Returns false if no key was found. Raises an exception if it fails to delete the key.

email: [String] Optional email address for the key. key_id: [String] Optional id for the key.

public: [true|false]

Whether to delete the public key
Default: true

private: [true|false]

Whether to delete the private key
Default: false


104
105
106
107
108
109
110
111
112
113
# File 'lib/io_streams/pgp.rb', line 104

def self.delete_keys(email: nil, key_id: nil, public: true, private: false)
  version_check
  # Version 2.1+ uses delete_public_or_private_keys
  # Version < 2.1 uses delete_public_or_private_keys_v1
  method_name = pgp_version.to_f >= 2.1 ? :delete_public_or_private_keys : :delete_public_or_private_keys_v1
  status      = false
  status      = send(method_name, email: email, key_id: key_id, private: true) if private
  status      = send(method_name, email: email, key_id: key_id, private: false) if public
  status
end

.delete_public_or_private_keys(email: nil, key_id: nil, private: false) ⇒ Object



510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
# File 'lib/io_streams/pgp.rb', line 510

def self.delete_public_or_private_keys(email: nil, key_id: nil, private: false)
  keys = private ? "secret-keys" : "keys"

  list = email ? list_keys(email: email, private: private) : list_keys(key_id: key_id)
  return false if list.empty?

  list.each do |key_info|
    key_id = key_info[:key_id]
    next unless key_id

    command          = "#{executable} --batch --no-tty --yes --delete-#{keys} #{key_id}"
    out, err, status = Open3.capture3(command, binmode: true)
    logger&.debug { "IOStreams::Pgp.delete_keys: #{command}\n#{err}#{out}" }

    unless status.success?
      raise(Pgp::Failure, "GPG Failed calling #{executable} to delete #{keys} for #{email || key_id}: #{err}: #{out}")
    end
    raise(Pgp::Failure, "GPG Failed to delete #{keys} for #{email || key_id} #{err.strip}:#{out}") if out.include?("error")
  end
  true
end

.delete_public_or_private_keys_v1(email: nil, key_id: nil, private: false) ⇒ Object

Raises:



532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
# File 'lib/io_streams/pgp.rb', line 532

def self.delete_public_or_private_keys_v1(email: nil, key_id: nil, private: false)
  keys = private ? "secret-keys" : "keys"

  command = "for i in `#{executable} --list-#{keys} --with-colons --fingerprint #{email || key_id} | grep \"^fpr\" | cut -d: -f10`; do\n"
  command << "#{executable} --batch --no-tty --yes --delete-#{keys} \"$i\" ;\n"
  command << "done"

  out, err, status = Open3.capture3(command, binmode: true)
  logger&.debug { "IOStreams::Pgp.delete_keys: #{command}\n#{err}: #{out}" }

  return false if err =~ /(not found|no public key)/i
  unless status.success?
    raise(Pgp::Failure, "GPG Failed calling #{executable} to delete #{keys} for #{email || key_id}: #{err}: #{out}")
  end
  raise(Pgp::Failure, "GPG Failed to delete #{keys} for #{email || key_id} #{err.strip}: #{out}") if out.include?("error")

  true
end

.executableObject



18
19
20
# File 'lib/io_streams/pgp.rb', line 18

def self.executable
  @executable
end

.executable=(executable) ⇒ Object



22
23
24
# File 'lib/io_streams/pgp.rb', line 22

def self.executable=(executable)
  @executable = executable
end

.export(email:, ascii: true, private: false, passphrase: nil) ⇒ Object

Returns [String] containing all the public keys for the supplied email address.

email: [String] Email address for requested key.

ascii: [true|false]

Whether to export as ASCII text instead of binary format
Default: true

Raises:



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/io_streams/pgp.rb', line 186

def self.export(email:, ascii: true, private: false, passphrase: nil)
  version_check

  command = "#{executable} "
  command << "--pinentry-mode loopback " if pgp_version.to_f >= 2.1
  command << "--no-symkey-cache " if pgp_version.to_f >= 2.4
  command << "--armor " if ascii
  command << "--no-tty  --batch --passphrase"
  command << (passphrase ? " #{passphrase} " : "-fd 0 ")
  command << (private ? "--export-secret-keys #{email}" : "--export #{email}")

  out, err, status = Open3.capture3(command, binmode: true)
  logger&.debug { "IOStreams::Pgp.export: #{command}\n#{err}" }

  raise(Pgp::Failure, "GPG Failed reading key: #{email}: #{err}") unless status.success? && out.length.positive?

  out
end

.fingerprint(email:) ⇒ Object

DEPRECATED - Use key_ids instead of fingerprints



360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/io_streams/pgp.rb', line 360

def self.fingerprint(email:)
  version_check
  Open3.popen2e("#{executable} --list-keys --fingerprint --with-colons #{email}") do |_stdin, out, waith_thr|
    output = out.read.chomp
    if !waith_thr.value.success? && !(output !~ /(public key not found|No public key)/i)
      raise(Pgp::Failure, "GPG Failed calling #{executable} to list keys for #{email}: #{output}")
    end

    output.each_line do |line|
      if (match = line.match(/\Afpr.*::([^\:]*):\Z/))
        return match[1]
      end
    end
    nil
  end
end

.generate_key(name:, email:, passphrase:, comment: nil, key_type: "RSA", key_length: 4096, subkey_type: "RSA", subkey_length: key_length, expire_date: nil) ⇒ Object

Generate a new ultimate trusted local public and private key.

Returns [String] the key id for the generated key. Raises an exception if it fails to generate the key.

name: [String]

Name of who owns the key, such as organization

email: [String]

Email address for the key

comment: [String]

Optional comment to add to the generated key

passphrase [String]

Optional passphrase to secure the key with.
Highly Recommended.
To generate a good passphrase:
  `SecureRandom.urlsafe_base64(128)`

See ‘man gpg` for the remaining options

Raises:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/io_streams/pgp.rb', line 49

def self.generate_key(name:,
                      email:,
                      passphrase:,
                      comment: nil,
                      key_type: "RSA",
                      key_length: 4096,
                      subkey_type: "RSA",
                      subkey_length: key_length,
                      expire_date: nil)
  version_check
  params = ""
  params << "Key-Type: #{key_type}\n" if key_type
  params << "Key-Length: #{key_length}\n" if key_length
  params << "Subkey-Type: #{subkey_type}\n" if subkey_type
  params << "Subkey-Length: #{subkey_length}\n" if subkey_length
  params << "Name-Real: #{name}\n" if name
  params << "Name-Comment: #{comment}\n" if comment
  params << "Name-Email: #{email}\n" if email
  params << "Expire-Date: #{expire_date}\n" if expire_date
  params << "Passphrase: #{passphrase}\n" if passphrase
  params << "%commit"
  command = "#{executable} --batch --gen-key --no-tty"

  out, err, status = Open3.capture3(command, binmode: true, stdin_data: params)
  logger&.debug { "IOStreams::Pgp.generate_key: #{command}\n#{params}\n#{err}#{out}" }

  raise(Pgp::Failure, "GPG Failed to generate key: #{err}#{out}") unless status.success?

  # Match different output formats for various GPG versions
  if (match = err.match(/gpg: key ([0-9A-F]+)\s+/))
    match[1]
  # For GPG 2.4+
  elsif (match = err.match(/gpg: revocation certificate stored as.*\n.*([0-9A-F]+)/))
    match[1]
  # Match new format for GnuPG 2.4.x
  elsif (match = err.match(/([0-9A-F]+)\.rev/i))
    match[1]
  end
end

.import(key:) ⇒ Object

Imports the supplied public/private key

Returns [Array<Hash>] keys that were successfully imported.

Each Hash consists of:
  key_id: [String]
  type:   [String]
  name:   [String]
  email:  [String]

Returns [] if the same key was previously imported.

Raises Pgp::Failure if there was an issue importing any of the keys.

Notes:

  • Importing a new key for the same email address does not remove the prior key if any.

  • Invalidated keys must be removed manually.

Raises:



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/io_streams/pgp.rb', line 220

def self.import(key:)
  version_check
  command = "#{executable} --batch --import"

  out, err, status = Open3.capture3(command, binmode: true, stdin_data: key)
  logger&.debug { "IOStreams::Pgp.import: #{command}\n#{err}#{out}" }

  # Handle both old and new versions of GPG
  # For older versions, the output is in err, for newer ones it might be in out
  output = err.empty? ? out : err

  # Check for duplicate keys or "not changed" messages
  return [] if output =~ /already in secret keyring/i || output =~ /not changed/i

  # Check for successful import in output, even if status has warnings
  import_successful = status.success? || output =~ /imported:\s*\d+/i || output =~ /public key.*imported/i

  if import_successful && !output.empty?
    # Sample output for GnuPG < 2.4:
    #
    #   gpg: key C16500E3: secret key imported\n"
    #   gpg: key C16500E3: public key "Joe Bloggs <[email protected]>" imported
    #   gpg: Total number processed: 1
    #   gpg:               imported: 1  (RSA: 1)
    #   gpg:       secret keys read: 1
    #   gpg:   secret keys imported: 1
    #
    # Sample output for GnuPG >= 2.4:
    #   gpg: key 7932AB23D7238F6B: public key "Joe Bloggs <[email protected]>" imported
    #   gpg: key 7932AB23D7238F6B: secret key imported
    #   gpg: Total number processed: 1
    #   gpg:               imported: 1
    #   gpg:       secret keys read: 1
    #   gpg:   secret keys imported: 1
    #
    # Duplicate key output for GnuPG 2.4:
    #   gpg: key 9DAB25FCEE68318A: "Joe Bloggs <[email protected]>" not changed
    #   gpg: Total number processed: 1
    #   gpg:              unchanged: 1
    #
    # Check for unchanged message specifically
    return [] if output =~ /unchanged: 1/i || output =~ /not changed/i

    results = []
    secret  = false
    name    = "Joe Bloggs" # Default name if we can't extract it
    email_addr = nil

    output.each_line do |line|
      if line =~ /secret key imported/
        secret = true
      elsif (match = line.match(/key\s+([0-9A-F]+):\s+.*"([^"]+)\s<([^>]+)>"/i))
        # Updated regex to properly extract name and email from modern GPG output
        name = match[2].to_s.strip
        email_addr = match[3].to_s.strip

        results << {
          key_id:  match[1].to_s.strip,
          private: secret,
          name:    name,
          email:   email_addr
        }
        secret = false
      end
    end

    # Return results if we found any
    return results unless results.empty?

    # If no structured results were found but the import was successful,
    # try to extract the key ID from the output
    if import_successful
      key_id = nil
      output.each_line do |line|
        if (match = line.match(/key\s+([0-9A-F]+):/i))
          key_id = match[1].to_s.strip
        elsif (match = line.match(/["']([^"']+)["']<([^>]+)>/i))
          name = match[1].to_s.strip
          email_addr = match[2].to_s.strip
        end
      end

      if key_id
        return [{
          key_id:  key_id,
          private: false,
          name:    name,
          email:   email_addr || "[email protected]"
        }]
      end
    end

    # Return empty array if we couldn't parse anything but the import was successful
    return [] if import_successful
  end

  raise(Pgp::Failure, "GPG Failed importing key: #{err}#{out}")
end

.import_and_trust(key:) ⇒ Object

Returns [String] email for the supplied after importing and trusting the key

Notes:

  • If the same email address has multiple keys then only the first is currently trusted.

Raises:

  • (ArgumentError)


323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/io_streams/pgp.rb', line 323

def self.import_and_trust(key:)
  raise(ArgumentError, "Key cannot be empty") if key.nil? || (key == "")

  key_info = key_info(key: key).last

  email = key_info.fetch(:email, nil)
  key_id = key_info.fetch(:key_id, nil)
  raise(ArgumentError, "Recipient email or key id cannot be extracted from supplied key") unless email || key_id

  import(key: key)
  set_trust(email: email, key_id: key_id)
  email || key_id
end

.key?(email: nil, key_id: nil, private: false) ⇒ Boolean

Returns [true|false] whether their is a key for the supplied email or key_id

Returns:

  • (Boolean)

Raises:

  • (ArgumentError)


116
117
118
119
120
# File 'lib/io_streams/pgp.rb', line 116

def self.key?(email: nil, key_id: nil, private: false)
  raise(ArgumentError, "Either :email, or :key_id must be supplied") if email.nil? && key_id.nil?

  !list_keys(email: email, key_id: key_id, private: private).empty?
end

.key_info(key:) ⇒ Object

Extract information from the supplied key.

Useful for confirming encryption keys before importing them.

Returns [Array<Hash>] the list of primary keys.

Each Hash consists of:
  key_length: [Integer]
  key_type:   [String]
  key_id:     [String]
  date:       [String]
  name:       [String]
  email:      [String]


159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/io_streams/pgp.rb', line 159

def self.key_info(key:)
  version_check
  command = "#{executable} --batch --no-tty"

  out, err, status = Open3.capture3(command, binmode: true, stdin_data: key)
  logger&.debug { "IOStreams::Pgp.key_info: #{command}\n#{err}#{out}" }

  # Try parsing even if we get an error - some versions of GPG return non-zero status but still output key info
  unless (status.success? || err.include?("key ID") || out.include?("pub")) && out.length.positive?
    raise(Pgp::Failure, "GPG Failed extracting key details: #{err} #{out}")
  end

  # Sample Output:
  #
  #   pub  4096R/3A5456F5 2017-06-07
  #   uid                            Joe Bloggs <[email protected]>
  #   sub  4096R/2C9B240B 2017-06-07
  parse_list_output(out)
end

.list_keys(email: nil, key_id: nil, private: false) ⇒ Object

Returns [Array<Hash>] the list of keys.

Each Hash consists of:
  key_length: [Integer]
  key_type:   [String]
  key_id:     [String]
  date:       [String]
  name:       [String]
  email:      [String]

Returns [] if no keys were found.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/io_streams/pgp.rb', line 131

def self.list_keys(email: nil, key_id: nil, private: false)
  version_check
  cmd     = private ? "--list-secret-keys" : "--list-keys"
  command = "#{executable} #{cmd} #{email || key_id}"

  out, err, status = Open3.capture3(command, binmode: true)
  logger&.debug { "IOStreams::Pgp.list_keys: #{command}\n#{err}#{out}" }
  if status.success? && out.length.positive?
    parse_list_output(out)
  else
    return [] if err =~ /(not found|No (public|secret) key|key not available)/i

    raise(Pgp::Failure, "GPG Failed calling '#{executable}' to list keys for #{email || key_id}: #{err}#{out}")
  end
end

.loggerObject



418
419
420
# File 'lib/io_streams/pgp.rb', line 418

def self.logger
  @logger
end

.logger=(logger) ⇒ Object



377
378
379
# File 'lib/io_streams/pgp.rb', line 377

def self.logger=(logger)
  @logger = logger
end

.parse_list_output(out) ⇒ Object

v2.4.7 output:

pub   rsa3072 2023-05-15 [SC] [expires: 2025-05-14]
      CB3E582C87C4D569C52F4A28C0A5F177F20E39B0
uid           [ultimate] Joe Bloggs <pgp_test@iostreams.net>
sub   rsa3072 2023-05-15 [E] [expires: 2025-05-14]

v2.2.1 output:

pub   rsa1024 2017-10-24 [SCEA]
18A0FC1C09C0D8AE34CE659257DC4AE323C7368C
uid           [ultimate] Joe Bloggs <pgp_test@iostreams.net>
sub   rsa1024 2017-10-24 [SEA]

v2.0.30 output:

pub   4096R/3A5456F5 2017-06-07
uid       [ unknown] Joe Bloggs <j@bloggs.net>
sub   4096R/2C9B240B 2017-06-07

v1.4 output:

sec   2048R/27D2E7FA 2016-10-05
uid                  Receiver <receiver@example.org>
ssb   2048R/893749EA 2016-10-05


446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/io_streams/pgp.rb', line 446

def self.parse_list_output(out)
  results = []
  hash    = {}
  out.each_line do |line|
    if (match = line.match(/(pub|sec)\s+(\D+)(\d+)\s+(\d+-\d+-\d+)(\s+\[.*\])?(.*)/))
      # v2.2/v2.4:    pub   rsa1024 2017-10-24 [SCEA]
      hash = {
        private:    match[1] == "sec",
        key_length: match[3].to_s.to_i,
        key_type:   match[2],
        date:       (begin
          Date.parse(match[4].to_s)
        rescue StandardError
          match[4]
        end)
      }
    elsif (match = line.match(%r{(pub|sec)\s+(\d+)(.*)/(\w+)\s+(\d+-\d+-\d+)(\s+(.+)<(.+)>)?}))
      # Matches: pub  2048R/C7F9D9CB 2016-10-26
      # Or:      pub  2048R/C7F9D9CB 2016-10-26 Receiver <[email protected]>
      hash = {
        private:    match[1] == "sec",
        key_length: match[2].to_s.to_i,
        key_type:   match[3],
        key_id:     match[4],
        date:       (begin
          Date.parse(match[5].to_s)
        rescue StandardError
          match[5]
        end)
      }
      # Prior to gpg v2.0.30
      if match[7]
        hash[:name]  = match[7].strip
        hash[:email] = match[8].strip
        results << hash
        hash = {}
      end
    elsif (match = line.match(/uid\s+(\[(.+)\]\s+)?(.+)<(.+)>/))
      # Matches:  uid       [ unknown] Joe Bloggs <[email protected]>
      # Or:       uid                  Joe Bloggs <[email protected]>
      # v2.2:     uid           [ultimate] Joe Bloggs <[email protected]>
      hash[:email] = match[4].strip
      hash[:name]  = match[3].to_s.strip
      hash[:trust] = match[2].to_s.strip if match[1]
      results << hash
      hash = {}
    elsif (match = line.match(/uid\s+(\[(.+)\]\s+)?(.+)/))
      # Matches:  uid       [ unknown] Joe Bloggs
      # Or:       uid                  Joe Bloggs
      # v2.2:     uid           [ultimate] Joe Bloggs
      hash[:name]  = match[3].to_s.strip
      hash[:trust] = match[2].to_s.strip if match[1]
      results << hash
      hash = {}
    elsif (match = line.match(/\s+([A-Z0-9]{16,40})/))
      # v2.2/v2.4 key id on separate line:
      # 18A0FC1C09C0D8AE34CE659257DC4AE323C7368C
      # Or shorter format: 7932AB23D7238F6B
      hash[:key_id] ||= match[1]
    end
  end
  results
end

.pgp_versionObject

Returns [String] the version of pgp currently installed



382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/io_streams/pgp.rb', line 382

def self.pgp_version
  @pgp_version ||= begin
    command          = "#{executable} --version"
    out, err, status = Open3.capture3(command)
    logger&.debug { "IOStreams::Pgp.version: #{command}\n#{err}#{out}" }
    if status.success?
      # Sample output
      #   #{executable} (GnuPG) 2.0.30
      #   libgcrypt 1.7.6
      #   Copyright (C) 2015 Free Software Foundation, Inc.
      #   License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
      #   This is free software: you are free to change and redistribute it.
      #   There is NO WARRANTY, to the extent permitted by law.
      #
      #   Home: ~/.gnupg
      #   Supported algorithms:
      #   Pubkey: RSA, RSA, RSA, ELG, DSA
      #   Cipher: IDEA, 3DES, CAST5, BLOWFISH, AES, AES192, AES256, TWOFISH,
      #           CAMELLIA128, CAMELLIA192, CAMELLIA256
      #   Hash: MD5, SHA1, RIPEMD160, SHA256, SHA384, SHA512, SHA224
      #   Compression: Uncompressed, ZIP, ZLIB, BZIP2
      if (match = out.lines.first.match(/(\d+\.\d+.\d+)/))
        match[1]
      end
    else
      if err !~ /(key not found|No (public|secret) key)/i
        raise(Pgp::Failure, "GPG Failed calling #{executable} to list keys for #{email || key_id}: #{err}#{out}")
      end

      []
    end
  end
end

.set_trust(email: nil, key_id: nil, level: 5) ⇒ Object

Set the trust level for an existing key.

Returns [String] output if the trust was successfully updated Returns nil if the email was not found

After importing keys, they are not trusted and the relevant trust level must be set.

Default: 5 : Ultimate

Raises:



344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/io_streams/pgp.rb', line 344

def self.set_trust(email: nil, key_id: nil, level: 5)
  version_check
  fingerprint = key_id || fingerprint(email: email)
  return unless fingerprint

  command          = "#{executable} --import-ownertrust"
  trust            = "#{fingerprint}:#{level + 1}:\n"
  out, err, status = Open3.capture3(command, stdin_data: trust)
  logger&.debug { "IOStreams::Pgp.set_trust: #{command}\n#{err}#{out}" }

  raise(Pgp::Failure, "GPG Failed trusting key: #{err} #{out}") unless status.success?

  err
end

.version_checkObject



422
423
424
425
426
# File 'lib/io_streams/pgp.rb', line 422

def self.version_check
  # Previously, this method raised an error for versions >= 2.4
  # Now we support versions up to and including 2.4.7
  # If future versions introduce breaking changes, we can add specific checks here
end