Class: OpenTrade::Base::Role
- Inherits:
-
Object
- Object
- OpenTrade::Base::Role
- Defined in:
- lib/open_trade/base/role.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
Returns the value of attribute name.
-
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
-
#request_timeout ⇒ Object
Returns the value of attribute request_timeout.
-
#type ⇒ Object
Returns the value of attribute type.
Instance Method Summary collapse
-
#add_server_channel(name, zmq_type, port) ⇒ Object
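name (Symbol): a unique name for the channel. zmq_type (Integer): the socket type, e.g. ZMQ::REP, ZMQ::PUB. port (Integer): nil for no channel, 0 for a random port, any other number for a static port.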
- #delete_channel(name) ⇒ Object
-
#destroy ⇒ Object
Destroy, called on shutdown.
-
#initialize(name, type, options = {}) ⇒ Role
constructor
:name symbol Process name :type symbol Type of the process :options Hash :request_timeout Integer Time to wait for responses, in ms (default: 25) :poll_interval Integer Time to wait for new input, in ms (default: 25) :request_port Integer Port for requests.
- #log(level, message) ⇒ Object
- #process_message(raw_message) ⇒ Object
-
#register_message_handler(message_type, handler) ⇒ Object
Register a new handler for each type of message message_type.
-
#run_loop ⇒ Object
Run loop An empty process, overridden by the inheritors if needed.
- #send(channel, message_obj) ⇒ Object
-
#start ⇒ Object
Start.
Constructor Details
#initialize(name, type, options = {}) ⇒ Role
:name symbol Process name :type symbol Type of the process :options Hash
:request_timeout Integer Time to wait for responses, in ms (default: 25)
:poll_interval Integer Time to wait for new input, in ms (default: 25)
:request_port Integer Port for requests. nil for no port, 0 for dynamic, and any other number for a static port
:publish_port Integer Port for publishing. nil for no port, 0 for dynamic, and any other number for a static port
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/open_trade/base/role.rb', line 15

# Builds the finalizer that closes every socket still present in +channels+.
#
# It is created at class level on purpose: a proc created inside an instance
# method keeps a reference to that instance, and an object referenced by its
# own finalizer is never garbage-collected.
#
# @param channels [Hash] channel name => socket table owned by the role
# @return [Proc] finalizer; receives (and ignores) the finalized object's id
def self.channel_finalizer(channels)
  lambda { |_object_id| channels.each_value { |socket| socket.close } }
end

# Creates a role and opens its :request and :publish server channels.
#
# @param name [Symbol] process name
# @param type [Symbol] type of the process
# @param options [Hash] overrides for the defaults below
# @option options [Integer] :poll_interval time to wait for new input, in ms (default: 25)
# @option options [Integer] :request_timeout time to wait for responses, in ms (default: 25)
# @option options [Integer, nil] :request_port nil for no port, 0 for a random
#   port, any other number for a static port (default: 0)
# @option options [Integer, nil] :publish_port same convention as :request_port (default: 0)
def initialize(name, type, options = {})
  defaults = {
    :poll_interval => 25,
    :request_timeout => 25,
    :request_port => 0,
    :publish_port => 0
  }
  options = defaults.merge(options)

  @name = name
  @type = type
  @poll_interval = options[:poll_interval]
  @request_timeout = options[:request_timeout]

  # Internal stuff
  @zmq_context = ZMQ::Context.new 1
  @zmq_poller = ZMQ::Poller.new
  @hostname = ::Socket.gethostname
  @channels = {}
  @handlers = {}
  @running = false

  # Set up the server channels.
  # NOTE(review): the request channel is created as ZMQ::REQ although it is
  # bound like a server socket (the method documentation mentions ZMQ::REP).
  # add_server_channel only polls ZMQ::REQ sockets, so both places must change
  # together -- confirm the intended socket type.
  add_server_channel :request, ZMQ::REQ, options[:request_port]
  add_server_channel :publish, ZMQ::PUB, options[:publish_port]

  # Make sure all channels are shut down once this object is collected.
  ObjectSpace.define_finalizer(self, self.class.channel_finalizer(@channels))
end
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
6 7 8 |
# File 'lib/open_trade/base/role.rb', line 6

# Returns the value of attribute name.
#
# @return [Object] the process name stored by the constructor
def name
  @name
end
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
6 7 8 |
# File 'lib/open_trade/base/role.rb', line 6

# Returns the value of attribute poll_interval.
#
# @return [Object] the value passed to the poller on every iteration of #start
def poll_interval
  @poll_interval
end
#request_timeout ⇒ Object
Returns the value of attribute request_timeout.
6 7 8 |
# File 'lib/open_trade/base/role.rb', line 6

# Returns the value of attribute request_timeout.
#
# @return [Object] the request timeout stored by the constructor
def request_timeout
  @request_timeout
end
#type ⇒ Object
Returns the value of attribute type.
6 7 8 |
# File 'lib/open_trade/base/role.rb', line 6

# Returns the value of attribute type.
#
# @return [Object] the process type stored by the constructor
def type
  @type
end
Instance Method Details
#add_server_channel(name, zmq_type, port) ⇒ Object
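name (Symbol): a unique name for the channel. zmq_type (Integer): the socket type, e.g. ZMQ::REP, ZMQ::PUB. port (Integer): nil for no channel, 0 for a random port, any other number for a static port.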
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/open_trade/base/role.rb', line 49

# Opens a socket, binds it on all interfaces and stores it under +name+.
#
# @param name [Symbol] unique name of the channel
# @param zmq_type [Integer] socket type (ZMQ::REQ, ZMQ::PUB, ...)
# @param port [Integer, nil] nil creates nothing; a positive number binds that
#   exact port; anything else binds a random port in 1024...65000
# @return [Object, nil] result of registering the socket with the poller, or
#   nil when the socket is not polled
# @raise [StandardError] the last bind error when no random port could be
#   bound after 20 attempts
def add_server_channel(name, zmq_type, port)
  return if port.nil?

  socket = @zmq_context.socket(zmq_type)
  @channels[name] = socket

  if port > 0
    # Static port
    socket.bind("tcp://*:#{port}")
  else
    # Dynamic port: try random ports, but give up eventually so a permanent
    # bind failure surfaces as an error instead of spinning forever.
    attempts = 0
    begin
      attempts += 1
      socket.bind("tcp://*:#{rand(1024...65_000)}")
    rescue StandardError
      retry if attempts < 20
      raise
    end
  end

  # Only request sockets are polled for incoming messages.
  # NOTE(review): a bound request/reply server socket is normally ZMQ::REP;
  # confirm whether REP channels should be registered here as well.
  @zmq_poller.register(socket, ZMQ::POLLIN) if zmq_type == ZMQ::REQ
end
#delete_channel(name) ⇒ Object
70 71 72 73 74 |
# File 'lib/open_trade/base/role.rb', line 70

# Closes the socket stored under +name+ and forgets the channel.
# Unknown channel names are ignored.
#
# NOTE(review): the socket is not removed from @zmq_poller here -- confirm
# whether a closed socket may stay registered with the poller.
#
# @param name [Symbol] name of the channel to remove
# @return [Object, nil] the removed socket, or nil when no such channel exists
def delete_channel(name)
  return unless @channels.key?(name)

  @channels[name].close
  @channels.delete(name)
end
#destroy ⇒ Object
Destroy, called on shutdown
128 129 130 |
# File 'lib/open_trade/base/role.rb', line 128

# Destroy, called on shutdown: closes and removes every open channel.
#
# The optional argument exists because the constructor registers this method
# as an object finalizer, and Ruby calls finalizers with the object id.
# It is ignored.
#
# @param _object_id [Integer, nil] ignored
# @return [Array] the channel names that were processed
def destroy(_object_id = nil)
  # Iterate over a snapshot of the keys: delete_channel mutates @channels.
  @channels.keys.each { |name| delete_channel(name) }
end
#log(level, message) ⇒ Object
132 133 134 135 |
# File 'lib/open_trade/base/role.rb', line 132

# Prints a log line to standard output and publishes it as a :log message on
# the :publish channel.
#
# @param level [Object] severity label, e.g. :error
# @param message [Object] text to log
# @return [Object, nil] whatever #send returns for the :publish channel
def log(level, message)
  puts format("[%s][%s]: %s", Time.now, level, message)
  # Calls this class's own #send(channel, message_obj), not Object#send.
  send :publish, Base::Message.new(:log, { :level => level, :message => message })
end
#process_message(raw_message) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/open_trade/base/role.rb', line 101

# Parses a raw JSON message and dispatches it to the handler registered for
# its type. Problems are logged at :error level and never raised.
#
# @param raw_message [String] JSON text; must decode to an object with a
#   "type" member
# @return [Object, nil] the handler's return value, or the result of #log when
#   the message was rejected
def process_message(raw_message)
  begin
    # Symbolize keys so that the :type lookups below can actually match;
    # JSON.parse produces String keys by default.
    message = JSON.parse(raw_message, :symbolize_names => true)
  rescue JSON::ParserError, TypeError
    log :error, "Invalid message: #{raw_message}"
    return
  end

  # Valid JSON that is not an object (array, number, null) has no type either.
  unless message.is_a?(Hash) && message.key?(:type)
    log :error, "Invalid message: #{raw_message}"
    return
  end

  type = message[:type]
  handler = @handlers[type]
  if handler.nil?
    # The type arrives from JSON as a String; also accept handlers that were
    # registered under the equivalent Symbol.
    _key, handler = @handlers.find { |key, _handler| key.to_s == type.to_s }
  end

  unless handler
    log :error, "No handler defined for message type #{type}"
    return
  end

  handler.call(message)
end
#register_message_handler(message_type, handler) ⇒ Object
Register a new handler for each type of message message_type
78 79 80 |
# File 'lib/open_trade/base/role.rb', line 78

# Registers the handler for one message type, replacing any handler
# previously registered for that type.
#
# @param message_type [Object] the message type to handle
# @param handler [#call] invoked with the parsed message
# @return [#call] the handler that was stored
def register_message_handler(message_type, handler)
  @handlers[message_type] = handler
end
#run_loop ⇒ Object
Run loop. An empty hook invoked on every iteration of #start; override it in subclasses if needed.
124 125 |
# File 'lib/open_trade/base/role.rb', line 124

# Run loop hook, called once per iteration of #start after incoming messages
# have been processed. Does nothing here; inheritors override it if needed.
#
# @return [nil]
def run_loop
end
#send(channel, message_obj) ⇒ Object
137 138 139 140 141 |
# File 'lib/open_trade/base/role.rb', line 137

# Sends the string form of +message_obj+ over the named channel.
# Does nothing when no such channel is open.
#
# NOTE(review): this method shadows Object#send for instances of this class;
# use __send__ when dynamic dispatch is needed.
#
# @param channel [Symbol] name of the channel to send on
# @param message_obj [#to_s] message to send
# @return [Object, nil] the socket's send_string result, or nil when the
#   channel does not exist
def send(channel, message_obj)
  socket = @channels[channel]
  return unless socket

  socket.send_string(message_obj.to_s)
end
#start ⇒ Object
Start
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/open_trade/base/role.rb', line 83

# Start: runs the main loop until @running becomes false.
#
# Each iteration polls for at most @poll_interval, hands every message
# received on a readable channel to #process_message, then calls #run_loop.
#
# @return [nil]
def start
  @running = true

  while @running
    @zmq_poller.poll(@poll_interval)

    @zmq_poller.readables.each do |readable|
      # Only read from sockets that are still part of the channel table.
      @channels.each_value do |socket|
        next unless socket == readable

        msg = String.new
        socket.recv_string(msg)
        process_message(msg)
      end
    end

    run_loop
  end
  # shutdown goes here
end