Source code for ib_insync.wrapper

"""Wrapper to handle incoming messages."""

import asyncio
import logging
from collections import defaultdict
from contextlib import suppress
from datetime import datetime, timezone
from typing import (Any, Dict, List, Optional, Set, TYPE_CHECKING, Tuple,
                    Union, cast)

from ib_insync.contract import (
    Contract, ContractDescription, ContractDetails, DeltaNeutralContract,
    ScanData)
from ib_insync.objects import (
    AccountValue, BarData, BarDataList, CommissionReport, DOMLevel,
    DepthMktDataDescription, Dividends, Execution, FamilyCode, Fill,
    FundamentalRatios, HistogramData, HistoricalNews, HistoricalSchedule,
    HistoricalSession, HistoricalTick, HistoricalTickBidAsk,
    HistoricalTickLast, MktDepthData, NewsArticle, NewsBulletin, NewsProvider,
    NewsTick, OptionChain, OptionComputation, PnL, PnLSingle, PortfolioItem,
    Position, PriceIncrement, RealTimeBar, RealTimeBarList, SoftDollarTier,
    TickAttribBidAsk, TickAttribLast, TickByTickAllLast, TickByTickBidAsk,
    TickByTickMidPoint, TickData, TradeLogEntry)
from ib_insync.order import Order, OrderState, OrderStatus, Trade
from ib_insync.ticker import Ticker
from ib_insync.util import (
    UNSET_DOUBLE, UNSET_INTEGER, dataclassAsDict, dataclassUpdate,
    getLoop, globalErrorEvent, isNan, parseIBDatetime)

if TYPE_CHECKING:
    from ib_insync.ib import IB


OrderKeyType = Union[int, Tuple[int, int]]


