Class: IB::Connection
Overview
Encapsulates API connection to TWS or Gateway
Constant Summary collapse
- DEFAULT_OPTIONS =
Please note, we are realizing only the most current TWS protocol versions, thus improving performance at the expense of backwards compatibility. Older protocol versions support can be found in older gem versions.
{:host =>'127.0.0.1', :port => '4001', # IB Gateway connection (default) #:port => '7496', # TWS connection, with annoying pop-ups :connect => true, # Connect at initialization :reader => true, # Start a separate reader Thread :received => true, # Keep all received messages in a Hash :logger => nil, :client_id => nil, # Will be randomly assigned :client_version => 57, # 48, # 57 = can receive commissionReport message :server_version => 60 # 53? Minimal server version required }
Class Attribute Summary collapse
-
.current ⇒ Object
Returns the value of attribute current.
Instance Attribute Summary collapse
-
#next_local_id ⇒ Object
(also: #next_order_id)
Info about IB server and server connection state.
-
#options ⇒ Object
Info about IB server and server connection state.
-
#server ⇒ Object
Info about IB server and server connection state.
Instance Method Summary collapse
-
#cancel_order(*local_ids) ⇒ Object
Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).
-
#clear_received(*message_types) ⇒ Object
Clear received messages Hash.
-
#connect ⇒ Object
(also: #open)
Working with connection.
- #connected? ⇒ Boolean
- #disconnect ⇒ Object (also: #close)
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
-
#modify_order(order, contract) ⇒ Object
Modify Order (convenience wrapper for send_message :PlaceOrder).
-
#place_order(order, contract) ⇒ Object
Place Order (convenience wrapper for send_message :PlaceOrder).
-
#process_message ⇒ Object
Process single incoming message (blocking!).
-
#process_messages(poll_time = 200) ⇒ Object
Process incoming messages during poll_time (200) msecs, nonblocking.
- #reader_running? ⇒ Boolean
-
#received ⇒ Object
Hash of received messages, keyed by message type.
-
#received?(message_type, times = 1) ⇒ Boolean
Check if messages of given type were received at_least n times.
-
#satisfied?(*conditions) ⇒ Boolean
Check if all given conditions are satisfied.
-
#send_message(what, *args) ⇒ Object
(also: #dispatch)
Send an outgoing message.
- #socket ⇒ Object
-
#start_reader ⇒ Object
Start reader thread that continuously reads messages from server in background.
-
#subscribe(*args, &block) ⇒ Object
Subscribe Proc or block to specific type(s) of incoming message events.
-
#subscribers ⇒ Object
Message subscribers.
-
#unsubscribe(*ids) ⇒ Object
Remove all subscribers with specific subscriber id (TODO: multiple ids).
-
#wait_for(*args, &block) ⇒ Object
Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair.
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/ib/connection.rb', line 36 def initialize opts = {} @options = DEFAULT_OPTIONS.merge(opts) # A couple of locks to avoid race conditions in JRuby @subscribe_lock = Mutex.new @receive_lock = Mutex.new self.default_logger = [:logger] if [:logger] @connected = false self.next_local_id = nil @server = Hash.new connect if [:connect] Connection.current = self end |
Class Attribute Details
.current ⇒ Object
Returns the value of attribute current.
26 27 28 |
# File 'lib/ib/connection.rb', line 26 def current @current end |
Instance Attribute Details
#next_local_id ⇒ Object Also known as: next_order_id
Info about IB server and server connection state
29 30 31 |
# File 'lib/ib/connection.rb', line 29 def next_local_id @next_local_id end |
#options ⇒ Object
Info about IB server and server connection state
29 30 31 |
# File 'lib/ib/connection.rb', line 29 def @options end |
#server ⇒ Object
Info about IB server and server connection state
29 30 31 |
# File 'lib/ib/connection.rb', line 29 def server @server end |
Instance Method Details
#cancel_order(*local_ids) ⇒ Object
Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).
309 310 311 312 313 |
# File 'lib/ib/connection.rb', line 309 def cancel_order *local_ids local_ids.each do |local_id| :CancelOrder, :local_id => local_id.to_i end end |
#clear_received(*message_types) ⇒ Object
Clear received messages Hash
170 171 172 173 174 175 176 177 178 |
# File 'lib/ib/connection.rb', line 170 def clear_received * @receive_lock.synchronize do if .empty? received.each { |, container| container.clear } else .each { || received[].clear } end end end |
#connect ⇒ Object Also known as: open
Working with connection
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/ib/connection.rb', line 54 def connect error "Already connected!" if connected? # TWS always sends NextValidId message at connect - save this id self.subscribe(:NextValidId) do |msg| self.next_local_id = msg.local_id log.info "Got next valid order id: #{next_local_id}." end server[:socket] = IBSocket.open([:host], [:port]) # Secret handshake socket.write_data [:client_version] server[:client_version] = [:client_version] server[:server_version] = socket.read_int if server[:server_version] < [:server_version] error "TWS version #{server[:server_version]}, #{[:server_version]} required." end server[:remote_connect_time] = socket.read_string server[:local_connect_time] = Time.now() # Sending (arbitrary) client ID to identify subsequent communications. # The client with a client_id of 0 can manage the TWS-owned open orders. # Other clients can only manage their own open orders. server[:client_id] = [:client_id] || random_id socket.write_data server[:client_id] @connected = true log.info "Connected to server, version: #{server[:server_version]}, connection time: " + "#{server[:local_connect_time]} local, " + "#{server[:remote_connect_time]} remote." start_reader if [:reader] # Allows reconnect end |
#connected? ⇒ Boolean
105 106 107 |
# File 'lib/ib/connection.rb', line 105 def connected? @connected end |
#disconnect ⇒ Object Also known as: close
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/ib/connection.rb', line 91 def disconnect if reader_running? @reader_running = false server[:reader].join end if connected? socket.close @server = Hash.new @connected = false end end |
#modify_order(order, contract) ⇒ Object
Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.
304 305 306 |
# File 'lib/ib/connection.rb', line 304 def modify_order order, contract order.modify contract, self end |
#place_order(order, contract) ⇒ Object
Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.
299 300 301 |
# File 'lib/ib/connection.rb', line 299 def place_order order, contract order.place contract, self end |
#process_message ⇒ Object
Process single incoming message (blocking!)
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/ib/connection.rb', line 252 def msg_id = socket.read_int # This read blocks! # Debug: log.debug "Got message #{msg_id} (#{Messages::Incoming::Classes[msg_id]})" # Create new instance of the appropriate message type, # and have it read the message from server. # NB: Failure here usually means unsupported message type received error "Got unsupported message #{msg_id}" unless Messages::Incoming::Classes[msg_id] msg = Messages::Incoming::Classes[msg_id].new(server) # Deliver message to all registered subscribers, alert if no subscribers @subscribe_lock.synchronize do subscribers[msg.class].each { |_, subscriber| subscriber.call(msg) } end log.warn "No subscribers for message #{msg.class}!" if subscribers[msg.class].empty? # Collect all received messages into a @received Hash @receive_lock.synchronize do received[msg.] << msg if [:received] end end |
#process_messages(poll_time = 200) ⇒ Object
Process incoming messages during poll_time (200) msecs, nonblocking
243 244 245 246 247 248 249 |
# File 'lib/ib/connection.rb', line 243 def poll_time = 200 # in msec time_out = Time.now + poll_time/1000.0 while (time_left = time_out - Time.now) > 0 # If server socket is readable, process single incoming message if select [socket], nil, nil, time_left end end |
#reader_running? ⇒ Boolean
238 239 240 |
# File 'lib/ib/connection.rb', line 238 def reader_running? @reader_running && server[:reader] && server[:reader].alive? end |
#received ⇒ Object
Hash of received messages, keyed by message type
181 182 183 |
# File 'lib/ib/connection.rb', line 181 def received @received ||= Hash.new { |hash, | hash[] = Array.new } end |
#received?(message_type, times = 1) ⇒ Boolean
Check if messages of given type were received at_least n times
186 187 188 189 190 |
# File 'lib/ib/connection.rb', line 186 def received? , times=1 @receive_lock.synchronize do received[].size >= times end end |
#satisfied?(*conditions) ⇒ Boolean
Check if all given conditions are satisfied
193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/ib/connection.rb', line 193 def satisfied? *conditions !conditions.empty? && conditions.inject(true) do |result, condition| result && if condition.is_a?(Symbol) received?(condition) elsif condition.is_a?(Array) received?(*condition) elsif condition.respond_to?(:call) condition.call else error "Unknown wait condition #{condition}" end end end |
#send_message(what, *args) ⇒ Object Also known as: dispatch
Send an outgoing message.
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/ib/connection.rb', line 279 def what, *args = case when what.is_a?(Messages::Outgoing::AbstractMessage) what when what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage what.new *args when what.is_a?(Symbol) Messages::Outgoing.const_get(what).new *args else error "Only able to send outgoing IB messages", :args end error "Not able to send messages, IB not connected!" unless connected? .send_to server end |
#socket ⇒ Object
109 110 111 |
# File 'lib/ib/connection.rb', line 109 def socket server[:socket] end |
#start_reader ⇒ Object
Start reader thread that continuously reads messages from server in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.
230 231 232 233 234 235 236 |
# File 'lib/ib/connection.rb', line 230 def start_reader Thread.abort_on_exception = true @reader_running = true server[:reader] = Thread.new do while @reader_running end end |
#subscribe(*args, &block) ⇒ Object
Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/ib/connection.rb', line 118 def subscribe *args, &block @subscribe_lock.synchronize do subscriber = args.last.respond_to?(:call) ? args.pop : block id = random_id error "Need subscriber proc or block", :args unless subscriber.is_a? Proc args.each do |what| = case when what.is_a?(Class) && what < Messages::Incoming::AbstractMessage [what] when what.is_a?(Symbol) [Messages::Incoming.const_get(what)] when what.is_a?(Regexp) Messages::Incoming::Classes.values.find_all { |klass| klass.to_s =~ what } else error "#{what} must represent incoming IB message class", :args end .flatten.each do || # TODO: Fix: RuntimeError: can't add a new key into hash during iteration subscribers[][id] = subscriber end end id end end |
#subscribers ⇒ Object
Message subscribers. Key is the message class to listen for. Value is a Hash of subscriber Procs, keyed by their subscription id. All subscriber Procs will be called with the message instance as an argument when a message of that type is received.
163 164 165 |
# File 'lib/ib/connection.rb', line 163 def subscribers @subscribers ||= Hash.new { |hash, subs| hash[subs] = Hash.new } end |
#unsubscribe(*ids) ⇒ Object
Remove all subscribers with specific subscriber id (TODO: multiple ids)
147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/ib/connection.rb', line 147 def unsubscribe *ids @subscribe_lock.synchronize do removed = [] ids.each do |id| removed_at_id = subscribers.map { |_, subscribers| subscribers.delete id }.compact error "No subscribers with id #{id}" if removed_at_id.empty? removed << removed_at_id end removed.flatten end end |
#wait_for(*args, &block) ⇒ Object
Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair. Timeout after given time or 1 second.
211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/ib/connection.rb', line 211 def wait_for *args, &block timeout = args.find { |arg| arg.is_a? Numeric } # extract timeout from args end_time = Time.now + (timeout || 1) # default timeout 1 sec conditions = args.delete_if { |arg| arg.is_a? Numeric }.push(block).compact until end_time < Time.now || satisfied?(*conditions) if server[:reader] sleep 0.05 else 50 end end end |