Class: IB::Connection

Inherits:
Object show all
Includes:
Support::Logging
Defined in:
lib/ib/connection.rb

Overview

Encapsulates API connection to TWS or Gateway

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Support::Logging

included

Instance Attribute Details

#client_idObject

Returns the value of attribute client_id.



31
32
33
# File 'lib/ib/connection.rb', line 31

# Reader for the client id stored in @client_id.
#
# @return [Object] the current value of @client_id (nil while unset)
def client_id
  @client_id
end

#client_versionObject

Returns the value of attribute client_version.



33
34
35
# File 'lib/ib/connection.rb', line 33

# Reader for the client version stored in @client_version.
#
# @return [Object] the current value of @client_version (nil while unset)
def client_version
  @client_version
end

#next_local_idObject Also known as: next_order_id

Next valid order id



30
31
32
# File 'lib/ib/connection.rb', line 30

# Reader for the next valid local order id stored in @next_local_id.
#
# @return [Object] the current value of @next_local_id (nil while unknown)
def next_local_id
  @next_local_id
end

#server_versionObject

Returns the value of attribute server_version.



32
33
34
# File 'lib/ib/connection.rb', line 32

# Reader for the server version stored in @server_version.
#
# @return [Object] the current value of @server_version (nil while unset)
def server_version
  @server_version
end

#socketObject

Please note: only the most recent TWS protocol versions are implemented, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older versions of the gem.



29
30
31
# File 'lib/ib/connection.rb', line 29

# Reader for the socket object stored in @socket.
#
# @return [Object] the current value of @socket (nil while unset)
def socket
  @socket
end

Instance Method Details

#cancel_order(*local_ids) ⇒ Object

Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).



381
382
383
384
385
# File 'lib/ib/connection.rb', line 381

# Cancels every order named by the given local ids.
#
# One :CancelOrder message is dispatched through #send_message per id, in
# argument order; each id is coerced with #to_i right before it is sent.
#
# @param local_ids [Array<#to_i>] local ids of the orders to cancel
# @return [Array] the ids exactly as they were passed in
def cancel_order(*local_ids)
  local_ids.each { |id| send_message(:CancelOrder, :local_id => id.to_i) }
end

#clear_received(*message_types) ⇒ Object

Clear received messages Hash



220
221
222
223
224
225
226
227
228
# File 'lib/ib/connection.rb', line 220

# Empties the containers of received messages while holding @receive_lock.
#
# Without arguments every container in #received is cleared; otherwise only
# the containers of the listed message types are cleared.
#
# @param message_types [Array] message types whose containers should be emptied
# @return [Hash, Array] the #received hash when called without arguments,
#   otherwise the list of message types that was passed in
def clear_received(*message_types)
  @receive_lock.synchronize do
    if message_types.empty?
      received.each_value(&:clear)
    else
      message_types.each { |type| received[type].clear }
    end
  end
end

#connectObject Also known as: open

connect can be called directly, but it is mostly called through update_next_order_id.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/ib/connection.rb', line 113

