Class: IB::Connection

Inherits:
Object show all
Defined in:
lib/ib/connection.rb

Overview

Encapsulates API connection to TWS or Gateway

Constant Summary collapse

DEFAULT_OPTIONS =

Please note: only the most recent TWS protocol versions are implemented, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older versions of the gem.

# Defaults that #initialize merges with the caller-supplied options.
{:host =>'127.0.0.1',
                   :port => '4001', # IB Gateway connection (default)
                   #:port => '7496', # TWS connection, with annoying pop-ups
                   :connect => true, # Call #connect from #initialize
                   :reader => true, # #connect starts a separate reader Thread
                   :received => true, # Keep every received message in the #received Hash
                   :logger => nil, # When set, #initialize assigns it as the default logger
                   :client_id => nil, # nil: #connect falls back to random_id
                   :client_version => 57, # 48, # 57 = can receive commissionReport message
                   :server_version => 60 # 53? Minimal server version; #connect reports an error below it
}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/ib/connection.rb', line 36

# Builds a connection object and, unless disabled through the :connect
# option, connects right away.
#
# opts - Hash of overrides layered on top of DEFAULT_OPTIONS.
#
# The freshly built instance is also published as Connection.current.
def initialize(opts = {})
  @options = DEFAULT_OPTIONS.merge(opts)
  @connected = false
  @server = {}

  # Two locks guarding subscriber and received-message state
  # (race conditions were observed in JRuby).
  @subscribe_lock = Mutex.new
  @receive_lock = Mutex.new

  custom_logger = options[:logger]
  self.default_logger = custom_logger if custom_logger
  self.next_local_id = nil

  connect if options[:connect]
  Connection.current = self
end

Class Attribute Details

.currentObject

Returns the value of attribute current.



26
27
28
# File 'lib/ib/connection.rb', line 26

# Reader for the class-level `current` attribute. #initialize assigns every
# newly built instance to it (Connection.current = self), so it holds the
# connection that was constructed last.
def current
  @current
end

Instance Attribute Details

#next_local_idObject Also known as: next_order_id

Info about IB server and server connection state



29
30
31
# File 'lib/ib/connection.rb', line 29

# Next local (order) id. Reset to nil by #initialize and updated from the
# local_id of each NextValidId message by the subscriber that #connect
# registers.
def next_local_id
  @next_local_id
end

#optionsObject

Info about IB server and server connection state



29
30
31
# File 'lib/ib/connection.rb', line 29

# Effective connection options: DEFAULT_OPTIONS merged with the Hash that
# was passed to #initialize.
def options
  @options
end

#serverObject

Info about IB server and server connection state



29
30
31
# File 'lib/ib/connection.rb', line 29

# Hash describing the server side of the connection. #connect fills in
# :socket, :client_version, :server_version, :remote_connect_time,
# :local_connect_time and :client_id; #start_reader adds :reader.
# #disconnect replaces it with an empty Hash.
def server
  @server
end

Instance Method Details

#cancel_order(*local_ids) ⇒ Object

Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).



309
310
311
312
313
# File 'lib/ib/connection.rb', line 309

# Sends one :CancelOrder message per given local order id.
#
# local_ids - any number of ids; each is converted with #to_i before sending.
#
# Returns the list of ids exactly as it was passed in.
def cancel_order(*local_ids)
  local_ids.each { |id| send_message(:CancelOrder, :local_id => id.to_i) }
end

#clear_received(*message_types) ⇒ Object

Clear received messages Hash



170
171
172
173
174
175
176
177
178
# File 'lib/ib/connection.rb', line 170

# Empties the containers of received messages while holding @receive_lock.
#
# message_types - message types whose containers should be emptied;
#                 with no arguments every container is emptied.
def clear_received(*message_types)
  @receive_lock.synchronize do
    # No explicit types: wipe every container that exists so far.
    next received.each_value(&:clear) if message_types.empty?

    message_types.each { |type| received[type].clear }
  end
end

#connectObject Also known as: open

Working with connection



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/ib/connection.rb', line 54

