Class: Alondra::MessageQueue

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/alondra/message_queue.rb

Instance Method Summary collapse

Instance Method Details

#on_readable(socket, messages) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/alondra/message_queue.rb', line 21

# Handles a batch of messages read from the queue socket.
#
# Each message is turned into a string with +copy_out_string+ and passed to
# #parse. A failure while processing one message is logged and does not
# prevent the remaining messages from being processed.
#
# @param socket [Object] the socket the messages came from (not used here)
# @param messages [Enumerable] received messages; each must respond to
#   +copy_out_string+
# @return [Object] whatever +messages.each+ returns
def on_readable(socket, messages)
  messages.each do |received|
    begin
      parse received.copy_out_string
    # Rescue StandardError only: rescuing Exception would also swallow
    # signals, exit requests and out-of-memory errors.
    rescue StandardError => ex
      Log.error "Error raised while processing message"
      Log.error "#{ex.class}: #{ex.message}"
      Log.error ex.backtrace.join("\n") if ex.backtrace
    end
  end
end

#parse(received_string) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/alondra/message_queue.rb', line 33

# Decodes a JSON payload and dispatches it according to its content.
#
# * A payload with an +event+ key is wrapped in an Event and passed to #receive.
# * A payload with a +message+ key is wrapped in a Message (together with its
#   +channel_names+) and sent to its channels.
# * Anything else, including valid JSON that is not an object, is logged as
#   an unrecognized message type.
#
# @param received_string [String] raw JSON text taken from the queue
# @return [Object] the result of whichever branch handled the payload
def parse(received_string)
  decoded = ActiveSupport::JSON.decode(received_string)

  # Valid JSON is not necessarily an object (e.g. "[1,2]"); treat such
  # payloads as unrecognized instead of failing on symbolize_keys.
  received_hash = decoded.is_a?(Hash) ? decoded.symbolize_keys : {}

  if received_hash[:event]
    event = Event.new(received_hash, received_string)
    receive(event)
  elsif received_hash[:message]
    message = Message.new(received_hash[:message], received_hash[:channel_names])
    message.send_to_channels
  else
    Log.warn "Unrecognized message type #{received_string}"
  end
end

#receive(event) ⇒ Object



47
48
49
# File 'lib/alondra/message_queue.rb', line 47

# Passes an event on to the event router for processing.
#
# @param event [Object] the event to be processed
# @return [Object] whatever the router's +process+ call returns
def receive(event)
  router = event_router
  router.process(event)
end

#reset! ⇒ Object



51
52
53
54
55
56
57
# File 'lib/alondra/message_queue.rb', line 51

# Closes the current connection, if there is one, and clears the cached
# connection, context and push socket references.
#
# Calling it when no connection is open (for example twice in a row) is a
# no-op apart from clearing the references.
#
# @return [nil]
def reset!
  @connection.close_connection if @connection

  @connection  = nil
  @context     = nil
  @push_socket = nil
end

#start_listening ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/alondra/message_queue.rb', line 9

# Binds a SUB socket on the configured queue address, registering this
# object as its handler, and subscribes to every message.
#
# If a connection is already stored, a warning is logged and the queue is
# reset before binding again.
#
# @return [Object] the result of the +setsockopt+ call
def start_listening
  Log.info "Starting message queue"

  if @connection
    Log.warn 'Push connection to message queue started twice'
    reset!
  end

  subscriber = context.bind(ZMQ::SUB, Alondra.config.queue_socket, self)
  @connection = subscriber

  # An empty subscription prefix matches every message.
  subscriber.setsockopt(ZMQ::SUBSCRIBE, '')
end