# Opens the API connection and starts the background reader.
#
# Steps, in order: open an IBSocket to @host/@port, run the socket's initial
# handshake, decode the first server reply (server version and remote connect
# time), send the start-API message carrying @client_id and
# @optional_capacities, set @connected and finally call #start_reader.
#
# If #connected? is already truthy, "Already connected!" is reported through
# #error and nothing else happens.
#
# @return [Object] the result of #start_reader, or nil when already connected
#   and #error returns instead of raising
def connect
	logger.progname='IB::Connection#connect'
	if connected?
		error  "Already connected!"
		# Only reached if #error does not raise.
		return
	end

	self.socket = IBSocket.open(@host, @port)  # raises  Errno::ECONNREFUSED  if no connection is possible
	socket.initialising_handshake
	socket.decode_message( socket.recieve_messages ) do  | the_message |
		#				logger.info{ "TheMessage :: #{the_message.inspect}" }
		# First field of the reply: the server version.
		@server_version =  the_message.shift.to_i
		error "ServerVersion does not match  #{@server_version} <--> #{MAX_CLIENT_VER}" if @server_version != MAX_CLIENT_VER

		# Next field: the connect time as reported by the server.
		@remote_connect_time = DateTime.parse the_message.shift
		@local_connect_time = Time.now
	end

	# V100 initial handshake
	# Parameters borrowed from the python client
	start_api = 71
	version = 2
	#			optcap = @optional_capacities.empty? ? "" : " "+ @optional_capacities
	socket.send_messages start_api, version, @client_id  , @optional_capacities
	@connected = true
	# NOTE(review): the success message is logged at fatal level, presumably
	# so that it is emitted regardless of the configured log level; confirm.
	logger.fatal{ "Connected to server, version: #{@server_version}, " +
               "using client-id: #{client_id},\n   connection time: " +
						 "#{@local_connect_time} local, " +
							 "#{@remote_connect_time} remote." }

	start_reader
end

#connected?Boolean

Returns:

  • (Boolean)


161
162
163
# File 'lib/ib/connection.rb', line 161

# Reports whether the connection is currently flagged as open.
#
# Always answers with a strict boolean: while @connected is still unset
# (nil) the answer is false rather than nil.
#
# @return [Boolean] true when @connected is truthy, false otherwise
def connected?
  @connected ? true : false
end

#disconnectObject Also known as: close



148
149
150
151
152
153
154
155
156
157
# File 'lib/ib/connection.rb', line 148

# Shuts the connection down.
#
# A running reader is asked to stop by clearing @reader_running and is then
# joined. After that, if the connection is flagged as open, the socket is
# closed and @connected is reset.
#
# @return [false, nil] false after closing an open connection, nil when
#   there was nothing to close
def disconnect
  if reader_running?
    @reader_running = false # the reader loop checks this flag between polls
    @reader_thread.join
  end

  return unless connected?

  socket.close
  @connected = false
end

#modify_order(order, contract) ⇒ Object

Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.



369
370
371
372
373
374
375
376
377
378
# File 'lib/ib/connection.rb', line 369

# Sends a :PlaceOrder message for an order that already carries a local_id.
#
# A missing local_id is reported through #error. The order's modified_at is
# stamped with the current time before the message is sent.
#
# @param order [#local_id, #modified_at=] the order to (re)submit
# @param contract [Object] the contract the order refers to
# @return [Object] the order's local_id
def modify_order(order, contract)
  error "Unable to modify order; local_id not specified" if order.local_id.nil?

  order.modified_at = Time.now
  send_message(:PlaceOrder, :order => order, :contract => contract, :local_id => order.local_id)
  order.local_id # return value
end

#place_order(order, contract) ⇒ Object

Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.



357
358
359
360
361
362
363
364
365
366
# File 'lib/ib/connection.rb', line 357

# Places a new order.
#
# Reports through #error when no next_local_id is known or when the order
# already has a local_id. Otherwise the order receives this connection's
# client_id and the current next_local_id, the counter is advanced by one,
# placed_at is stamped and the order is handed to #modify_order for sending.
#
# @param order [Object] the order to place; must not have a local_id yet
# @param contract [Object] the contract the order refers to
# @return [Object] whatever #modify_order returns
def place_order(order, contract)
  error "Unable to place order, next_local_id not known" unless next_local_id
  unless order.local_id.nil?
    error "local_id present. Order is already placed.  Do might use  modify insteed"
  end

  order.client_id = client_id
  order.local_id = next_local_id
  self.next_local_id = next_local_id + 1
  order.placed_at = Time.now
  modify_order(order, contract)
end

#process_messages(poll_time = 50) ⇒ Object

Process incoming messages for up to poll_time milliseconds (default 50), non-blocking.



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/ib/connection.rb', line 282