# Opens the socket to TWS/Gateway and performs the version handshake.
#
# Steps, in order: register a NextValidId subscriber, open the socket,
# exchange client/server versions, read the remote connect time, send the
# client id, mark the connection as connected and (when options[:reader] is
# set) start the reader thread.
#
# Reports an error when already connected or when the server version is
# lower than options[:server_version].
def connect
  error "Already connected!" if connected?

  # TWS always sends NextValidId message at connect - remember that id
  subscribe(:NextValidId) do |msg|
    self.next_local_id = msg.local_id
    log.info "Got next valid order id: #{next_local_id}."
  end

  server[:socket] = IBSocket.open(options[:host], options[:port])

  # Handshake: announce our client version, then read the server's
  client_version = options[:client_version]
  socket.write_data client_version
  server[:client_version] = client_version
  server[:server_version] = socket.read_int
  if server[:server_version] < options[:server_version]
    error "TWS version #{server[:server_version]}, #{options[:server_version]} required."
  end
  server[:remote_connect_time] = socket.read_string
  server[:local_connect_time] = Time.now

  # An (arbitrary) client ID identifies subsequent communications.
  # The client with a client_id of 0 can manage the TWS-owned open orders;
  # other clients can only manage their own open orders.
  server[:client_id] = options[:client_id] || random_id
  socket.write_data server[:client_id]

  @connected = true
  log.info "Connected to server, version: #{server[:server_version]}, connection time: " \
           "#{server[:local_connect_time]} local, " \
           "#{server[:remote_connect_time]} remote."

  start_reader if options[:reader] # Allows reconnect
end

#connected?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/ib/connection.rb', line 105

# Connection flag: false after #initialize, set to true at the end of a
# successful #connect and back to false by #disconnect.
def connected?
  @connected
end

#disconnectObject Also known as: close



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/ib/connection.rb', line 91

# Stops the reader thread (when one is running), then closes the socket and
# resets the server info. Does nothing for the parts that are not active.
#
# Returns false after closing an open connection, nil when there was no
# open connection to close.
def disconnect
  if reader_running?
    # Clear the flag first so the reader loop exits, then wait for it.
    @reader_running = false
    server[:reader].join
  end

  return unless connected?

  socket.close
  @server = {}
  @connected = false
end

#modify_order(order, contract) ⇒ Object

Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.



304
305
306
# File 'lib/ib/connection.rb', line 304

# Convenience wrapper: delegates to order.modify, handing it the contract
# and this connection.
#
# Returns whatever order.modify returns (documented as the order id).
def modify_order(order, contract)
  order.modify(contract, self)
end

#place_order(order, contract) ⇒ Object

Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.



299
300
301
# File 'lib/ib/connection.rb', line 299

# Convenience wrapper: delegates to order.place, handing it the contract
# and this connection.
#
# Returns whatever order.place returns (documented as the assigned order id).
def place_order(order, contract)
  order.place(contract, self)
end

#process_messageObject

Process single incoming message (blocking!)



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/ib/connection.rb', line 252

# Reads and processes a single incoming message. Blocks until a message id
# can be read from the socket.
#
# The message id is mapped to a class through Messages::Incoming::Classes;
# an id without a class is reported through `error`. The new message
# instance is handed to every subscriber registered for its class (a warning
# is logged when there is none) and, when options[:received] is set, stored
# in the #received Hash under its message_type.
#
# Subscribers are looked up while holding @subscribe_lock but are *called*
# after the lock is released. This way a subscriber may itself call
# #subscribe / #unsubscribe without raising ThreadError (Mutex is not
# reentrant), and the auto-vivifying #subscribers Hash is never touched
# outside the lock, where it could gain a key while another thread is
# iterating over it.
def process_message
  msg_id = socket.read_int # This read blocks!
  message_class = Messages::Incoming::Classes[msg_id]

  # Debug:
  log.debug "Got message #{msg_id} (#{message_class})"

  # Create new instance of the appropriate message type,
  # and have it read the message from server.
  # NB: Failure here usually means unsupported message type received
  error "Got unsupported message #{msg_id}" unless message_class
  msg = message_class.new(server)

  # Snapshot the subscribers under the lock, deliver outside of it.
  listeners = @subscribe_lock.synchronize { subscribers[msg.class].values }
  if listeners.empty?
    log.warn "No subscribers for message #{msg.class}!"
  else
    listeners.each { |subscriber| subscriber.call(msg) }
  end

  # Collect all received messages into a @received Hash
  @receive_lock.synchronize do
    received[msg.message_type] << msg if options[:received]
  end
