Class: RFlow::Connections::ZMQConnection
Class Attribute Summary collapse
#config, #name, #options, #recv_callback, #uuid
Class Method Summary
collapse
Instance Method Summary
collapse
build, #input_port_key, #output_port_key
Constructor Details
Returns a new instance of ZMQConnection.
38
39
40
41
42
|
# File 'lib/rflow/connections/zmq_connection.rb', line 38
def initialize(config)
super
validate_options!
zmq_context end
|
Class Attribute Details
.zmq_context ⇒ Object
Returns the current ZeroMQ context object or creates it if it does not exist.
27
28
29
|
# File 'lib/rflow/connections/zmq_connection.rb', line 27
def zmq_context
@zmq_context
end
|
Class Method Details
.create_zmq_context ⇒ Object
17
18
19
20
21
22
23
24
|
# File 'lib/rflow/connections/zmq_connection.rb', line 17
def create_zmq_context
version = LibZMQ::version
RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
if EM.reactor_running?
raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context'
end
EM::ZeroMQ::Context.new(1)
end
|
Instance Method Details
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/rflow/connections/zmq_connection.rb', line 44
def connect_input!
RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
check_address(options['input_address'])
self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
if config.delivery == 'broadcast'
input_socket.setsockopt(ZMQ::SUBSCRIBE, '') end
input_socket.on(:message) do |*message_parts|
begin
message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
message_parts.each(&:close) recv_callback.call(message)
rescue Exception => e
RFlow.logger.error "#{name}: Exception processing message of type '#{message.data_type_name}': #{e.message}, because: #{e.backtrace}"
end
end
input_socket
end
|
#connect_output! ⇒ Object
68
69
70
71
72
73
74
75
|
# File 'lib/rflow/connections/zmq_connection.rb', line 68
def connect_output!
RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
check_address(options['output_address'])
self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
output_socket
end
|
#send_message(message) ⇒ Object
77
78
79
80
81
82
83
84
85
|
# File 'lib/rflow/connections/zmq_connection.rb', line 77
def send_message(message)
RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"
begin
output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
rescue Exception => e
RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}"
end
end
|