Class: RFlow::Connections::ZMQStreamer
- Inherits:
- Broker
- Inheritance chain:
- RFlow::Connections::ZMQStreamer < Broker < RFlow::ChildProcess < Object
- Defined in:
- lib/rflow/connections/zmq_connection.rb
Overview
The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)
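To make the many-to-many idea concrete, here is a minimal in-process analogy, not RFlow's implementation. It assumes two `Queue` objects standing in for the broker's two bound endpoints; `run_pipeline` and its keyword arguments are hypothetical names introduced only for this sketch. Any number of producers write to the front queue, any number of consumers read from the back queue, and a single broker thread forwards between them.

```ruby
# Hypothetical stand-in for a brokered link: every producer attaches to
# `front`, every consumer attaches to `back`, and one broker thread
# forwards messages between the two.
def run_pipeline(producer_count:, consumer_count:, messages_per_producer:)
  front = Queue.new     # stable endpoint shared by all producers
  back = Queue.new      # stable endpoint shared by all consumers
  received = Queue.new  # collects everything the consumers saw

  producers = Array.new(producer_count) do |p|
    Thread.new do
      messages_per_producer.times { |i| front << "p#{p}-m#{i}" }
    end
  end

  # The broker forwards until it sees the :done marker, then tells every
  # consumer to stop.
  broker = Thread.new do
    while (msg = front.pop) != :done
      back << msg
    end
    consumer_count.times { back << :done }
  end

  consumers = Array.new(consumer_count) do
    Thread.new do
      while (msg = back.pop) != :done
        received << msg
      end
    end
  end

  producers.each(&:join)
  front << :done
  broker.join
  consumers.each(&:join)

  # Delivery order across threads is not deterministic, so sort the result.
  Array.new(received.size) { received.pop }.sort
end

messages = run_pipeline(producer_count: 2, consumer_count: 3, messages_per_producer: 2)
puts messages.inspect
```

Neither side needs to know how many peers exist on the other side, because each connects only to the broker. That is the property a brokered link adds over a direct connection.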
Instance Attribute Summary
Attributes inherited from RFlow::ChildProcess
Instance Method Summary
- #initialize(config) ⇒ ZMQStreamer (constructor)
  A new instance of ZMQStreamer.
- #run_process ⇒ Object
Methods inherited from Broker
Methods inherited from RFlow::ChildProcess
#run_child_process, #shutdown!, #spawn!
Constructor Details
#initialize(config) ⇒ ZMQStreamer
Returns a new instance of ZMQStreamer.
    # File 'lib/rflow/connections/zmq_connection.rb', line 137
    def initialize(config)
      @connection = config.connection
      super("broker-#{connection.name}", 'Broker')
    end
Instance Method Details
#run_process ⇒ Object
    # File 'lib/rflow/connections/zmq_connection.rb', line 142
    def run_process
      version = LibZMQ::version
      RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
      @context = ZMQ::Context.new
      RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }

      # The front socket faces the producers, so it is the receiving
      # counterpart of the configured output socket type.
      @front = case connection.options['output_socket_type']
               when 'PUSH'; context.socket(ZMQ::PULL)
               when 'PUB'; context.socket(ZMQ::XSUB)
               else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}"
               end

      # The back socket faces the consumers, so it is the sending
      # counterpart of the configured input socket type.
      @back = case connection.options['input_socket_type']
              when 'PULL'; context.socket(ZMQ::PUSH)
              when 'SUB'; context.socket(ZMQ::XPUB)
              else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
              end

      front.bind(connection.options['output_address'])
      back.bind(connection.options['input_address'])

      while true
        ZMQ::Proxy.new(front, back)
      end
    rescue Exception => e
      RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
    ensure
      back.close if back
      front.close if front
      context.terminate if context
    end
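The socket pairing in `run_process` can be read as two lookup tables. The following sketch restates that mapping so it can be run without ZeroMQ installed. It is an illustration, not part of RFlow: plain symbols stand in for the `ZMQ::*` constants, and `broker_socket_types`, `FRONT_SOCKET_FOR_OUTPUT` and `BACK_SOCKET_FOR_INPUT` are hypothetical names.

```ruby
# Symbols stand in for the ZMQ::PULL, ZMQ::XSUB, ZMQ::PUSH and ZMQ::XPUB
# constants so that this sketch has no ZeroMQ dependency.
FRONT_SOCKET_FOR_OUTPUT = { 'PUSH' => :PULL, 'PUB' => :XSUB }.freeze
BACK_SOCKET_FOR_INPUT = { 'PULL' => :PUSH, 'SUB' => :XPUB }.freeze

# Hypothetical helper: given a connection's options hash, return the
# [front, back] socket types a broker would open for it.
def broker_socket_types(options)
  front = FRONT_SOCKET_FOR_OUTPUT.fetch(options['output_socket_type']) do |type|
    raise ArgumentError, "Unknown output socket type #{type}"
  end
  back = BACK_SOCKET_FOR_INPUT.fetch(options['input_socket_type']) do |type|
    raise ArgumentError, "Unknown input socket type #{type}"
  end
  [front, back]
end

pipeline = broker_socket_types({ 'output_socket_type' => 'PUSH', 'input_socket_type' => 'PULL' })
pubsub = broker_socket_types({ 'output_socket_type' => 'PUB', 'input_socket_type' => 'SUB' })
puts pipeline.inspect
puts pubsub.inspect
```

In both cases the broker opens the complementary socket on each side: it receives where the components send and sends where the components receive. The XSUB/XPUB pair is the proxy form of SUB/PUB and additionally passes subscription messages through the broker.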