end

#process_messages(poll_time = 200) ⇒ Object

Process incoming messages during poll_time (200) msecs, nonblocking



243
244
245
246
247
248
249
# File 'lib/ib/connection.rb', line 243

# Processes incoming messages for roughly poll_time milliseconds.
#
# poll_time - polling window in msec (default 200).
#
# Waits on the socket only for the time that is left in the window and
# calls #process_message each time the socket is reported readable.
def process_messages(poll_time = 200)
  deadline = Time.now + poll_time / 1000.0
  loop do
    remaining = deadline - Time.now
    break unless remaining > 0

    # If server socket is readable, process single incoming message
    process_message if IO.select([socket], nil, nil, remaining)
  end
end

#reader_running?Boolean

Returns:

  • (Boolean)


238
239
240
# File 'lib/ib/connection.rb', line 238

# Tells whether the background reader is active: the running flag must be
# set, a reader thread must be stored in server[:reader], and that thread
# must still be alive.
def reader_running?
  return @reader_running unless @reader_running

  server[:reader] && server[:reader].alive?
end

#receivedObject

Hash of received messages, keyed by message type



181
182
183
# File 'lib/ib/connection.rb', line 181

# Lazily created Hash of received messages keyed by message type.
# Looking up an unknown type stores and returns a fresh empty Array.
def received
  @received ||= Hash.new { |store, type| store[type] = [] }
end

#received?(message_type, times = 1) ⇒ Boolean

Check whether messages of the given type were received at least `times` times (default: once)

Returns:

  • (Boolean)


186
187
188
189
190
# File 'lib/ib/connection.rb', line 186

# Checks whether at least `times` messages of the given type were received.
#
# message_type - key into the #received Hash.
# times        - minimal number of messages required (default 1).
#
# The container size is read while holding @receive_lock.
def received?(message_type, times = 1)
  count = @receive_lock.synchronize { received[message_type].size }
  count >= times
end

#satisfied?(*conditions) ⇒ Boolean

Check if all given conditions are satisfied

Returns:

  • (Boolean)


193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/ib/connection.rb', line 193

# Checks whether ALL given conditions hold. Always returns true or false.
#
# conditions - each one is either
#              * a Symbol   - message type, checked with received?(type)
#              * an Array   - splatted into received?, e.g. [type, times]
#              * a callable - invoked without arguments; its truthiness counts
#              Anything else is reported through `error`.
#
# An empty condition list is never satisfied. Evaluation stops at the first
# condition that fails, so later conditions are not checked.
def satisfied? *conditions
  return false if conditions.empty?

  conditions.all? do |condition|
    case condition
    when Symbol
      received?(condition)
    when Array
      received?(*condition)
    else
      if condition.respond_to?(:call)
        condition.call
      else
        error "Unknown wait condition #{condition}"
      end
    end
  end
end

#send_message(what, *args) ⇒ Object Also known as: dispatch

Send an outgoing message.



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/ib/connection.rb', line 279

# Sends an outgoing message to the server.
#
# what - one of:
#        * an instance of Messages::Outgoing::AbstractMessage (sent as is)
#        * a subclass of Messages::Outgoing::AbstractMessage (instantiated
#          with args)
#        * a Symbol naming a constant in Messages::Outgoing (looked up and
#          instantiated with args)
#        Anything else is reported through `error`.
# args - constructor arguments for the message class.
#
# The message is built first; only then is the connection state checked
# (an error is reported when not connected). Returns the result of
# message.send_to(server).
def send_message(what, *args)
  message =
    if what.is_a?(Messages::Outgoing::AbstractMessage)
      what
    elsif what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage
      what.new(*args)
    elsif what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new(*args)
    else
      error "Only able to send outgoing IB messages", :args
    end

  error "Not able to send messages, IB not connected!" unless connected?
  message.send_to(server)
end

#socketObject



109
110
111
# File 'lib/ib/connection.rb', line 109

# Shortcut for the socket stored under server[:socket] (set by #connect);
# nil when the server Hash has no :socket entry.
def socket
  server[:socket]
end

#start_readerObject

Start reader thread that continuously reads messages from server in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.



230
231
232
233
234
235
236
# File 'lib/ib/connection.rb', line 230

