Class: RServiceBus2::Transporter

Inherits:
Object
  • Object
show all
Defined in:
lib/rservicebus2/transporter.rb

Overview

TODO: Poison message? Can I bury with a timeout in beanstalk? The message needs to end up on an error queue, since the destination queue may be down. rubocop:disable Metrics/ClassLength

Instance Method Summary collapse

Instance Method Details

#connect(remote_host_name) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/rservicebus2/transporter.rb', line 101

# Makes sure a gateway and destination connection exist for the given host.
#
# Any existing connection is torn down first when the gateway or the
# destination is missing, or when the requested host differs from the one
# recorded in @remote_host_name. A fresh connection is then built only if no
# gateway remains after that check.
#
# @param remote_host_name [String] host the next message should be sent to
def connect(remote_host_name)
  RServiceBus2.rlog "connect called, #{remote_host_name}"
  # Compare the requested host (the parameter) with the currently connected one.
  disconnect if @gateway.nil? || remote_host_name != @remote_host_name || @destination.nil?

  # Still connected to the right host: nothing to do.
  return unless @gateway.nil?

  # Get destination url from job
  pull_config(remote_host_name)
  connect_local
  connect_destination
end

#connect_destination ⇒ Object

rubocop:enable Metrics/MethodLength



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rservicebus2/transporter.rb', line 78

# Creates the destination Beanstalk pool on 127.0.0.1 at @local_port and
# stores it in @destination.
#
# @raise [CouldNotConnectToDestination] when the failure message is
#   'Beanstalk::NotConnected'
# @raise [StandardError] any other failure is re-raised unchanged
def connect_destination
  destination_url = "127.0.0.1:#{@local_port}"
  RServiceBus2.rlog "Connect to Remote Beanstalk, #{destination_url}"
  # Use the local variable built above; `destinationUrl` was never defined.
  @destination = Beanstalk::Pool.new([destination_url])
  RServiceBus2.rlog "Connected to Remote Beanstalk, #{destination_url}"
rescue StandardError => e
  if e.message == 'Beanstalk::NotConnected'
    puts "***Could not connect to destination, check beanstalk is running at, #{destination_url}"
    raise CouldNotConnectToDestination
  end
  raise
end

#connect_local ⇒ Object

rubocop:disable Metrics/MethodLength



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/rservicebus2/transporter.rb', line 58

# Opens the SSH gateway to @remote_host_name as @remote_user_name and opens
# a forward between local port @local_port and 127.0.0.1:11300 via the gateway.
#
# The local port is read from LOCAL_PORT (default 27018). When the port is
# already in use or needs elevated privileges, a hint is printed and the
# process is aborted.
def connect_local
  @local_port = get_value('LOCAL_PORT', 27_018).to_i
  RServiceBus2.rlog "Local Port: #{@local_port}"

  RServiceBus2.log "Connect SSH, #{@remote_user_name}@#{@remote_host_name}"
  # Open the local port (27018 by default) to forward to 127.0.0.1:11300 on the remote host
  @gateway = Net::SSH::Gateway.new(@remote_host_name, @remote_user_name)
  @gateway.open('127.0.0.1', 11_300, @local_port)
  RServiceBus2.log "Connected to SSH, #{@remote_user_name}@#{@remote_host_name}"
rescue Errno::EADDRINUSE
  puts "*** Local transport port in use, #{@local_port}"
  # Suggest the next port up; must read @local_port (the ivar actually set above).
  puts "*** Change local transport port, #{@local_port}, using format, LOCAL_PORT=#{@local_port + 1}"
  abort
rescue Errno::EACCES
  puts "*** Local transport port specified, #{@local_port}, needs sudo access"
  puts '*** Change local transport port using format, LOCAL_PORT=27018'
  abort
end

#connect_to_source_beanstalk ⇒ Object

rubocop:disable Metrics/MethodLength, Metrics/AbcSize



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rservicebus2/transporter.rb', line 22

# Connects to the source Beanstalk instance and starts watching the source queue.
#
# The queue name comes from SOURCE_QUEUE_NAME (default 'transport-out') and the
# address from SOURCE_URL (default '127.0.0.1:11300'). The pool is stored in
# @source. On any failure a diagnostic is printed and the process is aborted.
def connect_to_source_beanstalk
  source_queue_name = get_value('SOURCE_QUEUE_NAME', 'transport-out')
  source_url = get_value('SOURCE_URL', '127.0.0.1:11300')
  @source = Beanstalk::Pool.new([source_url])
  @source.watch source_queue_name

  RServiceBus2.log "Connected to, #{source_queue_name}@#{source_url}"
rescue StandardError => e
  puts 'Error connecting to Beanstalk'
  # Report the local variable assigned above; `sourceUrl` was never defined.
  puts "Host string, #{source_url}"
  if e.message == 'Beanstalk::NotConnected'
    puts '***Most likely, beanstalk is not running. Start beanstalk, and try running this again.'
    puts "***If you still get this error, check beanstalk is running at, #{source_url}"
  else
    puts e.message
    puts e.backtrace
  end
  abort
