Class: IB::Connection

Inherits:
Object show all
Includes:
LogDev
Defined in:
lib/ib/connection.rb

Overview

Encapsulates API connection to TWS or Gateway

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from LogDev

#default_logger, #default_logger=, #log

Instance Attribute Details

#client_idObject

Returns the value of attribute client_id.



31
32
33
# File 'lib/ib/connection.rb', line 31

# Reader for the client id stored in @client_id.
#
# @return [Object] the current value of @client_id
def client_id
  @client_id
end

#client_versionObject

Returns the value of attribute client_version.



33
34
35
# File 'lib/ib/connection.rb', line 33

# Reader for @client_version.
# NOTE(review): presumably the API version this client speaks; confirm where it is assigned.
#
# @return [Object] the current value of @client_version
def client_version
  @client_version
end

#next_local_idObject Also known as: next_order_id

Next valid order id



30
31
32
# File 'lib/ib/connection.rb', line 30

# Reader for the next valid order id, stored in @next_local_id
# (also exposed as #next_order_id).
#
# @return [Object] the current value of @next_local_id
def next_local_id
  @next_local_id
end

#server_versionObject

Returns the value of attribute server_version.



32
33
34
# File 'lib/ib/connection.rb', line 32

# Reader for the server version stored in @server_version.
#
# @return [Object] the current value of @server_version
def server_version
  @server_version
end

#socketObject

Please note: only the most current TWS protocol versions are implemented, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older gem versions.



29
30
31
# File 'lib/ib/connection.rb', line 29

# Reader for the socket object stored in @socket.
#
# @return [Object] the current value of @socket
def socket
  @socket
end

Instance Method Details

#cancel_order(*local_ids) ⇒ Object

Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).



365
366
367
368
369
# File 'lib/ib/connection.rb', line 365

# Cancels orders by local id, sending one :CancelOrder message per id.
#
# @param local_ids [Array<#to_i>] local order ids; each is converted with #to_i
# @return [Array] the ids exactly as they were passed in
def cancel_order(*local_ids)
  local_ids.each { |order_ref| send_message(:CancelOrder, :local_id => order_ref.to_i) }
end

#clear_received(*message_types) ⇒ Object

Clear received messages Hash



220
221
222
223
224
225
226
227
228
# File 'lib/ib/connection.rb', line 220

# Empties the containers of received messages while holding @receive_lock.
#
# With no arguments every container in #received is cleared; otherwise only
# the containers for the given message types are cleared.
#
# @param message_types [Array] message type keys whose containers to clear
# @return [Object] the #received hash when clearing everything, otherwise the
#   list of message types that was passed in
def clear_received(*message_types)
  @receive_lock.synchronize do
    next received.each_value(&:clear) if message_types.empty?

    message_types.each { |type| received[type].clear }
  end
end

#connectObject Also known as: open

Working with connection



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/ib/connection.rb', line 112

# Opens the API connection (also available as #open).
#
# Steps, in order:
# 1. Bail out via `error` if #connected? is already true.
# 2. Open an IBSocket to @host/@port and run its initial handshake.
# 3. Decode the first reply: the first field becomes @server_version (Integer),
#    the second is parsed into @remote_connect_time.
# 4. Send the start-API message carrying @client_id and @optional_capacities.
# 5. Mark the connection as connected, log it, process one incoming message
#    and start the background reader.
#
# NOTE(review): `error` is a helper defined outside this view; presumably it
# raises — confirm before relying on the code after an `error` call not running.
def connect
	logger.progname='IB::Connection#connect'
	if connected?
		error  "Already connected!"
		return
	end

	self.socket = IBSocket.open(@host, @port)  # raises  Errno::ECONNREFUSED  if no connection is possible
	socket.initialising_handshake
	socket.decode_message( socket.recieve_messages ) do  | the_message |
		# Debug aid (disabled): logger.info{ "TheMessage :: #{the_message.inspect}" }
		# First field of the handshake reply: the server's protocol version.
		@server_version =  the_message.shift.to_i
		error "ServerVersion does not match  #{@server_version} <--> #{MAX_CLIENT_VER}" if @server_version != MAX_CLIENT_VER

		# Second field: the server-side connection time; the local time is recorded alongside it.
		@remote_connect_time = DateTime.parse the_message.shift
		@local_connect_time = Time.now
	end

	# Sending (arbitrary) client ID to identify subsequent communications.
	# The client with a client_id of 0 can manage the TWS-owned open orders.
	# Other clients can only manage their own open orders.

	# V100 initial handshake
	# Parameters borrowed from the python client
	start_api = 71
	version = 2
	# Disabled alternative: optcap = @optional_capacities.empty? ? "" : " "+ @optional_capacities
	socket.send_messages start_api, version, @client_id  , @optional_capacities
	@connected = true
	logger.info { "Connected to server, version: #{@server_version},\n connection time: " +
						 "#{@local_connect_time} local, " +
							 "#{@remote_connect_time} remote."}

	# If the client_id is wrong or the port is not accessible the first read attempt fails.
	# Get the first message and proceed if something reasonable is received.
	the_message = process_message   # receive next_order_id
	error "Check Port/Client_id ", :reader if the_message == " "
	start_reader