[docs] class RequestError(Exception): """ Exception to raise when the API reports an error that can be tied to a single request. """ def __init__(self, reqId: int, code: int, message: str): """ Args: reqId: Original request ID. code: Original error code. message: Original error message. """ super().__init__(f'API error: {code}: {message}') self.reqId = reqId self.code = code self.message = message
class Wrapper: """Wrapper implementation for use with the IB class.""" ib: 'IB' accountValues: Dict[tuple, AccountValue] """ (account, tag, currency, modelCode) -> AccountValue """ acctSummary: Dict[tuple, AccountValue] """ (account, tag, currency) -> AccountValue """ portfolio: Dict[str, Dict[int, PortfolioItem]] """ account -> conId -> PortfolioItem """ positions: Dict[str, Dict[int, Position]] """ account -> conId -> Position """ trades: Dict[OrderKeyType, Trade] """ (client, orderId) or permId -> Trade """ permId2Trade: Dict[int, Trade] """ permId -> Trade """ fills: Dict[str, Fill] """ execId -> Fill """ newsTicks: List[NewsTick] msgId2NewsBulletin: Dict[int, NewsBulletin] """ msgId -> NewsBulletin """ tickers: Dict[int, Ticker] """ id(Contract) -> Ticker """ pendingTickers: Set[Ticker] reqId2Ticker: Dict[int, Ticker] """ reqId -> Ticker """ ticker2ReqId: Dict[Union[int, str], Dict[Ticker, int]] """ tickType -> Ticker -> reqId """ reqId2Subscriber: Dict[int, Any] """ live bars or live scan data """ reqId2PnL: Dict[int, PnL] """ reqId -> PnL """ reqId2PnlSingle: Dict[int, PnLSingle] """ reqId -> PnLSingle """ pnlKey2ReqId: Dict[tuple, int] """ (account, modelCode) -> reqId """ pnlSingleKey2ReqId: Dict[tuple, int] """ (account, modelCode, conId) -> reqId """ lastTime: datetime """ UTC time of last network packet arrival. """ accounts: List[str] clientId: int wshMetaReqId: int wshEventReqId: int _reqId2Contract: Dict[int, Contract] _timeout: float _futures: Dict[Any, asyncio.Future] """ _futures and _results are linked by key. """ _results: Dict[Any, Any] """ _futures and _results are linked by key. """ _logger: logging.Logger _timeoutHandle: Union[asyncio.TimerHandle, None] def __init__(self, ib: 'IB'): self.ib = ib self._logger = logging.getLogger('ib_insync.wrapper') self._timeoutHandle = None self.reset() def reset(self): self.accountValues = {} self.acctSummary = {} self.portfolio = defaultdict(dict) self.positions = defaultdict(dict) self.trades = {} self.permId2Trade = {} self.fills = {} self.newsTicks = [] self.msgId2NewsBulletin = {} self.tickers = {} self.pendingTickers = set() self.reqId2Ticker = {} self.ticker2ReqId = defaultdict(dict) self.reqId2Subscriber = {} self.reqId2PnL = {} self.reqId2PnlSingle = {} self.pnlKey2ReqId = {} self.pnlSingleKey2ReqId = {} self.lastTime = datetime.min self.accounts = [] self.clientId = -1 self.wshMetaReqId = 0 self.wshEventReqId = 0 self._reqId2Contract = {} self._timeout = 0 self._futures = {} self._results = {} self.setTimeout(0) def setEventsDone(self): """Set all subscription-type events as done.""" events = [ticker.updateEvent for ticker in self.tickers.values()] events += [sub.updateEvent for sub in self.reqId2Subscriber.values()] for trade in self.trades.values(): events += [ trade.statusEvent, trade.modifyEvent, trade.fillEvent, trade.filledEvent, trade.commissionReportEvent, trade.cancelEvent, trade.cancelledEvent] for event in events: event.set_done() def connectionClosed(self): error = ConnectionError('Socket disconnect') for future in self._futures.values(): if not future.done(): future.set_exception(error) globalErrorEvent.emit(error) self.reset() def startReq(self, key, contract=None, container=None): """ Start a new request and return the future that is associated with the key and container. The container is a list by default. """ future: asyncio.Future = asyncio.Future() self._futures[key] = future self._results[key] = container if container is not None else [] if contract: self._reqId2Contract[key] = contract return future def _endReq(self, key, result=None, success=True): """ Finish the future of corresponding key with the given result. If no result is given then it will be popped of the general results. """ future = self._futures.pop(key, None) self._reqId2Contract.pop(key, None) if future: if result is None: result = self._results.pop(key, []) if not future.done(): if success: future.set_result(result) else: future.set_exception(result) def startTicker( self, reqId: int, contract: Contract, tickType: Union[int, str]): """ Start a tick request that has the reqId associated with the contract. Return the ticker. """ ticker = self.tickers.get(id(contract)) if not ticker: ticker = Ticker( contract=contract, ticks=[], tickByTicks=[], domBids=[], domAsks=[], domTicks=[]) self.tickers[id(contract)] = ticker self.reqId2Ticker[reqId] = ticker self._reqId2Contract[reqId] = contract self.ticker2ReqId[tickType][ticker] = reqId return ticker def endTicker(self, ticker: Ticker, tickType: Union[int, str]): reqId = self.ticker2ReqId[tickType].pop(ticker, 0) self._reqId2Contract.pop(reqId, None) return reqId def startSubscription(self, reqId, subscriber, contract=None): """Register a live subscription.""" self._reqId2Contract[reqId] = contract self.reqId2Subscriber[reqId] = subscriber def endSubscription(self, subscriber): """Unregister a live subscription.""" self._reqId2Contract.pop(subscriber.reqId, None) self.reqId2Subscriber.pop(subscriber.reqId, None) def orderKey(self, clientId: int, orderId: int, permId: int) -> \ OrderKeyType: key: OrderKeyType if orderId <= 0: # order is placed manually from TWS key = permId else: key = (clientId, orderId) return key def setTimeout(self, timeout: float): self.lastTime = datetime.now(timezone.utc) if self._timeoutHandle: self._timeoutHandle.cancel() self._timeoutHandle = None self._timeout = timeout if timeout: self._setTimer(timeout) def _setTimer(self, delay: float = 0): if self.lastTime == datetime.min: return now = datetime.now(timezone.utc) diff = (now - self.lastTime).total_seconds() if not delay: delay = self._timeout - diff if delay > 0: loop = getLoop() self._timeoutHandle = loop.call_later(delay, self._setTimer) else: self._logger.debug('Timeout') self.setTimeout(0) self.ib.timeoutEvent.emit(diff) # wrapper methods def connectAck(self): pass def nextValidId(self, reqId: int): pass def managedAccounts(self, accountsList: str): self.accounts = [a for a in accountsList.split(',') if a] def updateAccountTime(self, timestamp: str): pass def updateAccountValue( self, tag: str, val: str, currency: str, account: str): key = (account, tag, currency, '') acctVal = AccountValue(account, tag, val, currency, '') self.accountValues[key] = acctVal self.ib.accountValueEvent.emit(acctVal) def accountDownloadEnd(self, _account: str): # sent after updateAccountValue and updatePortfolio both finished self._endReq('accountValues') def accountUpdateMulti( self, reqId: int, account: str, modelCode: str, tag: str, val: str, currency: str): key = (account, tag, currency, modelCode) acctVal = AccountValue(account, tag, val, currency, modelCode) self.accountValues[key] = acctVal self.ib.accountValueEvent.emit(acctVal) def accountUpdateMultiEnd(self, reqId: int): self._endReq(reqId) def accountSummary( self, _reqId: int, account: str, tag: str, value: str, currency: str): key = (account, tag, currency) acctVal = AccountValue(account, tag, value, currency, '') self.acctSummary[key] = acctVal self.ib.accountSummaryEvent.emit(acctVal) def accountSummaryEnd(self, reqId: int): self._endReq(reqId) def updatePortfolio( self, contract: Contract, posSize: float, marketPrice: float, marketValue: float, averageCost: float, unrealizedPNL: float, realizedPNL: float, account: str): contract = Contract.create(**dataclassAsDict(contract)) portfItem = PortfolioItem( contract, posSize, marketPrice, marketValue, averageCost, unrealizedPNL, realizedPNL, account) portfolioItems = self.portfolio[account] if posSize == 0: portfolioItems.pop(contract.conId, None) else: portfolioItems[contract.conId] = portfItem self._logger.info(f'updatePortfolio: {portfItem}') self.ib.updatePortfolioEvent.emit(portfItem) def position( self, account: str, contract: Contract, posSize: float, avgCost: float): contract = Contract.create(**dataclassAsDict(contract)) position = Position(account, contract, posSize, avgCost) positions = self.positions[account] if posSize == 0: positions.pop(contract.conId, None) else: positions[contract.conId] = position self._logger.info(f'position: {position}') results = self._results.get('positions') if results is not None: results.append(position) self.ib.positionEvent.emit(position) def positionEnd(self): self._endReq('positions') def positionMulti( self, reqId: int, account: str, modelCode: str, contract: Contract, pos: float, avgCost: float): pass def positionMultiEnd(self, reqId: int): pass def pnl( self, reqId: int, dailyPnL: float, unrealizedPnL: float, realizedPnL: float): pnl = self.reqId2PnL.get(reqId) if not pnl: return pnl.dailyPnL = dailyPnL pnl.unrealizedPnL = unrealizedPnL pnl.realizedPnL = realizedPnL self.ib.pnlEvent.emit(pnl) def pnlSingle( self, reqId: int, pos: int, dailyPnL: float, unrealizedPnL: float, realizedPnL: float, value: float): pnlSingle = self.reqId2PnlSingle.get(reqId) if not pnlSingle: return pnlSingle.position = pos pnlSingle.dailyPnL = dailyPnL pnlSingle.unrealizedPnL = unrealizedPnL pnlSingle.realizedPnL = realizedPnL pnlSingle.value = value self.ib.pnlSingleEvent.emit(pnlSingle) def openOrder( self, orderId: int, contract: Contract, order: Order, orderState: OrderState): """ This wrapper is called to: * feed in open orders at startup; * feed in open orders or order updates from other clients and TWS if clientId=master id; * feed in manual orders and order updates from TWS if clientId=0; * handle openOrders and allOpenOrders responses. """ if order.whatIf: # response to whatIfOrder if orderState.initMarginChange != str(UNSET_DOUBLE): self._endReq(order.orderId, orderState) else: key = self.orderKey(order.clientId, order.orderId, order.permId) trade = self.trades.get(key) if trade: trade.order.permId = order.permId trade.order.totalQuantity = order.totalQuantity trade.order.lmtPrice = order.lmtPrice trade.order.auxPrice = order.auxPrice trade.order.orderType = order.orderType trade.order.orderRef = order.orderRef else: # ignore '?' values in the order order = Order(**{ k: v for k, v in dataclassAsDict(order).items() if v != '?'}) contract = Contract.create(**dataclassAsDict(contract)) orderStatus = OrderStatus( orderId=orderId, status=orderState.status) trade = Trade(contract, order, orderStatus, [], []) self.trades[key] = trade self._logger.info(f'openOrder: {trade}') self.permId2Trade.setdefault(order.permId, trade) results = self._results.get('openOrders') if results is None: self.ib.openOrderEvent.emit(trade) else: # response to reqOpenOrders or reqAllOpenOrders results.append(trade) # make sure that the client issues order ids larger than any # order id encountered (even from other clients) to avoid # "Duplicate order id" error self.ib.client.updateReqId(orderId + 1) def openOrderEnd(self): self._endReq('openOrders') def completedOrder( self, contract: Contract, order: Order, orderState: OrderState): contract = Contract.create(**dataclassAsDict(contract)) orderStatus = OrderStatus( orderId=order.orderId, status=orderState.status) trade = Trade(contract, order, orderStatus, [], []) self._results['completedOrders'].append(trade) if order.permId not in self.permId2Trade: self.trades[order.permId] = trade self.permId2Trade[order.permId] = trade def completedOrdersEnd(self): self._endReq('completedOrders') def orderStatus( self, orderId: int, status: str, filled: float, remaining: float, avgFillPrice: float, permId: int, parentId: int, lastFillPrice: float, clientId: int, whyHeld: str, mktCapPrice: float = 0.0): key = self.orderKey(clientId, orderId, permId) trade = self.trades.get(key) if trade: msg: Optional[str] oldStatus = trade.orderStatus.status new = dict( status=status, filled=filled, remaining=remaining, avgFillPrice=avgFillPrice, permId=permId, parentId=parentId, lastFillPrice=lastFillPrice, clientId=clientId, whyHeld=whyHeld, mktCapPrice=mktCapPrice) curr = dataclassAsDict(trade.orderStatus) isChanged = curr != {**curr, **new} if isChanged: dataclassUpdate(trade.orderStatus, **new) msg = '' elif (status == 'Submitted' and trade.log and trade.log[-1].message == 'Modify'): # order modifications are acknowledged msg = 'Modified' else: msg = None if msg is not None: logEntry = TradeLogEntry(self.lastTime, status, msg) trade.log.append(logEntry) self._logger.info(f'orderStatus: {trade}') self.ib.orderStatusEvent.emit(trade) trade.statusEvent.emit(trade) if status != oldStatus: if status == OrderStatus.Filled: trade.filledEvent.emit(trade) elif status == OrderStatus.Cancelled: trade.cancelledEvent.emit(trade) else: self._logger.error( 'orderStatus: No order found for ' 'orderId %s and clientId %s', orderId, clientId) def execDetails( self, reqId: int, contract: Contract, execution: Execution): """ This wrapper handles both live fills and responses to reqExecutions. """ self._logger.info(f'execDetails {execution}') if execution.orderId == UNSET_INTEGER: # bug in TWS: executions of manual orders have unset value execution.orderId = 0 trade = self.permId2Trade.get(execution.permId) if not trade: key = self.orderKey( execution.clientId, execution.orderId, execution.permId) trade = self.trades.get(key) if trade and contract == trade.contract: contract = trade.contract else: contract = Contract.create(**dataclassAsDict(contract)) execId = execution.execId isLive = reqId not in self._futures time = self.lastTime if isLive else execution.time fill = Fill(contract, execution, CommissionReport(), time) if execId not in self.fills: # first time we see this execution so add it self.fills[execId] = fill if trade: trade.fills.append(fill) logEntry = TradeLogEntry( time, trade.orderStatus.status, f'Fill {execution.shares}@{execution.price}') trade.log.append(logEntry) if isLive: self._logger.info(f'execDetails: {fill}') self.ib.execDetailsEvent.emit(trade, fill) trade.fillEvent(trade, fill) if not isLive: self._results[reqId].append(fill) def execDetailsEnd(self, reqId: int): self._endReq(reqId) def commissionReport(self, commissionReport: CommissionReport): if commissionReport.yield_ == UNSET_DOUBLE: commissionReport.yield_ = 0.0 if commissionReport.realizedPNL == UNSET_DOUBLE: commissionReport.realizedPNL = 0.0 fill = self.fills.get(commissionReport.execId) if fill: report = dataclassUpdate(fill.commissionReport, commissionReport) self._logger.info(f'commissionReport: {report}') trade = self.permId2Trade.get(fill.execution.permId) if trade: self.ib.commissionReportEvent.emit(trade, fill, report) trade.commissionReportEvent.emit(trade, fill, report) else: # this is not a live execution and the order was filled # before this connection started pass else: # commission report is not for this client pass def orderBound(self, reqId: int, apiClientId: int, apiOrderId: int): pass def contractDetails(self, reqId: int, contractDetails: ContractDetails): self._results[reqId].append(contractDetails) bondContractDetails = contractDetails def contractDetailsEnd(self, reqId: int): self._endReq(reqId) def symbolSamples( self, reqId: int, contractDescriptions: List[ContractDescription]): self._endReq(reqId, contractDescriptions) def marketRule( self, marketRuleId: int, priceIncrements: List[PriceIncrement]): self._endReq(f'marketRule-{marketRuleId}', priceIncrements) def marketDataType(self, reqId: int, marketDataId: int): ticker = self.reqId2Ticker.get(reqId) if ticker: ticker.marketDataType = marketDataId def realtimeBar( self, reqId: int, time: int, open_: float, high: float, low: float, close: float, volume: float, wap: float, count: int): dt = datetime.fromtimestamp(time, timezone.utc) bar = RealTimeBar(dt, -1, open_, high, low, close, volume, wap, count) bars = self.reqId2Subscriber.get(reqId) if bars is not None: bars.append(bar) self.ib.barUpdateEvent.emit(bars, True) bars.updateEvent.emit(bars, True) def historicalData(self, reqId: int, bar: BarData): results = self._results.get(reqId) if results is not None: bar.date = parseIBDatetime(bar.date) # type: ignore results.append(bar) def historicalDataEnd(self, reqId, _start: str, _end: str): self._endReq(reqId) def historicalDataUpdate(self, reqId: int, bar: BarData): bars = self.reqId2Subscriber.get(reqId) if not bars: return bar.date = parseIBDatetime(bar.date) # type: ignore lastDate = bars[-1].date if bar.date < lastDate: return hasNewBar = len(bars) == 0 or bar.date > lastDate if hasNewBar: bars.append(bar) elif bars[-1] != bar: bars[-1] = bar else: return self.ib.barUpdateEvent.emit(bars, hasNewBar) bars.updateEvent.emit(bars, hasNewBar) def headTimestamp(self, reqId: int, headTimestamp: str): try: dt = parseIBDatetime(headTimestamp) self._endReq(reqId, dt) except ValueError as exc: self._endReq(reqId, exc, False) def historicalTicks( self, reqId: int, ticks: List[HistoricalTick], done: bool): result = self._results.get(reqId) if result is not None: result += ticks if done: self._endReq(reqId) def historicalTicksBidAsk( self, reqId: int, ticks: List[HistoricalTickBidAsk], done: bool): result = self._results.get(reqId) if result is not None: result += ticks if done: self._endReq(reqId) def historicalTicksLast( self, reqId: int, ticks: List[HistoricalTickLast], done: bool): result = self._results.get(reqId) if result is not None: result += ticks if done: self._endReq(reqId) # additional wrapper method provided by Client def priceSizeTick( self, reqId: int, tickType: int, price: float, size: float): ticker = self.reqId2Ticker.get(reqId) if not ticker: self._logger.error(f'priceSizeTick: Unknown reqId: {reqId}') return # https://interactivebrokers.github.io/tws-api/tick_types.html if tickType in (1, 66): if price == ticker.bid and size == ticker.bidSize: return if price != ticker.bid: ticker.prevBid = ticker.bid ticker.bid = price if size != ticker.bidSize: ticker.prevBidSize = ticker.bidSize ticker.bidSize = size elif tickType in (2, 67): if price == ticker.ask and size == ticker.askSize: return if price != ticker.ask: ticker.prevAsk = ticker.ask ticker.ask = price if size != ticker.askSize: ticker.prevAskSize = ticker.askSize ticker.askSize = size elif tickType in (4, 68): if price != ticker.last: ticker.prevLast = ticker.last ticker.last = price if size != ticker.lastSize: ticker.prevLastSize = ticker.lastSize ticker.lastSize = size elif tickType in (6, 72): ticker.high = price elif tickType in (7, 73): ticker.low = price elif tickType in (9, 75): ticker.close = price elif tickType in (14, 76): ticker.open = price elif tickType == 15: ticker.low13week = price elif tickType == 16: ticker.high13week = price elif tickType == 17: ticker.low26week = price elif tickType == 18: ticker.high26week = price elif tickType == 19: ticker.low52week = price elif tickType == 20: ticker.high52week = price elif tickType == 35: ticker.auctionPrice = price elif tickType == 37: ticker.markPrice = price elif tickType in (50, 103): ticker.bidYield = price elif tickType in (51, 104): ticker.askYield = price elif tickType == 52: ticker.lastYield = price if price or size: tick = TickData(self.lastTime, tickType, price, size) ticker.ticks.append(tick) self.pendingTickers.add(ticker) def tickSize(self, reqId: int, tickType: int, size: float): ticker = self.reqId2Ticker.get(reqId) if not ticker: self._logger.error(f'tickSize: Unknown reqId: {reqId}') return price = -1.0 # https://interactivebrokers.github.io/tws-api/tick_types.html if tickType in (0, 69): if size == ticker.bidSize: return price = ticker.bid ticker.prevBidSize = ticker.bidSize ticker.bidSize = size elif tickType in (3, 70): if size == ticker.askSize: return price = ticker.ask ticker.prevAskSize = ticker.askSize ticker.askSize = size elif tickType in (5, 71): price = ticker.last if isNan(price): return if size != ticker.lastSize: ticker.prevLastSize = ticker.lastSize ticker.lastSize = size elif tickType in (8, 74): ticker.volume = size elif tickType == 21: ticker.avVolume = size elif tickType == 27: ticker.callOpenInterest = size elif tickType == 28: ticker.putOpenInterest = size elif tickType == 29: ticker.callVolume = size elif tickType == 30: ticker.putVolume = size elif tickType == 34: ticker.auctionVolume = size elif tickType == 36: ticker.auctionImbalance = size elif tickType == 61: ticker.regulatoryImbalance = size elif tickType == 86: ticker.futuresOpenInterest = size elif tickType == 87: ticker.avOptionVolume = size elif tickType == 89: ticker.shortableShares = size if price or size: tick = TickData(self.lastTime, tickType, price, size) ticker.ticks.append(tick) self.pendingTickers.add(ticker) def tickSnapshotEnd(self, reqId: int): self._endReq(reqId) def tickByTickAllLast( self, reqId: int, tickType: int, time: int, price: float, size: float, tickAttribLast: TickAttribLast, exchange, specialConditions): ticker = self.reqId2Ticker.get(reqId) if not ticker: self._logger.error(f'tickByTickAllLast: Unknown reqId: {reqId}') return if price != ticker.last: ticker.prevLast = ticker.last ticker.last = price if size != ticker.lastSize: ticker.prevLastSize = ticker.lastSize ticker.lastSize = size tick = TickByTickAllLast( tickType, self.lastTime, price, size, tickAttribLast, exchange, specialConditions) ticker.tickByTicks.append(tick) self.pendingTickers.add(ticker) def tickByTickBidAsk( self, reqId: int, time: int, bidPrice: float, askPrice: float, bidSize: float, askSize: float, tickAttribBidAsk: TickAttribBidAsk): ticker = self.reqId2Ticker.get(reqId) if not ticker: self._logger.error(f'tickByTickBidAsk: Unknown reqId: {reqId}') return if bidPrice != ticker.bid: ticker.prevBid = ticker.bid ticker.bid = bidPrice if bidSize != ticker.bidSize: ticker.prevBidSize = ticker.bidSize ticker.bidSize = bidSize if askPrice != ticker.ask: ticker.prevAsk = ticker.ask ticker.ask = askPrice if askSize != ticker.askSize: ticker.prevAskSize = ticker.askSize ticker.askSize = askSize tick = TickByTickBidAsk( self.lastTime, bidPrice, askPrice, bidSize, askSize, tickAttribBidAsk) ticker.tickByTicks.append(tick) self.pendingTickers.add(ticker) def tickByTickMidPoint(self, reqId: int, time: int, midPoint: float): ticker = self.reqId2Ticker.get(reqId) if not ticker: self._logger.error(f'tickByTickMidPoint: Unknown reqId: {reqId}') return tick = TickByTickMidPoint(self.lastTime, midPoint) ticker.tickByTicks.append(tick) self.pendingTickers.add(ticker) def tickString(self, reqId: int, tickType: int, value: str): ticker = self.reqId2Ticker.get(reqId) if not ticker: return try: if tickType == 32: ticker.bidExchange = value elif tickType == 33: ticker.askExchange = value elif tickType == 84: ticker.lastExchange = value elif tickType == 47: # https://web.archive.org/web/20200725010343/https://interactivebrokers.github.io/tws-api/fundamental_ratios_tags.html d = dict(t.split('=') # type: ignore for t in value.split(';') if t) # type: ignore for k, v in d.items(): with suppress(ValueError): if v == '-99999.99': v = 'nan' d[k] = float(v) # type: ignore d[k] = int(v) # type: ignore ticker.fundamentalRatios = FundamentalRatios(**d) elif tickType in (48, 77): # RT Volume or RT Trade Volume string format: # price;size;ms since epoch;total volume;VWAP;single trade # example: # 701.28;1;1348075471534;67854;701.46918464;true priceStr, sizeStr, rtTime, volume, vwap, _ = value.split(';') if volume: if tickType == 48: ticker.rtVolume = float(volume) elif tickType == 77: ticker.rtTradeVolume = float(volume) if vwap: ticker.vwap = float(vwap) if rtTime: ticker.rtTime = datetime.fromtimestamp( int(rtTime) / 1000, timezone.utc) if priceStr == '': return price = float(priceStr) size = float(sizeStr) if price and size: if ticker.prevLast != ticker.last: ticker.prevLast = ticker.last ticker.last = price if ticker.prevLastSize != ticker.lastSize: ticker.prevLastSize = ticker.lastSize ticker.lastSize = size tick = TickData(self.lastTime, tickType, price, size) ticker.ticks.append(tick) elif tickType == 59: # Dividend tick: # https://interactivebrokers.github.io/tws-api/tick_types.html#ib_dividends # example value: '0.83,0.92,20130219,0.23' past12, next12, nextDate, nextAmount = value.split(',') ticker.dividends = Dividends( float(past12) if past12 else None, float(next12) if next12 else None, parseIBDatetime(nextDate) if nextDate else None, float(nextAmount) if nextAmount else None) self.pendingTickers.add(ticker) except ValueError: self._logger.error( f'tickString with tickType {tickType}: ' f'malformed value: {value!r}') def tickGeneric(self, reqId: int, tickType: int, value: float): ticker = self.reqId2Ticker.get(reqId) if not ticker: return try: value = float(value) except ValueError: self._logger.error(f'genericTick: malformed value: {value!r}') return if tickType == 23: ticker.histVolatility = value elif tickType == 24: ticker.impliedVolatility = value elif tickType == 31: ticker.indexFuturePremium = value elif tickType == 49: ticker.halted = value elif tickType == 54: ticker.tradeCount = value elif tickType == 55: ticker.tradeRate = value elif tickType == 56: ticker.volumeRate = value elif tickType == 58: ticker.rtHistVolatility = value tick = TickData(self.lastTime, tickType, value, 0) ticker.ticks.append(tick) self.pendingTickers.add(ticker) def tickReqParams( self, reqId: int, minTick: float, bboExchange: str, snapshotPermissions: int): ticker = self.reqId2Ticker.get(reqId) if not ticker: return ticker.minTick = minTick ticker.bboExchange = bboExchange ticker.snapshotPermissions = snapshotPermissions def smartComponents(self, reqId, components): self._endReq(reqId, components) def mktDepthExchanges( self, depthMktDataDescriptions: List[DepthMktDataDescription]): self._endReq('mktDepthExchanges', depthMktDataDescriptions) def updateMktDepth( self, reqId: int, position: int, operation: int, side: int, price: float, size: float): self.updateMktDepthL2( reqId, position, '', operation, side, price, size) def updateMktDepthL2( self, reqId: int, position: int, marketMaker: str, operation: int, side: int, price: float, size: float, isSmartDepth: bool = False): # operation: 0 = insert, 1 = update, 2 = delete # side: 0 = ask, 1 = bid ticker = self.reqId2Ticker[reqId] dom = ticker.domBids if side else ticker.domAsks if operation == 0: dom.insert(position, DOMLevel(price, size, marketMaker)) elif operation == 1: dom[position] = DOMLevel(price, size, marketMaker) elif operation == 2: if position < len(dom): level = dom.pop(position) price = level.price size = 0 tick = MktDepthData( self.lastTime, position, marketMaker, operation, side, price, size) ticker.domTicks.append(tick) self.pendingTickers.add(ticker) def tickOptionComputation( self, reqId: int, tickType: int, tickAttrib: int, impliedVol: float, delta: float, optPrice: float, pvDividend: float, gamma: float, vega: float, theta: float, undPrice: float): comp = OptionComputation( tickAttrib, impliedVol if impliedVol != -1 else None, delta if delta != -2 else None, optPrice if optPrice != -1 else None, pvDividend if pvDividend != -1 else None, gamma if gamma != -2 else None, vega if vega != -2 else vega, theta if theta != -2 else theta, undPrice if undPrice != -1 else None) ticker = self.reqId2Ticker.get(reqId) if ticker: # reply from reqMktData # https://interactivebrokers.github.io/tws-api/tick_types.html if tickType in (10, 80): ticker.bidGreeks = comp elif tickType in (11, 81): ticker.askGreeks = comp elif tickType in (12, 82): ticker.lastGreeks = comp elif tickType in (13, 83): ticker.modelGreeks = comp self.pendingTickers.add(ticker) elif reqId in self._futures: # reply from calculateImpliedVolatility or calculateOptionPrice self._endReq(reqId, comp) else: self._logger.error( f'tickOptionComputation: Unknown reqId: {reqId}') def deltaNeutralValidation(self, reqId: int, dnc: DeltaNeutralContract): pass def fundamentalData(self, reqId: int, data: str): self._endReq(reqId, data) def scannerParameters(self, xml: str): self._endReq('scannerParams', xml) def scannerData( self, reqId: int, rank: int, contractDetails: ContractDetails, distance: str, benchmark: str, projection: str, legsStr: str): data = ScanData( rank, contractDetails, distance, benchmark, projection, legsStr) dataList = self.reqId2Subscriber.get(reqId) if dataList is None: dataList = self._results.get(reqId) if dataList is not None: if rank == 0: dataList.clear() dataList.append(data) def scannerDataEnd(self, reqId: int): dataList = self._results.get(reqId) if dataList is not None: self._endReq(reqId) else: dataList = self.reqId2Subscriber.get(reqId) if dataList is not None: self.ib.scannerDataEvent.emit(dataList) dataList.updateEvent.emit(dataList) def histogramData(self, reqId: int, items: List[HistogramData]): result = [HistogramData(item.price, item.count) for item in items] self._endReq(reqId, result) def securityDefinitionOptionParameter( self, reqId: int, exchange: str, underlyingConId: int, tradingClass: str, multiplier: str, expirations: List[str], strikes: List[float]): chain = OptionChain( exchange, underlyingConId, tradingClass, multiplier, expirations, strikes) self._results[reqId].append(chain) def securityDefinitionOptionParameterEnd(self, reqId: int): self._endReq(reqId) def newsProviders(self, newsProviders: List[NewsProvider]): newsProviders = [ NewsProvider(code=p.code, name=p.name) for p in newsProviders] self._endReq('newsProviders', newsProviders) def tickNews( self, _reqId: int, timeStamp: int, providerCode: str, articleId: str, headline: str, extraData: str): news = NewsTick( timeStamp, providerCode, articleId, headline, extraData) self.newsTicks.append(news) self.ib.tickNewsEvent.emit(news) def newsArticle(self, reqId: int, articleType: int, articleText: str): article = NewsArticle(articleType, articleText) self._endReq(reqId, article) def historicalNews( self, reqId: int, time: str, providerCode: str, articleId: str, headline: str): dt = parseIBDatetime(time) dt = cast(datetime, dt) article = HistoricalNews(dt, providerCode, articleId, headline) self._results[reqId].append(article) def historicalNewsEnd(self, reqId, _hasMore: bool): self._endReq(reqId) def updateNewsBulletin( self, msgId: int, msgType: int, message: str, origExchange: str): bulletin = NewsBulletin(msgId, msgType, message, origExchange) self.msgId2NewsBulletin[msgId] = bulletin self.ib.newsBulletinEvent.emit(bulletin) def receiveFA(self, _faDataType: int, faXmlData: str): self._endReq('requestFA', faXmlData) def currentTime(self, time: int): dt = datetime.fromtimestamp(time, timezone.utc) self._endReq('currentTime', dt) def tickEFP( self, reqId: int, tickType: int, basisPoints: float, formattedBasisPoints: str, totalDividends: float, holdDays: int, futureLastTradeDate: str, dividendImpact: float, dividendsToLastTradeDate: float): pass def historicalSchedule( self, reqId: int, startDateTime: str, endDateTime: str, timeZone: str, sessions: List[HistoricalSession]): schedule = HistoricalSchedule( startDateTime, endDateTime, timeZone, sessions) self._endReq(reqId, schedule) def wshMetaData(self, reqId: int, dataJson: str): self.ib.wshMetaEvent.emit(dataJson) self._endReq(reqId, dataJson) def wshEventData(self, reqId: int, dataJson: str): self.ib.wshEvent.emit(dataJson) self._endReq(reqId, dataJson) def userInfo(self, reqId: int, whiteBrandingId: str): self._endReq(reqId) def softDollarTiers(self, reqId: int, tiers: List[SoftDollarTier]): pass def familyCodes(self, familyCodes: List[FamilyCode]): pass def error( self, reqId: int, errorCode: int, errorString: str, advancedOrderRejectJson: str): # https://interactivebrokers.github.io/tws-api/message_codes.html isRequest = reqId in self._futures trade = self.trades.get((self.clientId, reqId)) warningCodes = {110, 165, 202, 399, 404, 434, 492, 10167} isWarning = errorCode in warningCodes or 2100 <= errorCode < 2200 if errorCode == 110 and isRequest: # whatIf request failed isWarning = False if errorCode == 110 and trade and \ trade.orderStatus.status == OrderStatus.PendingSubmit: # invalid price for a new order must cancel it isWarning = False msg = ( f'{"Warning" if isWarning else "Error"} ' f'{errorCode}, reqId {reqId}: {errorString}') contract = self._reqId2Contract.get(reqId) if contract: msg += f', contract: {contract}' if isWarning: self._logger.info(msg) else: self._logger.error(msg) if isRequest: # the request failed if self.ib.RaiseRequestErrors: error = RequestError(reqId, errorCode, errorString) self._endReq(reqId, error, success=False) else: self._endReq(reqId) elif trade: # something is wrong with the order, cancel it if advancedOrderRejectJson: trade.advancedError = advancedOrderRejectJson if not trade.isDone(): status = trade.orderStatus.status = OrderStatus.Cancelled logEntry = TradeLogEntry( self.lastTime, status, msg, errorCode) trade.log.append(logEntry) self._logger.warning(f'Canceled order: {trade}') self.ib.orderStatusEvent.emit(trade) trade.statusEvent.emit(trade) trade.cancelledEvent.emit(trade) if errorCode == 165: # for scan data subscription there are no longer matching results dataList = self.reqId2Subscriber.get(reqId) if dataList: dataList.clear() dataList.updateEvent.emit(dataList) elif errorCode == 317: # Market depth data has been RESET ticker = self.reqId2Ticker.get(reqId) if ticker: # clear all DOM levels ticker.domTicks += [MktDepthData( self.lastTime, 0, '', 2, 0, level.price, 0) for level in ticker.domAsks] ticker.domTicks += [MktDepthData( self.lastTime, 0, '', 2, 1, level.price, 0) for level in ticker.domBids] ticker.domAsks.clear() ticker.domBids.clear() self.pendingTickers.add(ticker) elif errorCode == 10225: # Bust event occurred, current subscription is deactivated. # Please resubscribe real-time bars immediately bars = self.reqId2Subscriber.get(reqId) if isinstance(bars, RealTimeBarList): self.ib.client.cancelRealTimeBars(reqId) self.ib.client.reqRealTimeBars( reqId, bars.contract, bars.barSize, bars.whatToShow, bars.useRTH, bars.realTimeBarsOptions) elif isinstance(bars, BarDataList): self.ib.client.cancelHistoricalData(reqId) self.ib.client.reqHistoricalData( reqId, bars.contract, bars.endDateTime, bars.durationStr, bars.barSizeSetting, bars.whatToShow, bars.useRTH, bars.formatDate, bars.keepUpToDate, bars.chartOptions) self.ib.errorEvent.emit(reqId, errorCode, errorString, contract) def tcpDataArrived(self): self.lastTime = datetime.now(timezone.utc) for ticker in self.pendingTickers: ticker.ticks = [] ticker.tickByTicks = [] ticker.domTicks = [] self.pendingTickers = set() def tcpDataProcessed(self): self.ib.updateEvent.emit() if self.pendingTickers: for ticker in self.pendingTickers: ticker.time = self.lastTime ticker.updateEvent.emit(ticker) self.ib.pendingTickersEvent.emit(self.pendingTickers)