Class: RFlow::Connections::ZMQStreamer

Inherits:
Broker show all
Defined in:
lib/rflow/connections/zmq_connection.rb

Overview

The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)

Constant Summary

Constants inherited from RFlow::ChildProcess

RFlow::ChildProcess::SIGINFO

Instance Attribute Summary

Attributes inherited from RFlow::ChildProcess

#name, #pid

Instance Method Summary collapse

Methods inherited from Broker

build

Methods inherited from RFlow::ChildProcess

#shutdown!, #spawn!

Constructor Details

#initialize(config) ⇒ ZMQStreamer

Returns a new instance of ZMQStreamer.



152
153
154
155
# File 'lib/rflow/connections/zmq_connection.rb', line 152

# Set up a broker for the connection carried by +config+.
#
# The connection is stored first because the process name handed to the
# parent constructor is derived from it via the +connection+ reader.
#
# @param config [Object] only +config.connection+ is read here
# NOTE(review): the second argument to super is presumably a role label for
# the child process — confirm against the parent constructor.
def initialize(config)
  @connection = config.connection
  process_name = "broker-#{connection.name}"
  super(process_name, 'Broker')
end

Instance Method Details

#run_process ⇒ void

This method returns an undefined value.

Start the broker process. Returns when things are shutting down.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/rflow/connections/zmq_connection.rb', line 159

# Start the broker process. Returns when things are shutting down.
#
# Creates a ZeroMQ context, opens a front and a back socket chosen from the
# connection's 'output_socket_type' / 'input_socket_type' options, binds them
# to 'output_address' / 'input_address', and then proxies between the two.
#
# Nothing is propagated to the caller: any exception (including the
# ArgumentError raised for an unrecognised socket type) is logged through
# RFlow.logger, and the sockets and context are released afterwards.
#
# @return [void]
def run_process
  zmq_version = LibZMQ::version
  RFlow.logger.debug do
    "Creating a new ZeroMQ context; ZeroMQ version is #{zmq_version[:major]}.#{zmq_version[:minor]}.#{zmq_version[:patch]}"
  end
  @context = ZMQ::Context.new
  RFlow.logger.debug do
    "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}"
  end

  # Front side: the socket type that pairs with the configured output socket.
  output_type = connection.options['output_socket_type']
  @front =
    if 'PUSH' == output_type
      context.socket(ZMQ::PULL)
    elsif 'PUB' == output_type
      context.socket(ZMQ::XSUB)
    else
      raise ArgumentError, "Unknown output socket type #{output_type}"
    end

  # Back side: the socket type that pairs with the configured input socket.
  input_type = connection.options['input_socket_type']
  @back =
    if 'PULL' == input_type
      context.socket(ZMQ::PUSH)
    elsif 'SUB' == input_type
      context.socket(ZMQ::XPUB)
    else
      raise ArgumentError, "Unknown input socket type #{input_type}"
    end

  front.bind(connection.options['output_address'])
  back.bind(connection.options['input_address'])

  # Re-create the proxy every time it hands control back. A plain `while`
  # (rather than Kernel#loop) keeps StopIteration visible to the rescue below.
  ZMQ::Proxy.new(front, back) while true
rescue Exception => e
  RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
ensure
  # Release in the same order as always: back socket, front socket, context.
  [back, front].each { |socket| socket.close if socket }
  context.terminate if context
end