end

#connected?Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/ib/connection.rb', line 167

# Reports the connection flag stored in @connected.
# The value is returned as-is, so it is nil (not false) until the flag has been assigned.
#
# @return [Boolean, nil] the current value of @connected
def connected?
  @connected
end

#disconnectObject Also known as: close



154
155
156
157
158
159
160
161
162
163
# File 'lib/ib/connection.rb', line 154

# Shuts the connection down (also available as #close).
#
# First asks a running reader thread to stop and waits for it to finish,
# then closes the socket and clears the connected flag.
#
# @return [false, nil] false when a live connection was closed, nil when
#   there was nothing to close
def disconnect
  if reader_running?
    # Clearing the flag tells the reader thread to stop; join waits until it has.
    @reader_running = false
    @reader_thread.join
  end
  return unless connected?

  socket.close
  @connected = false
end

#loggerObject

borrowed from active_support



24
# File 'lib/ib/connection.rb', line 24

# Declares the `logger` accessor through the mattr_accessor macro
# (a helper borrowed from ActiveSupport).
mattr_accessor :logger

#modify_order(order, contract) ⇒ Object

Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.



353
354
355
356
357
358
359
360
361
362
# File 'lib/ib/connection.rb', line 353

# Modifies an order by re-sending it in a :PlaceOrder message under its
# existing local_id.
#
# @param order [Object] the order; must already have a local_id and respond
#   to #modified_at=
# @param contract [Object] the contract the order refers to
# @return [Object] the order's local_id
def modify_order(order, contract)
  if order.local_id.nil?
    error "Unable to modify order; local_id not specified"
  end

  order.modified_at = Time.now
  send_message(:PlaceOrder, :order => order, :contract => contract, :local_id => order.local_id)
  order.local_id
end

#place_order(order, contract) ⇒ Object

Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.



341
342
343
344
345
346
347
348
349
350
# File 'lib/ib/connection.rb', line 341

# Places a new order.
#
# Stamps the order with this connection's client_id and the next local id,
# advances the local id counter, records the placement time, and then hands
# over to #modify_order, which sends the :PlaceOrder message.
#
# @param order [Object] an order that has no local_id yet
# @param contract [Object] the contract the order refers to
# @return [Object] whatever #modify_order returns (the assigned local id)
def place_order(order, contract)
  unless next_local_id
    error "Unable to place order, next_local_id not known"
  end
  unless order.local_id.nil?
    error "local_id present. Order is already placed.  Do might use  modify insteed"
  end

  order.client_id = client_id
  order.local_id = next_local_id
  self.next_local_id += 1
  order.placed_at = Time.now

  modify_order(order, contract)
end

#process_messages(poll_time = 50) ⇒ Object

Process incoming messages for up to poll_time milliseconds (default: 50), nonblocking



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/ib/connection.rb', line 281

# Polls the socket and processes incoming messages until poll_time has elapsed.
#
# @param poll_time [Numeric] polling window in milliseconds (default 50)
# @return [nil]
def process_messages poll_time = 50 # in msec
  # Absolute deadline; poll_time is converted from milliseconds to seconds.
  time_out = Time.now + poll_time/1000.0
  while (time_left = time_out - Time.now) > 0
    # Wait (at most time_left seconds) for the socket to become readable, then
    # process a single incoming message.
    # Simpler earlier form, kept for reference:
    #   process_message if select [socket], nil, nil, time_left
    # The form below additionally checks for a shutdown of the TWS side so we
    # do not run in a spin loop.
    # NOTE(review): an earlier comment here said this check raises errors on
    # Windows and was "disabled for now", but the code below is active — confirm.
    if select [socket], nil, nil, time_left
      # Peek at the message from the socket; if it's blank then the
      # server side of the connection (TWS) has likely shut down.
      socket_likely_shutdown = socket.recvmsg(100, Socket::MSG_PEEK)[0] == ""

      # We go ahead and process messages regardless
      # (presumably a no-op if socket_likely_shutdown).
      process_message

      # After processing, if the socket has shut down we sleep for 100ms
      # to avoid spinning in a tight loop. If the server side somehow
      # comes back up (gets reconnected), normal processing
      # (without the 100ms wait) should happen.
     sleep(0.1) if socket_likely_shutdown
    end
  end