# Processes incoming messages for at most poll_time milliseconds.
#
# Until the deadline passes, waits with select for the socket to become
# readable and hands each readable event to #process_message. On platforms
# not matched by the Windows-like pattern the socket is additionally peeked
# to detect a likely shutdown of the remote side, in which case the loop
# sleeps 100ms per iteration instead of spinning.
#
# Errno::ECONNRESET is rescued here: it is logged, and either reported
# through #error or, for an unexpected message text, the process is exited.
#
# @param poll_time [Numeric] polling window in milliseconds (default 50)
def process_messages poll_time = 50 # in msec
  # Absolute deadline; poll_time is converted from msec to seconds.
  time_out = Time.now + poll_time/1000.0
  begin
  while (time_left = time_out - Time.now) > 0
    # If socket is readable, process single incoming message
    if  RUBY_PLATFORM.match(/cygwin|mswin|mingw|bccwin|wince|emx/)
      process_message if select [socket], nil, nil, time_left


    # The following checks for a shutdown of the TWS side; it ensures we
    # don't run in a spin loop. Unfortunately, it raises errors in a Windows
    # environment, hence the separate branch above.
    else
      if select [socket], nil, nil, time_left
        # Peek at the message from the socket; if it's blank then the
        # server side of connection (TWS) has likely shut down.
        socket_likely_shutdown = socket.recvmsg(100, Socket::MSG_PEEK)[0] == ""

        # We go ahead and process messages regardless (a no-op if
        # socket_likely_shutdown).
        process_message

        # After processing, if socket has shut down we sleep for 100ms
        # to avoid spinning in a tight loop. If the server side somehow
        # comes back up (gets reconnected), normal processing
        # (without the 100ms wait) should happen.
        sleep(0.1) if socket_likely_shutdown
      end
    end
  end
  rescue Errno::ECONNRESET => e
    logger.fatal e.message
    if e.message =~ /Connection reset by peer/
      logger.fatal "Is another client listening on the same port?"
      error "try reconnecting with a different client-id", :reader
    else
      # Any other reset is treated as unrecoverable: the whole process exits.
      logger.fatal "Aborting"
      Kernel.exit
    end
  end
end

#reader_running?Boolean

Returns:

  • (Boolean)


277
278
279
# File 'lib/ib/connection.rb', line 277

# Tells whether the background reader is active.
#
# The reader counts as running only when the @reader_running flag is set,
# a reader thread exists and that thread is still alive. An unset flag or a
# missing thread is passed through unchanged (nil/false).
#
# @return [Boolean, nil] truthy only while the reader thread is alive
def reader_running?
  return @reader_running unless @reader_running
  return @reader_thread unless @reader_thread

  @reader_thread.alive?
end

#receivedObject

Hash of received messages, keyed by message type



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/ib/connection.rb', line 231

# Hash of received messages, keyed by message type.
#
# The hash is built lazily and memoized in @received_hash. Looking up an
# unknown message type creates and stores an empty container for it.
#
# Each container is an Array with a singleton #method_missing, which enables
# access via
#   ib.received[:MessageType].attribute
# i.e. a method the Array itself does not have is sent to every stored
# element and the results are returned as an Array. Arguments and a given
# block are forwarded to every element.
#
# @return [Hash] message type => container of received messages
def received
  @received_hash ||= Hash.new do |hash, message_type|
    the_array = []
    def the_array.method_missing(method, *key, &block)
      # :to_hash and :to_str are answered with nil instead of being forwarded
      # (presumably so that Ruby's implicit-conversion checks do not fan out
      # to the elements).
      return nil if method == :to_hash || method == :to_str #|| method == :to_int

      map { |x| x.public_send(method, *key, &block) }
    end
    hash[message_type] = the_array
  end
end

#received?(message_type, times = 1) ⇒ Boolean

Check if messages of given type were received at_least n times

Returns:

  • (Boolean)


246
247
248
249
250
# File 'lib/ib/connection.rb', line 246