# Starts a background thread that keeps calling #process_messages for as
# long as the @reader_running flag stays set.
#
# Note: turns on Thread.abort_on_exception for the whole process.
#
# Returns the reader Thread, which is also stored in server[:reader].
def start_reader
  Thread.abort_on_exception = true
  @reader_running = true

  reader = Thread.new do
    while @reader_running
      process_messages
    end
  end
  server[:reader] = reader
end

#subscribe(*args, &block) ⇒ Object

Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/ib/connection.rb', line 118

# Registers a listener for one or more incoming message types.
#
# args  - message selectors, optionally followed by a callable that is used
#         as the listener instead of the block. A selector is
#         * a subclass of Messages::Incoming::AbstractMessage,
#         * a Symbol naming a constant in Messages::Incoming, or
#         * a Regexp matched against the names of all classes in
#           Messages::Incoming::Classes.
#         Anything else is reported through `error`.
# block - listener, used when the last argument is not callable.
#
# The listener must be a Proc, otherwise an error is reported. The whole
# registration runs while holding @subscribe_lock.
#
# Returns the subscription id, which can be passed to #unsubscribe.
def subscribe(*args, &block)
  @subscribe_lock.synchronize do
    listener = block
    listener = args.pop if args.last.respond_to?(:call)
    subscription_id = random_id

    unless listener.is_a?(Proc)
      error "Need subscriber proc or block", :args
    end

    args.each do |what|
      targets =
        if what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        elsif what.is_a?(Symbol)
          [Messages::Incoming.const_get(what)]
        elsif what.is_a?(Regexp)
          Messages::Incoming::Classes.values.select { |klass| klass.to_s =~ what }
        else
          error "#{what} must represent incoming IB message class", :args
        end

      # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
      targets.flatten.each do |message_class|
        subscribers[message_class][subscription_id] = listener
      end
    end

    subscription_id
  end
end

#subscribersObject

Message subscribers. Key is the message class to listen for. Value is a Hash of subscriber Procs, keyed by their subscription id. All subscriber Procs will be called with the message instance as an argument when a message of that type is received.



163
164
165
# File 'lib/ib/connection.rb', line 163

# Lazily created registry of message subscribers: message class => Hash of
# subscriber Procs keyed by subscription id. Looking up an unknown class
# stores and returns a fresh empty Hash.
def subscribers
  @subscribers ||= Hash.new { |registry, message_class| registry[message_class] = {} }
end

#unsubscribe(*ids) ⇒ Object

Remove all subscribers registered under the given subscriber id(s). Returns the removed subscribers.



147
148
149
150
151
152
153
154
155
156
157
# File 'lib/ib/connection.rb', line 147

# Removes the subscribers registered under the given subscription ids from
# every message class, while holding @subscribe_lock.
#
# ids - subscription ids as returned by #subscribe.
#
# An id with no subscriber at all is reported through `error`; ids handled
# before it have already been removed at that point.
#
# Returns a flat Array of the removed subscribers.
def unsubscribe(*ids)
  @subscribe_lock.synchronize do
    ids.flat_map do |id|
      dropped = subscribers.each_value.map { |by_id| by_id.delete(id) }.compact
      error "No subscribers with id #{id}" if dropped.empty?
      dropped
    end
  end
end

#wait_for(*args, &block) ⇒ Object

Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair. Timeout after given time or 1 second.



211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/ib/connection.rb', line 211

# Waits until all given conditions are satisfied or the timeout expires.
#
# args  - conditions understood by #satisfied?, plus an optional Numeric
#         timeout in seconds (the first Numeric wins; default is 1 second).
#         Every Numeric and every nil is removed from the condition list.
# block - optional extra condition, appended after the other conditions.
#
# While waiting, it sleeps in 0.05 s steps when a reader thread is stored in
# server[:reader]; otherwise it polls the socket itself through
# process_messages(50). Returns nil.
def wait_for(*args, &block)
  timeouts, conditions = args.partition { |arg| arg.is_a? Numeric }
  deadline = Time.now + (timeouts.first || 1) # default timeout 1 sec
  conditions = conditions.push(block).compact

  while deadline >= Time.now && !satisfied?(*conditions)
    if server[:reader]
      sleep 0.05
    else
      process_messages 50
    end
  end
end