Class: RFlow::Connections::ZMQStreamer
- Inherits: Broker
- Ancestry (root first): Object, RFlow::ChildProcess, Broker, RFlow::Connections::ZMQStreamer
- Defined in: lib/rflow/connections/zmq_connection.rb
Overview
The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)
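The many-to-many point can be illustrated without ZeroMQ at all. The following is a toy sketch, not RFlow code: it uses Ruby's core Queue in place of sockets, and the names TinyBroker, front, back, and run_pipeline are hypothetical. Producers and consumers each know only the broker's two endpoints, so any number of either can attach. Each message goes to exactly one consumer, which loosely resembles the PUSH/PULL case.

```ruby
# Toy stand-in for a brokered link; no ZeroMQ involved.
# TinyBroker and run_pipeline are hypothetical names, not part of RFlow.
class TinyBroker
  attr_reader :front, :back

  def initialize
    @front = Queue.new # producers write here (the "output" side)
    @back  = Queue.new # consumers read here (the "input" side)
  end

  # Forward messages from front to back until front is closed and drained.
  def run
    while (message = front.pop)
      back.push(message)
    end
    back.close # lets consumers see end-of-stream
  end
end

# Runs producer_count producers and consumer_count consumers through one
# broker and returns an array of [consumer_index, message] pairs.
def run_pipeline(producer_count, consumer_count, per_producer)
  broker = TinyBroker.new
  broker_thread = Thread.new { broker.run }

  producers = producer_count.times.map do |p|
    Thread.new { per_producer.times { |i| broker.front.push("p#{p}-m#{i}") } }
  end

  received = Queue.new
  consumers = consumer_count.times.map do |c|
    Thread.new do
      while (message = broker.back.pop)
        received.push([c, message])
      end
    end
  end

  producers.each(&:join)
  broker.front.close # no more input; the broker loop can finish
  broker_thread.join
  consumers.each(&:join)

  Array.new(received.size) { received.pop }
end
```

Calling run_pipeline(3, 2, 4) returns 12 pairs, one per message sent. Which consumer receives which message varies from run to run.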
Constant Summary
Constants inherited from RFlow::ChildProcess
Instance Attribute Summary
Attributes inherited from RFlow::ChildProcess
Instance Method Summary
- #initialize(config) ⇒ ZMQStreamer (constructor): A new instance of ZMQStreamer.
- #run_process ⇒ void: Start the broker process.
Methods inherited from Broker
Methods inherited from RFlow::ChildProcess
Constructor Details
#initialize(config) ⇒ ZMQStreamer
Returns a new instance of ZMQStreamer.
    # File 'lib/rflow/connections/zmq_connection.rb', line 152
    def initialize(config)
      @connection = config.connection
      super("broker-#{connection.name}", 'Broker')
    end
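To see the process name the constructor passes to super without starting a child process, the sketch below reproduces only the string interpolation. StubConnection and StubConfig are hypothetical stand-ins for RFlow's configuration objects, and broker_process_name is an illustrative helper that does not exist in RFlow.

```ruby
# Hypothetical stand-ins for the real configuration objects.
StubConnection = Struct.new(:name, :options)
StubConfig     = Struct.new(:connection)

# Mirrors the first argument the constructor hands to super.
def broker_process_name(config)
  "broker-#{config.connection.name}"
end

config = StubConfig.new(StubConnection.new('orders-link', {}))
puts broker_process_name(config) # prints "broker-orders-link"
```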
Instance Method Details
#run_process ⇒ void
This method returns an undefined value.
Start the broker process. Returns when things are shutting down.
    # File 'lib/rflow/connections/zmq_connection.rb', line 159
    def run_process
      version = LibZMQ::version
      RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
      @context = ZMQ::Context.new
      RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }

      @front = case connection.options['output_socket_type']
               when 'PUSH'; context.socket(ZMQ::PULL)
               when 'PUB'; context.socket(ZMQ::XSUB)
               else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}"
               end
      @back = case connection.options['input_socket_type']
              when 'PULL'; context.socket(ZMQ::PUSH)
              when 'SUB'; context.socket(ZMQ::XPUB)
              else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
              end
      front.bind(connection.options['output_address'])
      back.bind(connection.options['input_address'])
      while true
        ZMQ::Proxy.new(front, back)
      end
    rescue Exception => e
      RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
    ensure
      back.close if back
      front.close if front
      context.terminate if context
    end
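The two case statements in run_process pair each configured socket type with its counterpart on the broker side: the front socket receives what components send, and the back socket sends what components receive. A minimal sketch of that mapping as lookup tables follows. The helper broker_socket_types and both constants are hypothetical, and symbols stand in for the ZMQ::PULL, ZMQ::XSUB, ZMQ::PUSH, and ZMQ::XPUB constants so the example runs without the ZeroMQ bindings loaded.

```ruby
# Hypothetical lookup tables mirroring the case statements in run_process.
# Symbols replace the ZMQ socket-type constants so no ZeroMQ gem is needed.
FRONT_SOCKET_FOR_OUTPUT = { 'PUSH' => :PULL, 'PUB' => :XSUB }.freeze
BACK_SOCKET_FOR_INPUT   = { 'PULL' => :PUSH, 'SUB' => :XPUB }.freeze

# Returns the broker-side socket types for a connection's options hash,
# raising ArgumentError for unknown types as run_process does.
def broker_socket_types(options)
  front = FRONT_SOCKET_FOR_OUTPUT.fetch(options['output_socket_type']) do |type|
    raise ArgumentError, "Unknown output socket type #{type}"
  end
  back = BACK_SOCKET_FOR_INPUT.fetch(options['input_socket_type']) do |type|
    raise ArgumentError, "Unknown input socket type #{type}"
  end
  { front: front, back: back }
end

p broker_socket_types('output_socket_type' => 'PUB', 'input_socket_type' => 'SUB')
```

With PUB and SUB configured, the helper returns XSUB for the front and XPUB for the back. With PUSH and PULL it returns PULL and PUSH respectively.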