Class: Thor::AppClient

Inherits:
Application show all
Defined in:
lib/ThorClient.rb

Defined Under Namespace

Classes: Server, StructCustomer

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Application

#amqp_loop, #run

Constructor Details

#initialize(opts = {}) ⇒ AppClient

Returns a new instance of AppClient.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/ThorClient.rb', line 68

# Builds the client application: seeds the AMQP retry/back-off state, the
# default AMQP and EventMachine settings, and registers the command-line
# switches that override those defaults.
#
# @param opts [Hash] handed unchanged to the parent initializer
def initialize(opts = {})
  super(opts)

  # Retry policy. Kept as class variables under these exact names because
  # amqp_handle_failure and amqp_reset_retry_interval read them.
  @@AMQP_DEFAULT_RETRY_INTERVAL = 3 # seconds slept before the first retry
  @@AMQP_MAX_RETRY_INTERVAL = 30    # upper bound for the growing interval
  @@AMQP_MAX_RETRY_ATTEMPS = -1     # a negative value disables the attempt limit
  @@AMQP_RETRY_MULTIPLER = 1.5      # interval growth factor after each failure

  @amqp_retry_interval = @@AMQP_DEFAULT_RETRY_INTERVAL
  @amqp_retry_attempt = 0

  # Signals that the application wants to exit for some reason
  @request_exit = false
  @em_server = nil
  @clients = {}
  @clients_lock = Mutex.new

  # AMQP options
  options[:amqp_host] = "localhost"
  # NOTE(review): same value as :em_port below; the conventional AMQP port is
  # 5672 -- confirm this default is intended.
  options[:amqp_port] = 8467
  options[:amqp_user] = "user"
  options[:amqp_password] = "password"
  options[:amqp_vhost] = "my-vhost"

  options[:amqp_channel_master] = "master"

  # Event Machine options
  options[:em_port] = 8467 # Thor on cell-phone keyboard
  options[:em_auth_token] = ""

  # The block parameter is named `parser` so it no longer shadows `opts`.
  # Assumes initialize_optparser yields an OptionParser-compatible object.
  initialize_optparser do |parser|
    ############################
    # AMQP Section
    ############################
    # AMQP Host
    parser.on('-H', '--amqp-host STRING', "AMQP Server hostname") do |host|
      options[:amqp_host] = host
    end

    # AMQP Port (coerced to Integer so it has the same type as the default)
    parser.on('-p', '--amqp-port NUM', Integer, "AMQP Server port number") do |port|
      options[:amqp_port] = port
    end

    # AMQP Username
    parser.on('-u', '--amqp-user STRING', "AMQP Username") do |user|
      options[:amqp_user] = user
    end

    # AMQP Password
    parser.on('-P', '--amqp-password STRING', "AMQP Password") do |password|
      options[:amqp_password] = password
    end

    # AMQP Vhost
    parser.on('-V', '--amqp-vhost STRING', "AMQP Virtual Host") do |vhost|
      options[:amqp_vhost] = vhost
    end

    ## Channels
    # Channel Master
    # Short switches are a single character: the former '-acm' was registered
    # as '-a' (the rest was read as an argument placeholder), so '-a' is what
    # is declared here.
    parser.on('-a', '--amqp-channel-master STRING', "AMQP Channel Master") do |channel|
      options[:amqp_channel_master] = channel
    end

    ############################
    # EventMachine Section
    ############################
    # The former '-ep' and '-eat' both registered the short switch '-e' and
    # therefore collided; these two options are long-only now.

    # EM Port (coerced to Integer so it has the same type as the default)
    parser.on('--em-port NUM', Integer, "EventMachine port") do |port|
      options[:em_port] = port
    end

    # EM Authentication token
    parser.on('--em-auth-token STRING', "Authentication token used for communication with EM") do |token|
      options[:em_auth_token] = token
    end
  end
end

Instance Attribute Details

#clients ⇒ Object

Returns the value of attribute clients.