# Checks whether at least `times` messages of the given type were received.
#
# The container size is read while holding @receive_lock.
#
# @param message_type [Object] key into the #received hash
# @param times [Integer] minimum number of messages required (default 1)
# @return [Boolean] true when the container holds `times` or more messages
def received?(message_type, times = 1)
  count = @receive_lock.synchronize { received[message_type].size }
  count >= times
end

#send_message(what, *args) ⇒ Object Also known as: dispatch

Returns the request_id used, if appropriate; otherwise “true”.



326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/ib/connection.rb', line 326

# Sends an outgoing message over the socket.
#
# `what` may be an outgoing message instance, an outgoing message class
# (instantiated with args) or a Symbol naming a class inside
# Messages::Outgoing (looked up and instantiated with args). Anything else
# is reported through #error, as is an attempt to send while not connected.
#
# The message is written while holding @message_lock. On Errno::EPIPE the
# connection is re-established via #disconnect / #connect and the send is
# repeated. The number of reconnects is bounded (3); once exhausted,
# Errno::EPIPE is re-raised instead of retrying forever.
#
# @param what [Object, Class, Symbol] message, message class or class name
# @param args [Array] constructor arguments when a message has to be built
# @return [Object, true] the request_id of the message if present, otherwise true
# @raise [Errno::EPIPE] when the pipe is still broken after all reconnects
def send_message what, *args
  message =
    case
    when what.is_a?(Messages::Outgoing::AbstractMessage)
      what
    when what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage
      what.new(*args)
    when what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new(*args)
    else
      error "Only able to send outgoing IB messages", :args
    end
  error "Not able to send messages, IB not connected!" unless connected?

  reconnects_left = 3
  begin
    @message_lock.synchronize do
      message.send_to socket
    end
  rescue Errno::EPIPE
    # Give up once the reconnect budget is spent; an unbounded retry would
    # loop forever against a permanently broken peer.
    raise if reconnects_left.zero?

    reconnects_left -= 1
    logger.error { "Broken Pipe, trying to reconnect" }
    disconnect
    connect
    retry
  end
  ## return the transmitted message's request id, if any
  message.data[:request_id].presence || true
end

#start_readerObject

Start reader thread that continuously reads messages from @socket in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.



390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/ib/connection.rb', line 390

# Starts the background reader thread.
#
# If the reader flag is already set, the existing thread is returned. When
# connected, Thread.abort_on_exception is switched on globally, the flag is
# set and a thread is spawned that calls #process_messages for as long as
# @reader_running stays truthy. When not connected, this is reported through
# #error.
#
# No rescue wraps the thread creation: an exception raised inside the thread
# body surfaces in that thread (and, with abort_on_exception, in the main
# thread), never at the Thread.new call, so a rescue here could not fire.
#
# @return [Thread] the reader thread
def start_reader
  if @reader_running
    @reader_thread
  elsif connected?
    Thread.abort_on_exception = true
    @reader_running = true
    @reader_thread = Thread.new { process_messages while @reader_running }
  else
    error "Could not start reader, not connected!", :reader, true
  end
end

#subscribe(*args, &block) ⇒ Object

Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/ib/connection.rb', line 170

# Subscribes a Proc or block to one or more types of incoming messages.
#
# The subscriber is the last argument if it responds to #call, otherwise the
# given block. Every remaining argument selects message classes:
# * a subclass of Messages::Incoming::AbstractMessage selects itself,
# * a Symbol is resolved inside Messages::Incoming or, if TechnicalAnalysis
#   is defined, inside TechnicalAnalysis::Signals,
# * a Regexp selects every class in Messages::Incoming::Classes whose name
#   matches.
# Anything else is reported through #error. The whole registration runs
# while holding @subscribe_lock.
#
# @param args [Array<Class, Symbol, Regexp, #call>] selectors, optionally
#   followed by the subscriber
# @return [Object] the subscriber id (from #random_id), usable with #unsubscribe
def subscribe *args, &block
  @subscribe_lock.synchronize do
    subscriber = args.last.respond_to?(:call) ? args.pop : block
    id = random_id

    error  "Need subscriber proc or block ", :args  unless subscriber.is_a? Proc

    args.each do |what|
      message_classes =
        case
        when what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        when what.is_a?(Symbol)
          if Messages::Incoming.const_defined?(what)
            [Messages::Incoming.const_get(what)]
          elsif defined?( TechnicalAnalysis ) && TechnicalAnalysis::Signals.const_defined?(what)
            # Module#const_get is the lookup method; there is no `const_get?`.
            [TechnicalAnalysis::Signals.const_get(what)]
          else
            error "#{what} is no IB::Messages or TechnicalAnalyis::Signals class"
          end
        when what.is_a?(Regexp)
          Messages::Incoming::Classes.values.find_all { |klass| klass.to_s =~ what }
        else
          error  "#{what} must represent incoming IB message class", :args
        end

      message_classes.flatten.each do |message_class|
        # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
        subscribers[message_class][id] = subscriber
      end
    end

    id
  end
