Class: Aggro::ZeroMQTransport::Subscriber
- Inherits:
-
Object
- Object
- Aggro::ZeroMQTransport::Subscriber
- Defined in:
- lib/aggro/zeromq_transport/subscriber.rb
Overview
Public: Handles subscribing to messages on a given endpoint.
Defined Under Namespace
Classes: SubscriberAlreadyRunning
Instance Method Summary collapse
- #add_subscription(topic) ⇒ Object
-
#initialize(endpoint, callable = nil, &block) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(endpoint, callable = nil, &block) ⇒ Subscriber
Returns a new instance of Subscriber.
7 8 9 10 11 12 13 |
# File 'lib/aggro/zeromq_transport/subscriber.rb', line 7 def initialize(endpoint, callable = nil, &block) fail ArgumentError unless callable || block_given? @callable = block_given? ? block : callable @endpoint = endpoint @mutex = Mutex.new end |
Instance Method Details
#add_subscription(topic) ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/aggro/zeromq_transport/subscriber.rb', line 15 def add_subscription(topic) start unless @mutex.synchronize { @running } @mutex.synchronize { sub_socket.setsockopt ZMQ::SUBSCRIBE, topic } self end |
#start ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/aggro/zeromq_transport/subscriber.rb', line 23 def start @mutex.synchronize do return self if @running sub_socket start_on_thread sleep 0.01 until @running end self end |
#stop ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/aggro/zeromq_transport/subscriber.rb', line 36 def stop @mutex.synchronize do return self unless @running @running = false end self end |