39
40
41
# File 'lib/ThorClient.rb', line 39

# Reader for @clients (initialized to an empty Hash by the constructor and
# filled/emptied by em_post_init / em_unbind).
def clients; @clients; end

#clients_lock ⇒ Object

Returns the value of attribute clients_lock.



39
40
41
# File 'lib/ThorClient.rb', line 39

# Reader for @clients_lock (a Mutex created by the constructor; the
# em_post_init / em_unbind handlers hold it while touching @clients).
def clients_lock; @clients_lock; end

#em_server ⇒ Object

Returns the value of attribute em_server.



39
40
41
# File 'lib/ThorClient.rb', line 39

# Reader for @em_server (nil after construction; em_start assigns the
# connection object it is handed).
def em_server; @em_server; end

#request_exit ⇒ Object

Returns the value of attribute request_exit.



39
40
41
# File 'lib/ThorClient.rb', line 39

# Reader for @request_exit, the flag that tells the main loop to stop
# reconnecting.
def request_exit; @request_exit; end

Instance Method Details

#amqp_exchange_connect_master(master, guid_direct) ⇒ Object

Connects to master exchange



183
184
185
186
187
# File 'lib/ThorClient.rb', line 183

# Connects to master exchange.
# The visible body only writes a log line, and only in verbose mode.
#
# @param master [Object] master exchange name, interpolated into the log line
# @param guid_direct [Object] GUID of the local direct exchange, logged as well
def amqp_exchange_connect_master(master, guid_direct)
  return unless options[:verbose]

  Bsl::Logger::Log "GUID '#{guid_direct}', Connecting to master exchange '#{master}'."
end

#amqp_exchange_create_direct(guid) ⇒ Object

Creates local direct exchange



176
177
178
179
180
# File 'lib/ThorClient.rb', line 176

# Creates local direct exchange.
# The visible body only writes a log line, and only in verbose mode.
#
# @param guid [Object] identifier of the direct exchange, interpolated into the log line
def amqp_exchange_create_direct(guid)
  Bsl::Logger::Log "Creating direct exchange '#{guid}'." if options[:verbose]
end

#amqp_handle_failure(e) ⇒ Object

Handles failure when connecting to AMQP



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/ThorClient.rb', line 190

# Handles failure when connecting to AMQP.
#
# Stops AMQP, logs the failure and then either sleeps for the current
# back-off interval (growing it afterwards, capped at
# @@AMQP_MAX_RETRY_INTERVAL) or, once the attempt limit is exceeded, requests
# application exit.
#
# @param e [Exception] the failure; only used for the log line
# @return [Boolean] false when an exit had already been requested before the
#   call, true in every other case (including when this call itself decides
#   to give up and sets @request_exit)
def amqp_handle_failure(e)
  amqp_stop

  Bsl::Logger::Log "AMQP Failure, reason: '#{e.inspect}'."

  return false if @request_exit == true

  # The attempt counter only runs when a non-negative limit is configured;
  # nil or a negative limit means "retry forever".
  attempt_limit = @@AMQP_MAX_RETRY_ATTEMPS
  limit_reached = false
  if !attempt_limit.nil? && attempt_limit >= 0
    @amqp_retry_attempt += 1
    limit_reached = @amqp_retry_attempt > attempt_limit
  end

  if limit_reached
    # limit_reached can only be true when a limit is configured, so the log
    # line needs no further nil guard.
    Bsl::Logger::Log "Maximum AMQP reconnect attempts limit reached (#{attempt_limit}), quitting."
    @request_exit = true
  else
    Bsl::Logger::Log "Next attempt in #{@amqp_retry_interval} sec(s)."

    sleep(@amqp_retry_interval)
    @amqp_retry_interval = @amqp_retry_interval * @@AMQP_RETRY_MULTIPLER
    @amqp_retry_interval = @@AMQP_MAX_RETRY_INTERVAL if @amqp_retry_interval > @@AMQP_MAX_RETRY_INTERVAL
  end

  true
