Class: Pantry::Communication::SubscribeSocket

Inherits:
ReadingSocket
Defined in:
lib/pantry/communication/subscribe_socket.rb

Overview

The SubscribeSocket manages the subscription side of the Pub/Sub channel, using a 0MQ SUB socket. It can subscribe to any number of streams, as determined by the client filter set via #filter_on. Messages received by this socket are passed to the configured listener as Messages.
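To make the stream filtering concrete: a 0MQ SUB socket delivers a message when the message begins with at least one subscribed prefix. The sketch below imitates that rule in plain Ruby. The `delivered?` helper and the stream names are hypothetical illustrations, not part of Pantry or Celluloid::ZMQ.

```ruby
# Hypothetical helper imitating SUB-side prefix matching:
# a message passes when it starts with any subscribed prefix.
def delivered?(subscriptions, message)
  subscriptions.any? { |prefix| message.start_with?(prefix) }
end

# Hypothetical stream names, chosen only for illustration.
streams = ["app1", "app1.staging"]

matched   = delivered?(streams, "app1.staging.web") # matches the "app1" prefix
unmatched = delivered?(streams, "app2")             # no subscribed prefix matches
catch_all = delivered?([""], "anything")            # empty prefix matches every message
silent    = delivered?([], "anything")              # no subscriptions, nothing delivered
```

One consequence of prefix matching is that a broader stream name also captures every narrower stream that begins with it.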

Instance Attribute Summary

Attributes inherited from ReadingSocket

#host, #port

Instance Method Summary

Methods inherited from ReadingSocket

#add_listener, #has_source_header?, #open, #shutdown

Constructor Details

#initialize(host, port, security) ⇒ SubscribeSocket

Returns a new instance of SubscribeSocket.



# File 'lib/pantry/communication/subscribe_socket.rb', line 10

def initialize(host, port, security)
  super
  @filter = ClientFilter.new
end

Instance Method Details

#build_socket ⇒ Object



# File 'lib/pantry/communication/subscribe_socket.rb', line 19

def build_socket
  Celluloid::ZMQ::SubSocket.new
end

#filter_on(client_filter) ⇒ Object



# File 'lib/pantry/communication/subscribe_socket.rb', line 15

def filter_on(client_filter)
  @filter = client_filter
end

#open_socket(socket) ⇒ Object



# File 'lib/pantry/communication/subscribe_socket.rb', line 23

def open_socket(socket)
  socket.connect("tcp://#{host}:#{port}")

  @filter.streams.each do |stream|
    socket.subscribe(stream)
  end
end
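
The connect-then-subscribe sequence in #open_socket can be traced without a running 0MQ context. The following is a minimal sketch using stand-ins: `RecordingSocket`, `StubFilter`, `open_socket_sketch`, and the host, port, and stream values are all hypothetical and exist only to show the order of calls. It assumes only what the method above shows, namely that the socket responds to `connect` and `subscribe` and that the filter responds to `streams`.

```ruby
# Hypothetical stand-in for the real SUB socket; it only records calls.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def connect(endpoint)
    @calls << [:connect, endpoint]
  end

  def subscribe(topic)
    @calls << [:subscribe, topic]
  end
end

# Hypothetical stand-in for a client filter exposing #streams.
StubFilter = Struct.new(:streams)

# Mirrors the body of #open_socket, with host, port, and filter passed in
# explicitly instead of being read from instance state.
def open_socket_sketch(socket, host, port, filter)
  socket.connect("tcp://#{host}:#{port}")
  filter.streams.each { |stream| socket.subscribe(stream) }
end

socket = RecordingSocket.new
filter = StubFilter.new(["app", "app.web"])
open_socket_sketch(socket, "127.0.0.1", 23001, filter)

recorded = socket.calls
```

Because the streams are read when the socket is opened, a filter assigned through #filter_on would need to be in place before that point for its streams to be subscribed in this flow.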