Class: RFlow::Connections::ZMQConnection

Inherits:
RFlow::Connection show all
Defined in:
lib/rflow/connections/zmq_connection.rb

Overview

Represents a ZeroMQ connection.

Direct Known Subclasses

BrokeredZMQConnection

Class Attribute Summary collapse

Attributes inherited from RFlow::Connection

#config, #name, #options, #recv_callback, #uuid

Instance Method Summary collapse

Methods inherited from RFlow::Connection

build, #input_port_key, #output_port_key

Constructor Details

#initialize(config) ⇒ ZMQConnection

Returns a new instance of ZMQConnection.



44
45
46
47
48
# File 'lib/rflow/connections/zmq_connection.rb', line 44

# Builds the connection: runs the parent initializer, validates the
# options, and touches the ZMQ context.
#
# @param config the connection configuration, forwarded unchanged to the
#   parent initializer
def initialize(config)
  super(config)
  validate_options!
  # Referenced only for its side effect: cause the ZMQ context to be
  # created before the reactor is running.
  zmq_context
end

Class Attribute Details

.zmq_context ⇒ EM::ZeroMQ::Context

The ZeroMQ context object.

Returns:

  • (EM::ZeroMQ::Context)


19
20
21
# File 'lib/rflow/connections/zmq_connection.rb', line 19

# Reader for the +@zmq_context+ instance variable.
#
# @return [EM::ZeroMQ::Context] whatever is currently stored in
#   +@zmq_context+ (nil if it has not been set)
def zmq_context; @zmq_context; end

Instance Method Details

#connect_input! ⇒ void

This method returns an undefined value.

Hook up the input to the real ZeroMQ sockets.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rflow/connections/zmq_connection.rb', line 52

# Hook up the input to the real ZeroMQ sockets.
#
# Creates a socket of the configured +input_socket_type+, invokes the
# configured +input_responsibility+ method on it with +input_address+, and
# registers a +:message+ handler that deserializes each incoming message
# and passes it to +recv_callback+.
#
# Errors raised while handling a message are logged and swallowed, so a
# single bad message does not escape the handler.
#
# @return the input socket
def connect_input!
  RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
  check_address(options['input_address'])

  self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
  # The responsibility option names the socket method to invoke on the address.
  input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
  if config.delivery == 'broadcast'
    input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages
  end

  input_socket.on(:message) do |*message_parts|
    # Declared outside the begin block so the rescue handler can always
    # report it, even when deserialization fails before a message exists.
    data_type_name = nil
    begin
      begin
        # First part carries the data type name, last part the Avro payload.
        data_type_name = message_parts.first.copy_out_string
        serialized = message_parts.last.copy_out_string
      ensure
        # Always release the parts, including on failure, to avoid memory leaks.
        message_parts.each(&:close)
      end

      message = RFlow::Message.from_avro(serialized)
      RFlow.logger.debug "#{name}: Received message of type '#{data_type_name}'"
      recv_callback.call(message)
    rescue Exception => e
      # NOTE(review): the broad rescue is kept from the original best-effort
      # behavior; consider narrowing it so signals and exits are not swallowed.
      RFlow.logger.error "#{name}: Exception processing message of type '#{data_type_name}': #{e.message}, because: #{e.backtrace}"
    end
  end

  input_socket
end

#connect_output! ⇒ void

This method returns an undefined value.

Hook up the output to the real ZeroMQ sockets.



78
79
80
81
82
83
84
85
# File 'lib/rflow/connections/zmq_connection.rb', line 78

# Hook up the output to the real ZeroMQ sockets.
#
# Creates a socket of the configured +output_socket_type+ and invokes the
# configured +output_responsibility+ method on it with the stringified
# +output_address+.
#
# @return the output socket
def connect_output!
  output_options = options.find_all { |key, _value| key.to_s =~ /output/ }
  RFlow.logger.debug "Connecting output #{uuid} with #{output_options}"

  address = options['output_address']
  check_address(address)

  socket_type = ZMQ.const_get(options['output_socket_type'])
  self.output_socket = zmq_context.socket(socket_type)

  # The responsibility option names the socket method to invoke on the address.
  responsibility = options['output_responsibility'].to_sym
  output_socket.send(responsibility, address.to_s)
  output_socket
end

#send_message(message) ⇒ void

This method returns an undefined value.

Send a message along the connection into ZeroMQ.



89
90
91
92
93
94
95
96
97
# File 'lib/rflow/connections/zmq_connection.rb', line 89

# Send a message along the connection into ZeroMQ.
#
# The message goes out as two parts: its data type name as a string,
# followed by its Avro serialization. Any exception raised while sending
# is logged rather than propagated.
#
# @param message the message to send; must respond to +data_type_name+
#   and +to_avro+
def send_message(message)
  type_name = message.data_type_name.to_s
  RFlow.logger.debug "#{name}: Sending message of type '#{type_name}'"

  begin
    output_socket.send_msg(type_name, message.to_avro)
  rescue Exception => error
    RFlow.logger.error "Exception #{error.class}: #{error.message}, because: #{error.backtrace}"
  end
end

#zmq_context ⇒ EM::ZeroMQ::Context

The ZeroMQ context object.

Returns:

  • (EM::ZeroMQ::Context)


38
# File 'lib/rflow/connections/zmq_connection.rb', line 38

# Instance-level accessor for the ZeroMQ context; delegates to the
# class-level +ZMQConnection.zmq_context+.
#
# @return [EM::ZeroMQ::Context] the value returned by +ZMQConnection.zmq_context+
def zmq_context
  ZMQConnection.zmq_context
end