end

#amqp_reset_retry_interval ⇒ Object

Resets internal AMQP connection failure counter/interval



149
150
151
152
# File 'lib/ThorClient.rb', line 149

# Resets internal AMQP connection failure counter/interval: the back-off
# interval goes back to the configured default and the attempt counter to 0.
def amqp_reset_retry_interval
  initial_interval = @@AMQP_DEFAULT_RETRY_INTERVAL
  @amqp_retry_interval = initial_interval
  @amqp_retry_attempt = 0
end

#amqp_start ⇒ Object

Starts AMQP connection



155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/ThorClient.rb', line 155

# Starts AMQP connection.
#
# Logs the target, then calls AMQP.start with the :amqp_* settings from
# `options`. The block handed to AMQP.start resets the retry back-off, starts
# the EventMachine server and announces the direct and master exchanges for a
# freshly generated GUID.
def amqp_start
  target = "#{options[:amqp_user]}@#{options[:amqp_host]}:#{options[:amqp_port]}#{options[:amqp_vhost]}"
  Bsl::Logger::Log "Starting AMQP - Connecting #{target}"

  connection_settings = {
    :host     => options[:amqp_host],
    :port     => options[:amqp_port],
    :vhost    => options[:amqp_vhost],
    :user     => options[:amqp_user],
    :password => options[:amqp_password]
  }

  AMQP.start(connection_settings) do
    Bsl::Logger::Log "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

    amqp_reset_retry_interval
    em_start

    direct_guid = Thor::generate_guid
    amqp_exchange_create_direct(direct_guid) # Create local direct exchange
    amqp_exchange_connect_master(options[:amqp_channel_master], direct_guid)
  end
end

#amqp_stop ⇒ Object

Stops Running AMQP connection



170
171
172
173
# File 'lib/ThorClient.rb', line 170

# Stops running AMQP connection.
# Logs, then calls AMQP.stop with a block that stops EventMachine.
def amqp_stop
  Bsl::Logger::Log "Stopping AMQP"
  AMQP.stop do
    EM.stop
  end
end

#em_post_init(em_connection, em_options) ⇒ Object

Called when client connects



243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/ThorClient.rb', line 243

# Called when client connects.
#
# Derives a key for the connection (MD5 of its string form), optionally logs
# the peer address, and stores a StructCustomer for it in @clients while
# holding @clients_lock.
#
# @param em_connection [Object] connection; must respond to get_peername
# @param em_options [Object] not used by this method
# @return [Object] the StructCustomer that was registered
def em_post_init(em_connection, em_options)
  client_key = Digest::MD5.hexdigest(em_connection.to_s) # em_connection.get_pid
  peer_port, peer_ip = Socket.unpack_sockaddr_in(em_connection.get_peername)
  Bsl::Logger::Log "Client connected #{peer_ip}:#{peer_port}, pid: #{client_key}" if options[:verbose]

  @clients_lock.synchronize do
    @clients[client_key] = StructCustomer.new(client_key, em_connection)
  end
end

#em_receive(em_connection, em_options, status, raw_data) ⇒ Object

Event machine receive handle



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/ThorClient.rb', line 271