end

#disconnect ⇒ Object

rubocop:enable Metrics/MethodLength, Metrics/AbcSize



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rservicebus2/transporter.rb', line 43

# Shuts down the gateway, closes the destination connection and clears all
# remote connection state (host, user, queue). Safe to call when nothing is
# connected, because both handles are released with safe navigation.
def disconnect
  # Build the message on one line: the previous literal wrapped across two
  # source lines and so embedded a newline plus indentation in the log output.
  target = "#{@remote_user_name}@#{@remote_host_name}/#{@remote_queue_name}"
  RServiceBus2.log "Disconnect from, #{target}"
  @gateway&.shutdown!
  @gateway = nil
  @remote_host_name = nil

  @destination&.close
  @destination = nil

  @remote_user_name = nil
  @remote_queue_name = nil
end

#get_value(name, default = nil) ⇒ Object



15
16
17
18
19
# File 'lib/rservicebus2/transporter.rb', line 15

# Looks up an environment variable, falling back to a default when the
# variable is unset or empty, and logs the value that was chosen.
#
# @param name [String] environment variable name
# @param default [Object, nil] value used when the variable is missing or ''
# @return [Object, nil] the environment value, or the default
def get_value(name, default = nil)
  raw = ENV.fetch(name, '')
  resolved = raw.empty? ? default : raw
  RServiceBus2.log "Env value: #{name}: #{resolved}"
  resolved
end

#process ⇒ Object

rubocop:disable Metrics/MethodLength, Metrics/AbcSize



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rservicebus2/transporter.rb', line 114

# Moves one job from the source queue to the remote queue named in the message.
#
# Steps: reserve a job (waiting up to @timeout), load the message from its
# body, connect to the message's remote host, put the body on the remote
# queue, optionally copy it to the queue named by AUDIT_QUEUE_NAME, then
# delete the job from the source.
#
# Any StandardError triggers a disconnect. An error whose message is
# 'TIMED_OUT' is treated as "no message available" and swallowed; everything
# else is re-raised.
def process
  # Reserve the next job from the source queue
  job = @source.reserve(@timeout)
  msg = RServiceBus2.safe_load(job.body)

  connect(msg.remote_host_name)

  queue_name = msg.remote_queue_name
  @remote_queue_name = queue_name
  RServiceBus2.rlog("Put msg, #{queue_name}")
  @destination.use(queue_name)
  @destination.put(job.body)
  RServiceBus2.log("Msg put, #{queue_name}")

  # Copy the message to the audit queue when one is configured
  audit_queue = ENV['AUDIT_QUEUE_NAME']
  if audit_queue
    @source.use(audit_queue)
    @source.put(job.body)
  end

  # The job is only removed once it has been forwarded
  job.delete

  RServiceBus2.log("Job sent to, #{@remote_user_name}@#{@remote_host_name}/#{@remote_queue_name}")
rescue StandardError => e
  disconnect
  raise e unless e.message == 'TIMED_OUT'

  RServiceBus2.rlog('No Msg')
  nil
end

#pull_config(remote_host_name) ⇒ Object



91
92
93
94
95
96
97
98
99
# File 'lib/rservicebus2/transporter.rb', line 91

# Records the remote host and resolves the SSH user name for it.
#
# The user name is read from the environment variable
# REMOTE_USER_<HOST IN UPPER CASE>. When it is not set, instructions are
# logged and the process is aborted.
#
# @param remote_host_name [String] host the transporter is about to connect to
def pull_config(remote_host_name)
  @remote_host_name = remote_host_name
  @remote_user_name = get_value("REMOTE_USER_#{remote_host_name.upcase}")
  return unless @remote_user_name.nil?

  # Report the parameter itself; `remoteHostName` was an undefined name, so this
  # path used to raise NameError instead of explaining the missing setting.
  RServiceBus2.log "**** Username not specified for Host, #{remote_host_name}"
  RServiceBus2.log "**** Add an environment variable of the form, REMOTE_USER_#{remote_host_name.upcase}=[USERNAME]"
  abort
end

#run ⇒ Object

rubocop:enable Metrics/MethodLength, Metrics/AbcSize



145
146
147
148
149
150
# File 'lib/rservicebus2/transporter.rb', line 145

# Entry point: reads the reserve timeout from TIMEOUT (default 5), connects to
# the source Beanstalk queue and then processes jobs in an endless loop.
def run
  @timeout = get_value('TIMEOUT', 5)
  # Call the snake_case method; `connectToSourceBeanstalk` is not defined.
  connect_to_source_beanstalk
  loop { process }
  # NOTE(review): only reached if the loop ends via StopIteration; any other
  # error from `process` propagates past this line. `disconnect_from_remote_ssh`
  # is not among the methods visible here — confirm it exists (or whether
  # `disconnect` was intended).
  disconnect_from_remote_ssh
end