Method: ActiveMessaging::Gateway.start

Defined in:
lib/activemessaging/gateway.rb

.start ⇒ Object

Starts up a message listener that polls for messages on each configured connection and dispatches them for processing.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/activemessaging/gateway.rb', line 28

def start

  # subscribe - creating connections along the way
  subscribe

  # for each connection, start a thread
  @connections.each do |name, conn|
    @connection_threads[name] = Thread.start do
      while @running
        begin
          Thread.current[:message] = nil
          Thread.current[:message] = conn.receive
        #catch these but then stop looping
        rescue StopProcessingException=>spe
          ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Processing Stopped - receive interrupted, will process last message if already received"
          # break
        #catch all others, but go back and try and recieve again
        rescue Object=>exception
          ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: Exception from connection.receive: #{exception.message}\n" + exception.backtrace.join("\n\t")
        ensure
          if Thread.current[:message]
            @guard.synchronize {
              dispatch Thread.current[:message]
            }
            Thread.current[:message] = nil
          else
            # if there is no message at all, sleep
            # maybe this should be configurable
            sleep(1)
          end
        end
        Thread.pass
      end
      ActiveMessaging.logger.error "ActiveMessaging: thread[#{name}]: receive loop terminated"
    end
  end

  while @running
    trap("TERM", "EXIT")
    living = false
    @connection_threads.each { |name, thread| living ||=  thread.alive? }
    @running = living
    sleep(1)
  end
  ActiveMessaging.logger.error "All connection threads have died..."
rescue Interrupt
  ActiveMessaging.logger.error "\n<<Interrupt received>>\n"
rescue Object=>exception
  ActiveMessaging.logger.error "#{exception.class.name}: #{exception.message}\n\t#{exception.backtrace.join("\n\t")}"
  raise exception
ensure
  ActiveMessaging.logger.error "Cleaning up..."
  stop
  ActiveMessaging.logger.error "=> END"
end