end

#reader_running?Boolean

Returns:

  • (Boolean)


276
277
278
# File 'lib/ib/connection.rb', line 276

# Tells whether the background reader is active: the @reader_running flag is
# set, a reader thread exists, and that thread is still alive.
#
# @return [Boolean, nil] truthy only when all three conditions hold
def reader_running?
  @reader_running && @reader_thread && @reader_thread.alive?
end

#receivedObject

Hash of received messages, keyed by message type



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/ib/connection.rb', line 231

# Lazily built Hash of received messages, keyed by message type.
#
# Looking up an unknown key creates, stores and returns an empty Array for it.
# Each such Array forwards unknown methods to all of its elements and returns
# the collected results, which allows
#   ib.received[:MessageType].attribute
# to return the attribute of every stored message.
#
# @return [Hash] message type => Array of messages
def received
  @received_hash ||= Hash.new do |store, message_type|
    messages = Array.new
    # :to_hash and :to_str are answered with nil instead of being forwarded,
    # so the container is never mistaken for a Hash or a String.
    def messages.method_missing(name, *params)
      return nil if name == :to_hash || name == :to_str

      map { |message| message.public_send(name, *params) }
    end
    store[message_type] = messages
  end
end

#received?(message_type, times = 1) ⇒ Boolean

Check if messages of given type were received at_least n times

Returns:

  • (Boolean)


246
247
248
249
250
# File 'lib/ib/connection.rb', line 246

# Checks, under @receive_lock, whether at least `times` messages of the given
# type have been collected in #received.
#
# @param message_type [Object] key into the #received hash
# @param times [Numeric] minimum number of messages required (default 1)
# @return [Boolean]
def received?(message_type, times = 1)
  @receive_lock.synchronize do
    collected = received[message_type].size
    collected >= times
  end
end

#send_message(what, *args) ⇒ Object Also known as: dispatch

Returns the request_id that was used, if the message has one; otherwise true.



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/ib/connection.rb', line 310

# Sends an outgoing message over the socket (also available as #dispatch).
#
# `what` may be an outgoing message instance, an outgoing message class
# (instantiated with *args), or a Symbol naming a class inside
# Messages::Outgoing (looked up and instantiated with *args).
#
# On Errno::EPIPE the connection is torn down, re-established and the send
# is attempted again.
# NOTE(review): the retry is unbounded; a persistently broken pipe combined
# with a successful reconnect would loop forever — consider a retry limit.
#
# @param what [Object] message instance, message class, or Symbol
# @param args [Array] constructor arguments when a class or Symbol is given
# @return [Object] the message's :request_id when present, otherwise true
def send_message(what, *args)
  message =
    if what.is_a?(Messages::Outgoing::AbstractMessage)
      what
    elsif what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage
      what.new(*args)
    elsif what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new(*args)
    else
      error "Only able to send outgoing IB messages", :args
    end

  error "Not able to send messages, IB not connected!" unless connected?

  begin
    # Serialise writers so that messages are not interleaved on the socket.
    @message_lock.synchronize { message.send_to socket }
  rescue Errno::EPIPE
    logger.error { "Broken Pipe, trying to reconnect" }
    disconnect
    connect
    retry
  end

  # Hand back the request id that was transmitted, or true if there is none.
  message.data[:request_id].presence || true
end

#start_readerObject

Start reader thread that continuously reads messages from @socket in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.



374
375
376
377
378
379
380
381
382
383
384
# File 'lib/ib/connection.rb', line 374

# Starts the background thread that keeps calling #process_messages for as
# long as @reader_running stays set.
#
# Does nothing beyond returning the existing thread when the reader flag is
# already set, and logs a fatal message when there is no connection.
# Side effect: sets Thread.abort_on_exception to true process-wide.
#
# @return [Thread, nil] the reader thread, or nil when not connected
def start_reader
  return @reader_thread if @reader_running

  unless connected?
    logger.fatal { "Could not start reader, not connected!" }
    return nil
  end

  Thread.abort_on_exception = true
  @reader_running = true
  @reader_thread = Thread.new { process_messages while @reader_running }