# Event machine receive handle.
#
# Parses one incoming JSON message. A message of the form
# {"type": "system", "code": "stop"} requests application exit and stops
# EventMachine and AMQP; every other JSON object is accepted and ignored here.
#
# @param em_connection [Object] connection; must respond to get_peername
# @param em_options [Object] not used by this method
# @param status [Object] only interpolated into the verbose log line
# @param raw_data [String] JSON text received from the client
# @return [Boolean] false when the payload cannot be parsed or is not a JSON
#   object, true otherwise
def em_receive(em_connection, em_options, status, raw_data)
  port, ip = Socket.unpack_sockaddr_in(em_connection.get_peername)
  if options[:verbose]
    Bsl::Logger::Log "EventMachine received data from #{ip}:#{port}, :status => '#{status}', :raw_data => #{raw_data.chomp}"
  end

  begin
    data = JSON.parse(raw_data)
  rescue StandardError => e
    # StandardError rather than Exception: Interrupt / SystemExit raised while
    # parsing must reach the caller instead of being logged as a parse error.
    Bsl::Logger::Log "Unable to parse incoming json, reason: #{e.message}."
    return false
  end

  # Valid JSON is not necessarily an object (e.g. "[1, 2]"); indexing such a
  # value with a String key would raise instead of rejecting the message.
  unless data.is_a?(Hash)
    Bsl::Logger::Log "Ignoring incoming json, reason: payload is not an object."
    return false
  end

  if data['type'] == "system" && data['code'] == "stop"
    @request_exit = true
    em_stop
    amqp_stop
  end

  true
end

#em_start ⇒ Object

Event machine loop



222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/ThorClient.rb', line 222

# Event machine loop.
#
# Logs the port, then, inside EM.run, starts a server on localhost:<em_port>
# with Server as the handler. The block given to EM.start_server wires each
# connection it is handed: the connection is stored as em_server, gets the
# options hash and is marked with status :OK.
#
# NOTE(review): one options hash is created per em_start call and handed to
# every connection, so :conn and em_server are overwritten if the block runs
# for more than one connection -- confirm this is intended.
def em_start
  Bsl::Logger::Log "Starting EventMachine at port #{options[:em_port]}"
  EM.run do
    shared_opts = { :thor_client => self }
    EM.start_server('localhost', options[:em_port], Server, shared_opts) do |connection|
      shared_opts[:thor_client].em_server = connection
      shared_opts[:conn] = connection
      connection.options = shared_opts
      connection.status = :OK
    end
  end
end

#em_stop ⇒ Object

Stop EventMachine



236
237
238
239
240
# File 'lib/ThorClient.rb', line 236

# Stop EventMachine.
# Logs, then asks EventMachine to stop its event loop. @em_server itself is
# not touched (an earlier `@em_server.stop` call was left disabled).
def em_stop
  Bsl::Logger::Log "Stopping EventMachine"
  EventMachine.stop_event_loop
end

#em_unbind(em_connection, em_options) ⇒ Object

Called when client disconnects



258
259
260
261
262
263
264
265
266
267
268
# File 'lib/ThorClient.rb', line 258

# Called when client disconnects.
#
# Recomputes the key em_post_init used for this connection (MD5 of its string
# form), optionally logs the disconnect, and removes the entry from @clients
# while holding @clients_lock.
#
# @param em_connection [Object] the connection that went away
# @param em_options [Object] not used by this method
# @return [Object, nil] the removed registry entry, or nil if none was stored
def em_unbind(em_connection, em_options)
  pid = Digest::MD5.hexdigest(em_connection.to_s) # em_connection.get_pid
  # Goes through the `options` accessor like every other method here instead
  # of reading @options directly.
  Bsl::Logger::Log "Client disconnected, pid: #{pid}" if options[:verbose]

  @clients_lock.synchronize { @clients.delete(pid) }
end

#main ⇒ Object

Main entry-point



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/ThorClient.rb', line 295

# Main entry-point.
#
# Runs the parent's main, then keeps (re)starting the AMQP connection until an
# exit is requested. SystemExit / Interrupt end the loop after stopping AMQP;
# any StandardError is handed to amqp_handle_failure, which decides whether to
# back off and retry or to give up.
def main
  super()

  # Run loop while exit is not requested
  while @request_exit == false
    begin
      amqp_start
    rescue SystemExit, Interrupt
      Bsl::Logger::Log "Received interrupt, quitting!"
      @request_exit = true
      #em_stop()
      amqp_stop
    rescue StandardError => e
      # StandardError rather than Exception: SIGTERM (SignalException),
      # NoMemoryError and the like must not be treated as connection failures
      # and retried -- they propagate to the caller.
      amqp_handle_failure(e)
    end
  end
end