Class: RFlow::Connections::ZMQConnection

Inherits:
RFlow::Connection show all
Defined in:
lib/rflow/connections/zmq_connection.rb

Direct Known Subclasses

BrokeredZMQConnection

Class Attribute Summary collapse

Attributes inherited from RFlow::Connection

#config, #name, #options, #recv_callback, #uuid

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from RFlow::Connection

build, #input_port_key, #output_port_key

Constructor Details

#initialize(config) ⇒ ZMQConnection

Returns a new instance of ZMQConnection.



38
39
40
41
42
# File 'lib/rflow/connections/zmq_connection.rb', line 38

def initialize(config)
  super
  validate_options!
  zmq_context # cause the ZMQ context to be created before the reactor is running
end

Class Attribute Details

.zmq_contextObject

Returns the current ZeroMQ context object or creates it if it does not exist.



27
28
29
# File 'lib/rflow/connections/zmq_connection.rb', line 27

def zmq_context
  @zmq_context
end

Class Method Details

.create_zmq_contextObject



17
18
19
20
21
22
23
24
# File 'lib/rflow/connections/zmq_connection.rb', line 17

def create_zmq_context
  version = LibZMQ::version
  RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
  if EM.reactor_running?
    raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context'
  end
  EM::ZeroMQ::Context.new(1)
end

Instance Method Details

#connect_input!Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/rflow/connections/zmq_connection.rb', line 44

def connect_input!
  RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
  check_address(options['input_address'])

  self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
  input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
  if config.delivery == 'broadcast'
    input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages
  end

  input_socket.on(:message) do |*message_parts|
    begin
      message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
      RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
      message_parts.each(&:close) # avoid memory leaks
      recv_callback.call(message)
    rescue Exception => e
      RFlow.logger.error "#{name}: Exception processing message of type '#{message.data_type_name}': #{e.message}, because: #{e.backtrace}"
    end
  end

  input_socket
end

#connect_output!Object



68
69
70
71
72
73
74
75
# File 'lib/rflow/connections/zmq_connection.rb', line 68

def connect_output!
  RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
  check_address(options['output_address'])

  self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
  output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
  output_socket
end

#send_message(message) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/rflow/connections/zmq_connection.rb', line 77

def send_message(message)
  RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"

  begin
    output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
  rescue Exception => e
    RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}"
  end
end

#zmq_contextObject



32
# File 'lib/rflow/connections/zmq_connection.rb', line 32

def zmq_context; ZMQConnection.zmq_context; end