Module: ZMQMachine::Server::Base

Included in:
PUB, PULL, PUSH, Pair, REP, REQ, SUB, XREP, XREQ
Defined in:
lib/zm/server/base.rb

Instance Method Summary

Instance Method Details

#initialize(configuration) ⇒ Object



7
8
9
10
11
12
13
14
15
# File 'lib/zm/server/base.rb', line 7

# Stores the configuration, caches the collaborators read from it and
# allocates the socket.
#
# @param configuration [Object] must respond to +reactor+ and +on_read+;
#   the whole object is retained in +@configuration+ for later use
def initialize(configuration)
  @configuration = configuration
  @reactor = configuration.reactor
  @on_read = configuration.on_read

  # allocate_socket is defined outside this section; it is deliberately
  # invoked before the outgoing queue is created, matching the original order.
  allocate_socket

  # Message arrays handed to #write accumulate here.
  @message_queue = []
end

#on_attach(socket) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/zm/server/base.rb', line 23

# Applies socket options, then binds or connects the socket to the configured
# endpoint and finally registers it for events.
#
# When the configuration requests neither bind nor connect, the socket is
# only configured and registered.
#
# @param socket [Object] the socket being attached; must respond to +bind+
#   and +connect+
# @return [Object] whatever +register_for_events+ returns
# @raise [RuntimeError] when the bind or connect return code is rejected by
#   ZMQ::Util.resultcode_ok?
def on_attach(socket)
  # Options have to be applied ahead of bind/connect, otherwise they are ignored.
  set_options(socket)

  # bind takes precedence; connect is only consulted when bind is falsy.
  mode =
    if @configuration.bind
      :bind
    elsif @configuration.connect
      :connect
    end

  if mode
    result = mode == :bind ? socket.bind(@configuration.endpoint) : socket.connect(@configuration.endpoint)
    @reactor.log(:debug, "#{self.class}#on_attach, #{mode} rc [#{result}], endpoint #{@configuration.endpoint}")

    unless ZMQ::Util.resultcode_ok?(result)
      raise "#{self.class}#on_attach, failed to #{mode} to endpoint [#{@configuration.endpoint}]"
    end
  end

  register_for_events(socket)
end

#on_readable(socket, messages) ⇒ Object

Prints each message when global debugging is enabled.

Forwards messages on to the :on_read callback given in the constructor.



59
60
61
62
# File 'lib/zm/server/base.rb', line 59

# Forwards the received messages to the +on_read+ callback taken from the
# configuration, then hands them to +close_messages+.
#
# The messages are passed to +close_messages+ even when the callback raises,
# so a failing callback no longer skips that cleanup step; the exception
# still propagates to the caller afterwards.
#
# @param socket [Object] the socket the messages were read from
# @param messages [Array] the received messages
# @return [Object] whatever +close_messages+ returns (defined outside this
#   section)
def on_readable(socket, messages)
  callback_finished = false
  @on_read.call(socket, messages)
  callback_finished = true
  close_messages(messages)
ensure
  # Only reached with the flag unset when the callback did not complete
  # normally; clean up before the exception continues to propagate.
  close_messages(messages) unless callback_finished
end

#on_readable_error(socket, return_code) ⇒ Object



71
72
73
# File 'lib/zm/server/base.rb', line 71

# Writes a diagnostic line for a failed read to standard error, including the
# return code, the current ZMQ errno and error string, and the socket.
#
# @param socket [Object] the socket the read was attempted on
# @param return_code [Object] the return code supplied by the caller
# @return [nil]
def on_readable_error(socket, return_code)
  report = "#{self.class}#on_readable_error, rc [#{return_code}], errno [#{ZMQ::Util.errno}], description [#{ZMQ::Util.error_string}], sock #{socket.inspect}"
  STDERR.puts(report)
end

#on_writable(socket) ⇒ Object

Deregisters the socket from receiving any further write events.



66
67
68
69
# File 'lib/zm/server/base.rb', line 66

# Asks the reactor to stop delivering write events for the socket.
#
# @param socket [Object] the socket to deregister
# @return [Object] whatever the reactor's +deregister_writable+ returns
def on_writable(socket)
  @reactor.deregister_writable(socket)
end

#on_writable_error(socket, return_code) ⇒ Object



75
76
77
# File 'lib/zm/server/base.rb', line 75

# Writes a diagnostic line for a failed write to standard error, including
# the return code, the current ZMQ errno and error string, and the socket.
#
# @param socket [Object] the socket the write was attempted on
# @param return_code [Object] the return code supplied by the caller
# @return [nil]
def on_writable_error(socket, return_code)
  report = "#{self.class}#on_writable_error, rc [#{return_code}], errno [#{ZMQ::Util.errno}], description [#{ZMQ::Util.error_string}], sock #{socket.inspect}"
  STDERR.puts(report)
end

#shutdown ⇒ Object



17
18
19
20
21
# File 'lib/zm/server/base.rb', line 17

# Drops the read callback and asks the reactor to close this server's socket.
#
# The debug line is tagged with this method's real name (+#shutdown+), in
# line with the other log messages in this file that use the name of the
# method emitting them.
#
# @return [Object] whatever the reactor's +close_socket+ returns
def shutdown
  @reactor.log(:debug, "#{self.class}#shutdown, closing reactor socket")

  # Release the callback before the socket is closed.
  @on_read = nil
  @reactor.close_socket(@socket)
end

#write(messages, verbose = false) ⇒ Object

Takes an array of ZM::Message instances and writes them out to the socket. If any socket write fails, the message is saved. We will attempt to write it again in 10 milliseconds or when another message array is sent, whichever comes first.

All messages passed here are guaranteed to be written in the *order they were received*.



49
50
51
52
53
# File 'lib/zm/server/base.rb', line 49

# Queues an array of messages for sending and immediately attempts to flush
# the queue to the socket.
#
# @param messages [Array] the messages to send; the array is appended to the
#   outgoing queue as a single entry
# @param verbose [Boolean] stored in +@verbose+ (its consumers are outside
#   this section)
# @return [Object] whatever +write_queue_to_socket+ returns
def write(messages, verbose = false)
  @verbose = verbose

  # Appending at the tail keeps entries in the order they were handed in.
  @message_queue.push(messages)
  write_queue_to_socket
end