end

#unsubscribe(*ids) ⇒ Object

Remove all subscribers with specific subscriber id



208
209
210
211
212
213
214
215
216
# File 'lib/ib/connection.rb', line 208

# Removes the subscribers registered under the given subscriber ids.
#
# For every id, the entry is deleted from each per-class subscriber table
# while holding @subscribe_lock. An id that matches nothing is logged as an
# error.
#
# @param ids [Array] subscriber ids as returned by #subscribe
# @return [Array] all subscribers that were removed, in id order
def unsubscribe(*ids)
  @subscribe_lock.synchronize do
    ids.flat_map do |id|
      removed = subscribers.each_value.map { |by_id| by_id.delete(id) }.compact
      logger.error "No subscribers with id #{id}" if removed.empty?
      removed
    end
  end
end

#update_next_order_idObject

Reads the current next order id; connects first if not yet connected.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/ib/connection.rb', line 91

# Fetches the next valid order id from the server, connecting first if
# necessary.
#
# A temporary :NextValidId subscription pushes the received local_id onto a
# queue. If not connected, #connect is called (connecting implies that
# NextValidId is requested); otherwise :RequestIds is sent. A timer thread
# closes the queue after 5 seconds, so a missing answer is reported through
# #error instead of blocking forever.
#
# The timer thread is stopped and the temporary subscription removed in an
# ensure clause, so neither leaks when connecting, sending or #error raises.
#
# @return [Object, nil] the received local id (nil if none arrived in time
#   and #error did not raise)
def update_next_order_id
  q = Queue.new
  subscription = subscribe(:NextValidId) { |msg| q.push msg.local_id }
  timer = nil
  begin
    if connected?
      send_message :RequestIds
    else
      connect # connect implies requesting NextValidId
    end
    timer = Thread.new { sleep 5; q.close }
    local_id = q.pop # returns nil once the timer has closed the empty queue
    error "Could not get NextValidID", :reader if q.closed?
    local_id # return next_id
  ensure
    timer.kill if timer
    unsubscribe subscription
  end
end

#wait_for(*args, &block) ⇒ Object

wait_for depends heavily on Connection#received. If the collection of messages through received is turned off, wait_for loses most of its functionality.



260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/ib/connection.rb', line 260

# Blocks until the given conditions are satisfied or a timeout expires.
#
# The first Numeric among the arguments is the timeout in seconds (default
# 1); every other non-nil argument, plus the block if given, is passed to
# #satisfied? as a condition. While waiting, the method sleeps in 50ms steps
# if the reader thread is running; otherwise it polls the socket itself
# through #process_messages.
#
# @param args [Array] conditions and an optional Numeric timeout
# @return [nil]
def wait_for(*args, &block)
  numerics, others = args.partition { |arg| arg.is_a?(Numeric) }
  deadline = Time.now + (numerics.first || 1) # default timeout 1 sec
  conditions = (others << block).compact

  until deadline < Time.now || satisfied?(*conditions)
    reader_running? ? sleep(0.05) : process_messages(50)
  end
end