Class: Alondra::MessageQueue

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/alondra/message_queue.rb

Instance Method Summary collapse

Instance Method Details

#on_readable(socket, messages) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/alondra/message_queue.rb', line 23

# Handles a batch of incoming messages, parsing each one in turn.
#
# A failure while handling one message is logged and does not prevent the
# remaining messages in the batch from being processed.
#
# @param socket [Object] the socket the messages arrived on (not used here)
# @param messages [Enumerable] objects responding to +copy_out_string+
# @return [Object] whatever +messages.each+ returns
def on_readable(socket, messages)
  messages.each do |received|
    begin
      parse received.copy_out_string
    rescue StandardError => ex
      # Only StandardError is rescued: rescuing Exception would also swallow
      # Interrupt, SystemExit and NoMemoryError, making the process
      # impossible to stop cleanly while a message is being handled.
      Log.error "Error raised while processing message"
      Log.error "#{ex.class}: #{ex.message}"
      # backtrace is nil for an exception that was never raised.
      Log.error ex.backtrace.join("\n") if ex.backtrace
    end
  end
end

#parse(received_string) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/alondra/message_queue.rb', line 35

# Decodes a raw JSON payload and dispatches it according to its contents.
#
# * A payload with a truthy +:event+ key is wrapped in an Event (built from
#   the decoded hash and the raw string) and handed to +receive+.
# * Otherwise, a payload with a truthy +:message+ key becomes a Message built
#   from its +:message+ and +:channel_names+ values, and is sent to its
#   channels.
# * Anything else is logged as a warning.
#
# @param received_string [String] the JSON text as received
# @return [Object] the result of whichever branch handled the payload
def parse(received_string)
  payload = ActiveSupport::JSON.decode(received_string).symbolize_keys

  return receive(Event.new(payload, received_string)) if payload[:event]

  if payload[:message]
    return Message.new(payload[:message], payload[:channel_names]).send_to_channels
  end

  Log.warn "Unrecognized message type #{received_string}"
end

#pull_socket ⇒ Object



61
62
63
64
65
66
67
# File 'lib/alondra/message_queue.rb', line 61

# Returns the memoized PULL socket, creating it on first use.
#
# On creation the socket is obtained from +context+ with this object passed
# as the second argument, then bound to the configured queue address.
# NOTE(review): the second argument is presumably the handler that receives
# +on_readable+ callbacks -- confirm against the ZMQ binding in use.
#
# @return [Object] the bound PULL socket
def pull_socket
  @pull_socket ||= context.socket(ZMQ::PULL, self).tap do |sock|
    sock.bind(Alondra.config.queue_socket)
  end
end

#push_socket ⇒ Object



53
54
55
56
57
58
59
# File 'lib/alondra/message_queue.rb', line 53

# Returns the memoized PUSH socket, creating it on first use.
#
# On creation the socket is obtained from +context+ and connected to the
# configured queue address.
#
# @return [Object] the connected PUSH socket
def push_socket
  @push_socket ||= context.socket(ZMQ::PUSH).tap do |sock|
    sock.connect(Alondra.config.queue_socket)
  end
end

#receive(event) ⇒ Object



49
50
51
# File 'lib/alondra/message_queue.rb', line 49

# Hands an event over to the event router for processing.
#
# @param event [Object] the event to process
# @return [Object] whatever the router's +process+ returns
def receive(event)
  router = event_router
  router.process(event)
end

#reset! ⇒ Object



69
70
71
72
73
74
75
76
# File 'lib/alondra/message_queue.rb', line 69

# Closes whichever sockets are currently open and clears the cached context
# and socket references so they are rebuilt on next use.
#
# Each socket is closed only if it exists: the sockets are created lazily
# and independently, so one may be present without the other, and calling
# +close+ on nil would raise NoMethodError.
#
# NOTE(review): the context reference is only dropped here, not explicitly
# terminated -- confirm whether the ZMQ binding needs that.
#
# @return [nil]
def reset!
  @push_socket.close if @push_socket
  @pull_socket.close if @pull_socket

  @context     = nil
  @push_socket = nil
  @pull_socket = nil
end

#start_listening ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/alondra/message_queue.rb', line 9

# Sets up the queue's push and pull sockets.
#
# If either socket already exists, a warning is logged and the current state
# is reset before the sockets are created again.
#
# @return [self]
def start_listening
  Log.info "Starting message queue"

  already_started = [@pull_socket, @push_socket].any?
  if already_started
    Log.warn 'Connections to message queue started twice'
    reset!
  end

  # Touch both accessors so the sockets are created now; push first, then pull.
  push_socket
  pull_socket

  self
end