Class: OpenTrade::Base::Role

Inherits:
Object
  • Object
show all
Defined in:
lib/open_trade/base/role.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, type, options = {}) ⇒ Role

:name (Symbol): process name. :type (Symbol): type of the process. :options (Hash): optional settings, listed below.

:request_timeout		Integer		Time to wait for responses, in ms (default: 25)
:poll_interval			Integer 	Time to wait for new input, in ms (default: 25)
:request_port			Integer 	Port for requests. nil for no port, 0 for dynamic, and any other number for a static port
:publish_port			Integer 	Port for publishing. nil for no port, 0 for dynamic, and any other number for a static port


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/open_trade/base/role.rb', line 15

# Build a role: store its identity and timing options, create the ZMQ
# context and poller, and open the :request and :publish server channels.
#
# name    - process name (stored in @name)
# type    - type of the process (stored in @type)
# options - Hash merged over these defaults:
#             :poll_interval   => 25
#             :request_timeout => 25
#             :request_port    => 0
#             :publish_port    => 0
def initialize name, type, options={}
	defaults = {
		:poll_interval   => 25,
		:request_timeout => 25,
		:request_port    => 0,
		:publish_port    => 0
	}

	options = defaults.merge options

	@name            = name
	@type            = type
	@poll_interval   = options[:poll_interval]
	@request_timeout = options[:request_timeout]

	# Internal stuff
	@zmq_context = ZMQ::Context.new 1
	@zmq_poller  = ZMQ::Poller.new
	@hostname    = ::Socket.gethostname
	@channels    = Hash.new
	@handlers    = Hash.new
	@running     = false

	# Set up the server channels
	# NOTE(review): the request channel is bound as ZMQ::REQ; a listening
	# request endpoint is usually ZMQ::REP - confirm before changing, because
	# add_server_channel only registers ZMQ::REQ sockets with the poller.
	add_server_channel :request, ZMQ::REQ, options[:request_port]
	add_server_channel :publish, ZMQ::PUB, options[:publish_port]

	# Make sure all channels get closed. The finalizer must not reference
	# self (that would keep this object alive forever) and must accept the
	# object id the GC passes in, so it is built at class level from the
	# channel table only.
	ObjectSpace.define_finalizer self, self.class.channel_finalizer(@channels)
end

# Internal: build a finalizer proc that closes every socket in +channels+.
# Defined on the class so the proc does not capture a Role instance.
def self.channel_finalizer channels
	proc do |_object_id|
		channels.each_value { |socket| socket.close }
		channels.clear
	end
end

Instance Attribute Details

#nameObject

Returns the value of attribute name.



6
7
8
# File 'lib/open_trade/base/role.rb', line 6

# Reader for @name (set from the constructor's +name+ argument).
def name; @name; end

#poll_intervalObject

Returns the value of attribute poll_interval.



6
7
8
# File 'lib/open_trade/base/role.rb', line 6

# Reader for @poll_interval (the value handed to the poller in #start).
def poll_interval; @poll_interval; end

#request_timeoutObject

Returns the value of attribute request_timeout.



6
7
8
# File 'lib/open_trade/base/role.rb', line 6

# Reader for @request_timeout (set from options[:request_timeout]).
def request_timeout; @request_timeout; end

#typeObject

Returns the value of attribute type.



6
7
8
# File 'lib/open_trade/base/role.rb', line 6

# Reader for @type (set from the constructor's +type+ argument).
def type; @type; end

Instance Method Details

#add_server_channel(name, zmq_type, port) ⇒ Object

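name (Symbol): a unique name for the channel. zmq_type (Integer): ZMQ socket type constant, e.g. ZMQ::REP or ZMQ::PUB. port (Integer or nil): nil to skip the channel, 0 for a randomly chosen port, any other number for a static port.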



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/open_trade/base/role.rb', line 49

# Create a socket of +zmq_type+, bind it to a TCP port and store it in
# @channels under +name+.
#
# name     - key under which the socket is stored in @channels
# zmq_type - ZMQ socket type constant passed to the context's #socket
# port     - nil: do nothing and return;
#            > 0: bind to exactly that port;
#            otherwise: bind to a random port in 1024...65000, trying a
#            fresh random port after each failure.
#
# Dynamic binding gives up after a fixed number of attempts and re-raises
# the last error, so a failure unrelated to a busy port cannot spin forever.
# NOTE(review): some ZMQ bindings report a failed bind through a return code
# instead of raising - confirm which one this project uses.
def add_server_channel name, zmq_type, port
	return if port.nil?

	socket = @zmq_context.socket zmq_type
	@channels[name] = socket

	if port > 0 # Static port
		socket.bind "tcp://*:#{port}"
	else # Dynamic port
		attempts_left = 20
		begin
			socket.bind "tcp://*:#{rand(65000 - 1024) + 1024}"
		rescue StandardError
			attempts_left -= 1
			retry if attempts_left > 0
			raise
		end
	end

	# Only request sockets are polled for incoming data.
	@zmq_poller.register socket, ZMQ::POLLIN if zmq_type == ZMQ::REQ
end

#delete_channel(name) ⇒ Object



70
71
72
73
74
# File 'lib/open_trade/base/role.rb', line 70

# Close the socket stored under +name+ and drop it from @channels.
# Does nothing (and returns nil) when no such channel exists; otherwise
# returns the removed socket.
def delete_channel name
	if @channels.key? name
		@channels.fetch(name).close
		@channels.delete name
	end
end

#destroyObject

Destroy, called on shutdown



128
129
130
# File 'lib/open_trade/base/role.rb', line 128

# Shut down every channel by handing each channel name to delete_channel.
# Iterates over a snapshot of the keys, since delete_channel mutates
# @channels.
def destroy
	channel_names = @channels.keys
	channel_names.each do |channel_name|
		delete_channel channel_name
	end
end

#log(level, message) ⇒ Object



132
133
134
135
# File 'lib/open_trade/base/role.rb', line 132

# Print a timestamped log line to stdout and publish the same entry as a
# :log message on the :publish channel.
#
# level   - severity label, printed and forwarded as-is
# message - text of the log entry
def log level, message
	line = format("[%s][%s]: %s", Time.now, level, message)
	puts line

	payload = { :level => level, :message => message }
	send :publish, Base::Message.new(:log, payload)
end

#process_message(raw_message) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/open_trade/base/role.rb', line 101

# Decode one raw JSON message and dispatch it to the handler registered for
# its type.
#
# raw_message - JSON text; expected to decode to an object with a "type" key
#
# The decoded message is passed to the handler with Symbol keys, so
# message[:type] works. Handlers are looked up by the type value first and
# then by name, so a handler registered under :ping also receives messages
# whose JSON type is the string "ping".
#
# Returns the handler's result, or nil after logging an error when the
# message cannot be parsed, has no type, or has no handler.
def process_message raw_message
	begin
		message = JSON.parse raw_message, :symbolize_names => true
	rescue StandardError
		# message is still unset here, so report the raw input instead.
		log :error, "Invalid message: #{raw_message}"
		return
	end

	# Valid JSON is not necessarily an object (it may be an array or scalar).
	unless message.is_a?(Hash) && message.has_key?(:type)
		log :error, "Invalid message: #{message}"
		return
	end

	type = message[:type]
	handler = @handlers[type]
	if handler.nil?
		# JSON carries the type as a String; match Symbol-registered handlers
		# by name without converting untrusted input to a Symbol.
		_, handler = @handlers.find { |registered, _| registered.to_s == type.to_s }
	end

	if handler.nil?
		log :error, "No handler defined for message type #{type}"
		return
	end

	handler.call message
end

#register_message_handler(message_type, handler) ⇒ Object

Register a handler for messages of type message_type. Registering again for the same type replaces the previous handler.



78
79
80
# File 'lib/open_trade/base/role.rb', line 78

# Remember +handler+ as the callable for messages of +message_type+,
# replacing any handler stored earlier for that type. Returns the handler.
def register_message_handler message_type, handler
	@handlers.store message_type, handler
end

#run_loopObject

Run loop. An empty hook, called once per iteration of #start; inheritors override it if needed.



124
125
# File 'lib/open_trade/base/role.rb', line 124

# Per-iteration hook invoked by #start after pending messages are handled.
# Intentionally does nothing here.
def run_loop
	nil
end

#send(channel, message_obj) ⇒ Object



137
138
139
140
141
# File 'lib/open_trade/base/role.rb', line 137

# Send +message_obj+ (converted with #to_s) over the named channel.
# Returns nil without sending when the channel does not exist; otherwise
# returns whatever the socket's send_string returns.
def send channel, message_obj
	socket = @channels[channel]
	socket.send_string message_obj.to_s if socket
end

#startObject

Start



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/open_trade/base/role.rb', line 83

# Main loop: while @running is true, poll the ZMQ poller with
# @poll_interval, read one string from every registered channel whose socket
# is readable, pass it to process_message, then call run_loop.
def start
	@running = true

	while @running
		@zmq_poller.poll @poll_interval

		@zmq_poller.readables.each do |readable|
			# Find our channel(s) wrapping the readable socket.
			ready = @channels.select { |_channel_name, candidate| candidate === readable }

			ready.each do |_channel_name, socket|
				buffer = ''
				socket.recv_string buffer # fills buffer in place
				process_message buffer
			end
		end

		run_loop
	end

	#shutdown goes here
end