Class: RFlow::Connections::ZMQStreamer

Inherits:
Broker show all
Defined in:
lib/rflow/connections/zmq_connection.rb

Overview

The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)

Instance Attribute Summary

Attributes inherited from RFlow::ChildProcess

#name, #pid

Instance Method Summary collapse

Methods inherited from Broker

build

Methods inherited from RFlow::ChildProcess

#run_child_process, #shutdown!, #spawn!

Constructor Details

#initialize(config) ⇒ ZMQStreamer

Returns a new instance of ZMQStreamer.



137
138
139
140
# File 'lib/rflow/connections/zmq_connection.rb', line 137

def initialize(config)
  @connection = config.connection
  super("broker-#{connection.name}", 'Broker')
end

Instance Method Details

#run_processObject



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rflow/connections/zmq_connection.rb', line 142

def run_process
  version = LibZMQ::version
  RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
  @context = ZMQ::Context.new
  RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }

  @front = case connection.options['output_socket_type']
           when 'PUSH'; context.socket(ZMQ::PULL)
           when 'PUB'; context.socket(ZMQ::XSUB)
           else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}"
           end
  @back = case connection.options['input_socket_type']
          when 'PULL'; context.socket(ZMQ::PUSH)
          when 'SUB'; context.socket(ZMQ::XPUB)
          else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
          end
  front.bind(connection.options['output_address'])
  back.bind(connection.options['input_address'])
  while true
    ZMQ::Proxy.new(front, back)
  end
rescue Exception => e
  RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
ensure
  back.close if back
  front.close if front
  context.terminate if context
end