Class: RServiceBus2::Transporter
- Inherits:
-
Object
- Object
- RServiceBus2::Transporter
- Defined in:
- lib/rservicebus2/transporter.rb
Overview
TODO: Poison Message? Can I bury with timeout in beanstalk ? Needs to end up on an error queue, destination queue may be down. rubocop:disable Metrics/ClassLength
Instance Method Summary collapse
- #connect(remote_host_name) ⇒ Object
-
#connect_destination ⇒ Object
rubocop:enable Metrics/MethodLength.
-
#connect_local ⇒ Object
rubocop:disable Metrics/MethodLength.
-
#connect_to_source_beanstalk ⇒ Object
rubocop:disable Metrics/MethodLength, Metrics/AbcSize.
-
#disconnect ⇒ Object
rubocop:enable Metrics/MethodLength, Metrics/AbcSize.
- #get_value(name, default = nil) ⇒ Object
-
#process ⇒ Object
rubocop:disable Metrics/MethodLength, Metrics/AbcSize.
- #pull_config(remote_host_name) ⇒ Object
-
#run ⇒ Object
rubocop:enable Metrics/MethodLength, Metrics/AbcSize.
Instance Method Details
#connect(remote_host_name) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/rservicebus2/transporter.rb', line 101 def connect(remote_host_name) RServiceBus2.rlog "connect called, #{remote_host_name}" disconnect if @gateway.nil? || remoteHostName != @remote_host_name || @destination.nil? return unless @gateway.nil? # Get destination url from job pull_config(remote_host_name) connect_local connect_destination end |
#connect_destination ⇒ Object
rubocop:enable Metrics/MethodLength
78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/rservicebus2/transporter.rb', line 78 def connect_destination destination_url = "127.0.0.1:#{@local_port}" RServiceBus2.rlog "Connect to Remote Beanstalk, #{destination_url}" @destination = Beanstalk::Pool.new([destinationUrl]) RServiceBus2.rlog "Connected to Remote Beanstalk, #{destination_url}" rescue StandardError => e if e. == 'Beanstalk::NotConnected' puts "***Could not connect to destination, check beanstalk is running at, #{destination_url}" raise CouldNotConnectToDestination end raise end |
#connect_local ⇒ Object
rubocop:disable Metrics/MethodLength
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/rservicebus2/transporter.rb', line 58 def connect_local @local_port = get_value('LOCAL_PORT', 27_018).to_i RServiceBus2.rlog "Local Port: #{@local_port}" RServiceBus2.log "Connect SSH, #{@remote_user_name}@#{@remoteHostName}" # Open port 27018 to forward to 127.0.0.11300 on the remote host @gateway = Net::SSH::Gateway.new(@remote_host_name, @remote_user_name) @gateway.open('127.0.0.1', 11_300, @local_port) RServiceBus2.log "Connected to SSH, #{@remote_user_name}@#{@remote_host_name}" rescue Errno::EADDRINUSE puts "*** Local transport port in use, #{@local_port}" puts "*** Change local transport port, #{@localPort}, using format, LOCAL_PORT=#{@localPort + 1}" abort rescue Errno::EACCES puts "*** Local transport port specified, #{@local_port}, needs sudo access" puts '*** Change local transport port using format, LOCAL_PORT=27018' abort end |
#connect_to_source_beanstalk ⇒ Object
rubocop:disable Metrics/MethodLength, Metrics/AbcSize
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rservicebus2/transporter.rb', line 22 def connect_to_source_beanstalk source_queue_name = get_value('SOURCE_QUEUE_NAME', 'transport-out') source_url = get_value('SOURCE_URL', '127.0.0.1:11300') @source = Beanstalk::Pool.new([source_url]) @source.watch source_queue_name RServiceBus2.log "Connected to, #{source_queue_name}@#{source_url}" rescue StandardError => e puts 'Error connecting to Beanstalk' puts "Host string, #{sourceUrl}" if e. == 'Beanstalk::NotConnected' puts '***Most likely, beanstalk is not running. Start beanstalk, and try running this again.' puts "***If you still get this error, check beanstalk is running at, #{sourceUrl}" else puts e. puts e.backtrace end abort end |
#disconnect ⇒ Object
rubocop:enable Metrics/MethodLength, Metrics/AbcSize
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rservicebus2/transporter.rb', line 43 def disconnect RServiceBus2.log "Disconnect from, #{@remote_user_name}@#{@remote_host_name}/#{@remote_queue_name}" @gateway&.shutdown! @gateway = nil @remote_host_name = nil @destination&.close @destination = nil @remote_user_name = nil @remote_queue_name = nil end |
#get_value(name, default = nil) ⇒ Object
15 16 17 18 19 |
# File 'lib/rservicebus2/transporter.rb', line 15 def get_value(name, default = nil) value = ENV[name].nil? || ENV[name] == '' ? default : ENV[name] RServiceBus2.log "Env value: #{name}: #{value}" value end |
#process ⇒ Object
rubocop:disable Metrics/MethodLength, Metrics/AbcSize
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rservicebus2/transporter.rb', line 114 def process # Get the next job from the source queue job = @source.reserve @timeout msg = RServiceBus2.safe_load(job.body) connect(msg.remote_host_name) @remote_queue_name = msg.remote_queue_name RServiceBus2.rlog "Put msg, #{msg.remote_queue_name}" @destination.use(msg.remote_queue_name) @destination.put(job.body) RServiceBus2.log "Msg put, #{msg.remote_queue_name}" unless ENV['AUDIT_QUEUE_NAME'].nil? @source.use ENV['AUDIT_QUEUE_NAME'] @source.put job.body end # remove job job.delete RServiceBus2.log "Job sent to, #{@remote_user_name}@#{@remote_host_name}/#{@remote_queue_name}" rescue StandardError => e disconnect if e. == 'TIMED_OUT' RServiceBus2.rlog 'No Msg' return end raise e end |
#pull_config(remote_host_name) ⇒ Object
91 92 93 94 95 96 97 98 99 |
# File 'lib/rservicebus2/transporter.rb', line 91 def pull_config(remote_host_name) @remote_host_name = remote_host_name @remote_user_name = get_value("REMOTE_USER_#{remote_host_name.upcase}") return unless @remote_user_name.nil? RServiceBus2.log "**** Username not specified for Host, #{remoteHostName}" RServiceBus2.log "**** Add an environment variable of the form, REMOTE_USER_#{remoteHostName.upcase}=[USERNAME]" abort end |
#run ⇒ Object
rubocop:enable Metrics/MethodLength, Metrics/AbcSize
145 146 147 148 149 150 |
# File 'lib/rservicebus2/transporter.rb', line 145 def run @timeout = get_value('TIMEOUT', 5) connectToSourceBeanstalk loop { process } disconnect_from_remote_ssh end |