end

#subscribe(*args, &block) ⇒ Object

Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/ib/connection.rb', line 176

# Registers a listener for one or more types of incoming messages.
#
# The listener is the last argument when that argument responds to #call,
# otherwise the given block; either way it must be a Proc. Every other
# argument selects message classes:
# * a subclass of Messages::Incoming::AbstractMessage - that class
# * a Symbol - the constant of that name inside Messages::Incoming
# * a Regexp - every class in Messages::Incoming::Classes whose name matches
#
# The whole registration runs under @subscribe_lock.
#
# @param args [Array] message selectors, optionally followed by the listener
# @return [Object] the subscriber id, to be passed to #unsubscribe
def subscribe(*args, &block)
  @subscribe_lock.synchronize do
    subscriber = args.last.respond_to?(:call) ? args.pop : block
    id = random_id

    error  "Need subscriber proc or block ", :args  unless subscriber.is_a? Proc

    args.each do |what|
      message_classes =
        if what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        elsif what.is_a?(Symbol)
          [Messages::Incoming.const_get(what)]
        elsif what.is_a?(Regexp)
          Messages::Incoming::Classes.values.select { |klass| klass.to_s =~ what }
        else
          error  "#{what} must represent incoming IB message class", :args
        end

      # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
      message_classes.flatten.each { |message_class| subscribers[message_class][id] = subscriber }
    end

    id
  end
end

#unsubscribe(*ids) ⇒ Object

Remove all subscribers with specific subscriber id



208
209
210
211
212
213
214
215
216
# File 'lib/ib/connection.rb', line 208

# Removes every listener registered under the given subscriber ids.
#
# Runs under @subscribe_lock. For each id the listener is deleted from every
# message class it was registered for; an error is logged when an id matched
# nothing.
#
# @param ids [Array] subscriber ids as returned by #subscribe
# @return [Array] all listeners that were removed
def unsubscribe(*ids)
  @subscribe_lock.synchronize do
    removed_per_id = ids.map do |id|
      dropped = subscribers.map { |_message_class, listeners| listeners.delete id }.compact
      logger.error "No subscribers with id #{id}" if dropped.empty?
      dropped
    end
    removed_per_id.flatten
  end
end

#update_next_order_idObject

Reads the current next valid order id, connecting first if not yet connected.



100
101
102
103
104
105
106
107
108
# File 'lib/ib/connection.rb', line 100

# Asks the server for the next valid order id and waits for the answer.
#
# A temporary :NextValidID subscription flips a flag once the reply arrives.
# When already connected a :RequestIds message is sent; otherwise the
# connection is opened via #open (an alias of #connect). The method then
# polls the flag every 100 ms for at most one second.
#
# The temporary subscription is removed in an `ensure` clause, so it no
# longer leaks when the wait times out or the request itself raises.
#
# @return [Object] whatever #unsubscribe returns for the temporary subscription
# @raise [IB::TransmissionError] if no :NextValidID message arrives within one second
def update_next_order_id
  finish = false
  sub = subscribe(:NextValidID) { finish = true }
  begin
    connected? ? send_message(:RequestIds) : open
    Timeout.timeout(1, IB::TransmissionError, "Could not get NextValidId") do
      loop { sleep 0.1; break if finish }
    end
  ensure
    # Always drop the temporary listener, even on timeout or send failure,
    # so failed attempts do not accumulate stale subscribers.
    removed = unsubscribe(sub)
  end
  removed
end

#wait_for(*args, &block) ⇒ Object

wait_for depends heavily on Connection#received. If the collection of messages through #received is turned off, wait_for loses most of its functionality.



259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/ib/connection.rb', line 259

# Waits until the given conditions are satisfied or the timeout expires.
#
# The first Numeric argument is the timeout in seconds (default 1); all
# Numeric arguments are dropped from the condition list. The remaining
# arguments plus the optional block are handed to #satisfied?.
#
# While waiting, the method sleeps in 50 ms steps when the background reader
# is running and otherwise polls the socket itself via #process_messages.
#
# @param args [Array] conditions, optionally mixed with a Numeric timeout
# @return [nil]
def wait_for(*args, &block)
  numerics, others = args.partition { |arg| arg.is_a? Numeric }
  deadline = Time.now + (numerics.first || 1) # default timeout 1 sec
  conditions = (others << block).compact

  until deadline < Time.now || satisfied?(*conditions)
    reader_running? ? sleep(0.05) : process_messages(50)
  end
end