Class: RFlow::Connections::ZMQConnection
- Inherits:
- RFlow::Connection
- Inheritance chain: Object, RFlow::Connection, RFlow::Connections::ZMQConnection
- Defined in:
- lib/rflow/connections/zmq_connection.rb
Overview
Represents a ZeroMQ connection.
Direct Known Subclasses
Class Attribute Summary
- .zmq_context ⇒ EM::ZeroMQ::Context
  The ZeroMQ context object.
Attributes inherited from RFlow::Connection
#config, #name, #options, #recv_callback, #uuid
Instance Method Summary
- #connect_input! ⇒ void
  Hook up the input to the real ZeroMQ sockets.
- #connect_output! ⇒ void
  Hook up the output to the real ZeroMQ sockets.
- #initialize(config) ⇒ ZMQConnection (constructor)
  A new instance of ZMQConnection.
- #send_message(message) ⇒ void
  Send a message along the connection into ZeroMQ.
- #zmq_context ⇒ EM::ZeroMQ::Context
  The ZeroMQ context object.
Methods inherited from RFlow::Connection
build, #input_port_key, #output_port_key
Constructor Details
#initialize(config) ⇒ ZMQConnection
Returns a new instance of ZMQConnection.
# File 'lib/rflow/connections/zmq_connection.rb', line 44

    def initialize(config)
      super
      zmq_context # cause the ZMQ context to be created before the reactor is running
    end
Class Attribute Details
.zmq_context ⇒ EM::ZeroMQ::Context
The ZeroMQ context object.
# File 'lib/rflow/connections/zmq_connection.rb', line 19

    def zmq_context
      @zmq_context
    end
Instance Method Details
#connect_input! ⇒ void
This method returns an undefined value.
Hook up the input to the real ZeroMQ sockets.
# File 'lib/rflow/connections/zmq_connection.rb', line 52

    def connect_input!
      RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
      check_address(options['input_address'])

      self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
      input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
      if config.delivery == 'broadcast'
        input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages
      end

      input_socket.on(:message) do |*message_parts|
        begin
          message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
          RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
          message_parts.each(&:close) # avoid memory leaks
          recv_callback.call(message)
        rescue Exception => e
          RFlow.logger.error "#{name}: Exception processing message of type '#{message.data_type_name}': #{e.message}, because: #{e.backtrace}"
        end
      end

      input_socket
    end
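The listing above picks the socket type and the bind/connect call from option strings. The following is a minimal sketch of that option-driven dispatch, not the library's code: FakeZMQ, FakeSocket, open_input_socket and the option values are hypothetical stand-ins, so it runs without ZeroMQ installed.

```ruby
# Stand-in for the ZMQ constants module (assumption: real values come from the ZeroMQ binding).
module FakeZMQ
  PULL = :pull
  PUSH = :push
end

# Stand-in socket that records which responsibility method was invoked.
class FakeSocket
  attr_reader :type, :calls

  def initialize(type)
    @type = type
    @calls = []
  end

  def bind(address)
    @calls << [:bind, address]
  end

  def connect(address)
    @calls << [:connect, address]
  end
end

# Hypothetical helper mirroring the two dynamic steps in connect_input!.
def open_input_socket(options)
  # const_get turns the configured string into the socket-type constant.
  socket = FakeSocket.new(FakeZMQ.const_get(options['input_socket_type']))
  # send turns the configured string into a bind or connect call.
  socket.send(options['input_responsibility'].to_sym, options['input_address'])
  socket
end

# Illustrative option values only.
socket = open_input_socket(
  'input_socket_type'    => 'PULL',
  'input_responsibility' => 'bind',
  'input_address'        => 'ipc://example.sock'
)
p socket.type
p socket.calls
```

Because both values are resolved at run time, an unknown socket type raises NameError and an unknown responsibility raises NoMethodError, so misconfiguration surfaces when the connection is hooked up.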
#connect_output! ⇒ void
This method returns an undefined value.
Hook up the output to the real ZeroMQ sockets.
# File 'lib/rflow/connections/zmq_connection.rb', line 78

    def connect_output!
      RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
      check_address(options['output_address'])

      self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
      output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
      output_socket
    end
#send_message(message) ⇒ void
This method returns an undefined value.
Send a message along the connection into ZeroMQ.
# File 'lib/rflow/connections/zmq_connection.rb', line 89

    def send_message(message)
      RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"

      begin
        output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
      rescue Exception => e
        RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}"
      end
    end
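send_message emits two parts per message (the data type name, then the serialized message), and the input handler reads the type from the first part and the payload from the last. The sketch below illustrates that framing with in-memory objects. FakeFrame, build_frames, handle_frames and the sample values are hypothetical, and a plain string stands in for the Avro payload.

```ruby
# Stand-in for a ZeroMQ message frame; only the methods used by the handler are modelled.
class FakeFrame
  def initialize(bytes)
    @bytes = bytes
    @closed = false
  end

  def copy_out_string
    @bytes
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Sender side: one frame for the data type name, one for the serialized payload.
def build_frames(data_type_name, payload)
  [FakeFrame.new(data_type_name.to_s), FakeFrame.new(payload)]
end

# Receiver side: read the type from the first frame and the payload from the last,
# then close every frame before handing the result on.
def handle_frames(frames)
  type = frames.first.copy_out_string
  payload = frames.last.copy_out_string
  frames.each(&:close)
  [type, payload]
end

frames = build_frames(:ExampleType, 'serialized-bytes')
p handle_frames(frames)
p frames.all?(&:closed?)
```

Putting the type name in its own leading frame lets a receiver log or filter on the type without deserializing the payload.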
#zmq_context ⇒ EM::ZeroMQ::Context
The ZeroMQ context object.
# File 'lib/rflow/connections/zmq_connection.rb', line 38

    def zmq_context; ZMQConnection.zmq_context; end
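The instance reader delegates to the class attribute, and the constructor touches it so that the context exists before the reactor runs. The sketch below shows one way such a shared, eagerly created context could behave. It assumes the class-level reader builds the context lazily on first use, which the listings on this page do not confirm. FakeContext and SketchConnection are hypothetical names.

```ruby
# Stand-in for the ZeroMQ context; counts how many times one is built.
class FakeContext
  @built = 0
  class << self
    attr_accessor :built
  end

  def initialize
    self.class.built += 1
  end
end

class SketchConnection
  class << self
    # Assumption: build the context on first use, then reuse it.
    def zmq_context
      @zmq_context ||= FakeContext.new
    end
  end

  # Instance-level reader delegates to the class, as in the listing above.
  def zmq_context
    SketchConnection.zmq_context
  end

  def initialize
    zmq_context # touch the context so it exists before any event loop starts
  end
end

a = SketchConnection.new
b = SketchConnection.new
p a.zmq_context.equal?(b.zmq_context) # → true
p FakeContext.built                   # → 1
```

Every connection in the process shares the same context object under this arrangement, and constructing the first connection is what triggers its creation.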