"""Wrapper to handle incoming messages."""
import asyncio
import logging
from collections import defaultdict
from contextlib import suppress
from datetime import datetime, timezone
from typing import (
Any, Dict, List, Optional, Set, Tuple, Union, cast)
from ib_insync.contract import (
Contract, ContractDescription, ContractDetails, DeltaNeutralContract,
ScanData)
from ib_insync.objects import (
AccountValue, BarData, BarDataList, CommissionReport,
DOMLevel, DepthMktDataDescription,
Dividends, Execution, Fill, FundamentalRatios, HistogramData,
HistoricalNews, HistoricalTick, HistoricalTickBidAsk, HistoricalTickLast,
MktDepthData, NewsArticle, NewsBulletin, NewsProvider, NewsTick,
OptionChain, OptionComputation, PnL, PnLSingle, PortfolioItem, Position,
PriceIncrement, RealTimeBar, RealTimeBarList, TickAttribBidAsk,
TickAttribLast, TickByTickAllLast, TickByTickBidAsk, TickByTickMidPoint,
TickData, TradeLogEntry)
from ib_insync.order import Order, OrderState, OrderStatus, Trade
from ib_insync.ticker import Ticker
from ib_insync.util import (
UNSET_DOUBLE, UNSET_INTEGER, dataclassAsDict, dataclassUpdate,
globalErrorEvent, isNan, parseIBDatetime)
OrderKeyType = Union[int, Tuple[int, int]]
[docs]class RequestError(Exception):
"""
Exception to raise when the API reports an error that can be tied to
a single request.
"""
def __init__(self, reqId: int, code: int, message: str):
"""
Args:
reqId: Original request ID.
code: Original error code.
message: Original error message.
"""
super().__init__(f'API error: {code}: {message}')
self.reqId = reqId
self.code = code
self.message = message
class Wrapper:
"""Wrapper implementation for use with the IB class."""
def __init__(self, ib):
self.ib = ib
self._logger = logging.getLogger('ib_insync.wrapper')
self._timeoutHandle = None
self.reset()
def reset(self):
self.accountValues: Dict[tuple, AccountValue] = {}
# (account, tag, currency, modelCode) -> AccountValue
self.acctSummary: Dict[tuple, AccountValue] = {}
# (account, tag, currency) -> AccountValue
self.portfolio: Dict[str, Dict[int, PortfolioItem]] = defaultdict(dict)
# account -> conId -> PortfolioItem
self.positions = defaultdict(dict)
# account -> conId -> Position
self.trades: Dict[OrderKeyType, Trade] = {}
# (client, orderId) or permId -> Trade
self.permId2Trade: Dict[int, Trade] = {}
# permId -> Trade
self.fills: Dict[str, Fill] = {}
# execId -> Fill
self.newsTicks: List[NewsTick] = []
self.msgId2NewsBulletin: Dict[int, NewsBulletin] = {}
# msgId -> NewsBulletin
self.tickers: Dict[int, Ticker] = {}
# id(Contract) -> Ticker
self.pendingTickers: Set[Ticker] = set()
self.reqId2Ticker: Dict[int, Ticker] = {}
# reqId -> Ticker
self.ticker2ReqId: Dict[Union[int, str], Dict[Ticker, int]] = \
defaultdict(dict)
# tickType -> Ticker -> reqId
self.reqId2MarketDataType: Dict[int, int] = {}
# reqId -> marketDataType
self.reqId2Subscriber: Dict[int, Any] = {}
# live bars or live scan data
self.reqId2PnL: Dict[int, PnL] = {}
# reqId -> PnL
self.reqId2PnlSingle: Dict[int, PnLSingle] = {}
# reqId -> PnLSingle
self.pnlKey2ReqId: Dict[tuple, int] = {}
# (account, modelCode) -> reqId
self.pnlSingleKey2ReqId: Dict[tuple, int] = {}
# (account, modelCode, conId) -> reqId
# futures and results are linked by key:
self._futures: Dict[Any, asyncio.Future] = {}
self._results: Dict[Any, Any] = {}
# UTC time of last network packet arrival:
self.lastTime: datetime = None
self._reqId2Contract: Dict[int, Contract] = {}
self.accounts: List[str] = []
self.clientId: int = -1
self._timeout: float = 0
self.setTimeout(0)
def setEventsDone(self):
"""Set all subscribtion-type events as done."""
events = [ticker.updateEvent for ticker in self.tickers.values()]
events += [sub.updateEvent for sub in self.reqId2Subscriber.values()]
for trade in self.trades.values():
events += [
trade.statusEvent, trade.modifyEvent, trade.fillEvent,
trade.filledEvent, trade.commissionReportEvent,
trade.cancelEvent, trade.cancelledEvent]
for event in events:
event.set_done()
def connectionClosed(self):
error = ConnectionError('Socket disconnect')
for future in self._futures.values():
if not future.done():
future.set_exception(error)
globalErrorEvent.emit(error)
self.reset()
def startReq(self, key, contract=None, container=None):
"""
Start a new request and return the future that is associated
with with the key and container. The container is a list by default.
"""
future = asyncio.Future()
self._futures[key] = future
self._results[key] = container if container is not None else []
if contract:
self._reqId2Contract[key] = contract
return future
def _endReq(self, key, result=None, success=True):
"""
Finish the future of corresponding key with the given result.
If no result is given then it will be popped of the general results.
"""
future = self._futures.pop(key, None)
self._reqId2Contract.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
if success:
future.set_result(result)
else:
future.set_exception(result)
def startTicker(
self, reqId: int, contract: Contract, tickType: Union[int, str]):
"""
Start a tick request that has the reqId associated with the contract.
Return the ticker.
"""
ticker = self.tickers.get(id(contract))
if not ticker:
ticker = Ticker(
contract=contract, ticks=[], tickByTicks=[],
domBids=[], domAsks=[], domTicks=[])
self.tickers[id(contract)] = ticker
self.reqId2Ticker[reqId] = ticker
self._reqId2Contract[reqId] = contract
self.ticker2ReqId[tickType][ticker] = reqId
return ticker
def endTicker(self, ticker: Ticker, tickType: Union[int, str]):
reqId = self.ticker2ReqId[tickType].pop(ticker, 0)
self._reqId2Contract.pop(reqId, None)
return reqId
def startSubscription(self, reqId, subscriber, contract=None):
"""Register a live subscription."""
self._reqId2Contract[reqId] = contract
self.reqId2Subscriber[reqId] = subscriber
def endSubscription(self, subscriber):
"""Unregister a live subscription."""
self._reqId2Contract.pop(subscriber.reqId, None)
self.reqId2Subscriber.pop(subscriber.reqId, None)
def orderKey(self, clientId: int, orderId: int, permId: int) -> \
OrderKeyType:
key: OrderKeyType
if orderId <= 0:
# order is placed manually from TWS
key = permId
else:
key = (clientId, orderId)
return key
def setTimeout(self, timeout: float):
self.lastTime = datetime.now(timezone.utc)
if self._timeoutHandle:
self._timeoutHandle.cancel()
self._timeoutHandle = None
self._timeout = timeout
if timeout:
self._setTimer(timeout)
def _setTimer(self, delay: float = 0):
if not self.lastTime:
return
now = datetime.now(timezone.utc)
diff = (now - self.lastTime).total_seconds()
if not delay:
delay = self._timeout - diff
if delay > 0:
loop = asyncio.get_event_loop()
self._timeoutHandle = loop.call_later(delay, self._setTimer)
else:
self._logger.debug('Timeout')
self.setTimeout(0)
self.ib.timeoutEvent.emit(diff)
# wrapper methods
def connectAck(self):
pass
def nextValidId(self, reqId: int):
pass
def managedAccounts(self, accountsList: str):
self.accounts = [a for a in accountsList.split(',') if a]
def updateAccountTime(self, timestamp: str):
pass
def updateAccountValue(
self, tag: str, val: str, currency: str, account: str):
key = (account, tag, currency, '')
acctVal = AccountValue(account, tag, val, currency, '')
self.accountValues[key] = acctVal
self.ib.accountValueEvent.emit(acctVal)
def accountDownloadEnd(self, _account: str):
# sent after updateAccountValue and updatePortfolio both finished
self._endReq('accountValues')
def accountUpdateMulti(
self, reqId: int, account: str, modelCode: str, tag: str, val: str,
currency: str):
key = (account, tag, currency, modelCode)
acctVal = AccountValue(account, tag, val, currency, modelCode)
self.accountValues[key] = acctVal
self.ib.accountValueEvent.emit(acctVal)
def accountUpdateMultiEnd(self, reqId: int):
self._endReq(reqId)
def accountSummary(
self, _reqId: int, account: str, tag: str, value: str,
currency: str):
key = (account, tag, currency)
acctVal = AccountValue(account, tag, value, currency, '')
self.acctSummary[key] = acctVal
self.ib.accountSummaryEvent.emit(acctVal)
def accountSummaryEnd(self, reqId: int):
self._endReq(reqId)
def updatePortfolio(
self, contract: Contract, posSize: float, marketPrice: float,
marketValue: float, averageCost: float, unrealizedPNL: float,
realizedPNL: float, account: str):
contract = Contract.create(**dataclassAsDict(contract))
portfItem = PortfolioItem(
contract, posSize, marketPrice, marketValue,
averageCost, unrealizedPNL, realizedPNL, account)
portfolioItems = self.portfolio[account]
if posSize == 0:
portfolioItems.pop(contract.conId, None)
else:
portfolioItems[contract.conId] = portfItem
self._logger.info(f'updatePortfolio: {portfItem}')
self.ib.updatePortfolioEvent.emit(portfItem)
def position(
self, account: str, contract: Contract, posSize: float,
avgCost: float):
contract = Contract.create(**dataclassAsDict(contract))
position = Position(account, contract, posSize, avgCost)
positions = self.positions[account]
if posSize == 0:
positions.pop(contract.conId, None)
else:
positions[contract.conId] = position
self._logger.info(f'position: {position}')
results = self._results.get('positions')
if results is not None:
results.append(position)
self.ib.positionEvent.emit(position)
def positionEnd(self):
self._endReq('positions')
def pnl(
self, reqId: int, dailyPnL: float, unrealizedPnL: float,
realizedPnL: float):
pnl = self.reqId2PnL.get(reqId)
if not pnl:
return
pnl.dailyPnL = dailyPnL
pnl.unrealizedPnL = unrealizedPnL
pnl.realizedPnL = realizedPnL
self.ib.pnlEvent.emit(pnl)
def pnlSingle(
self, reqId: int, pos: int, dailyPnL: float, unrealizedPnL: float,
realizedPnL: float, value: float):
pnlSingle = self.reqId2PnlSingle.get(reqId)
if not pnlSingle:
return
pnlSingle.position = pos
pnlSingle.dailyPnL = dailyPnL
pnlSingle.unrealizedPnL = unrealizedPnL
pnlSingle.realizedPnL = realizedPnL
pnlSingle.value = value
self.ib.pnlSingleEvent.emit(pnlSingle)
def openOrder(
self, orderId: int, contract: Contract, order: Order,
orderState: OrderState):
"""
This wrapper is called to:
* feed in open orders at startup;
* feed in open orders or order updates from other clients and TWS
if clientId=master id;
* feed in manual orders and order updates from TWS if clientId=0;
* handle openOrders and allOpenOrders responses.
"""
if order.whatIf:
# response to whatIfOrder
if orderState.initMarginChange != str(UNSET_DOUBLE):
self._endReq(order.orderId, orderState)
else:
key = self.orderKey(order.clientId, order.orderId, order.permId)
trade = self.trades.get(key)
if trade:
trade.order.permId = order.permId
trade.order.totalQuantity = order.totalQuantity
trade.order.lmtPrice = order.lmtPrice
trade.order.auxPrice = order.auxPrice
trade.order.orderType = order.orderType
else:
# ignore '?' values in the order
order = Order(**{
k: v for k, v in dataclassAsDict(order).items()
if v != '?'})
contract = Contract.create(**dataclassAsDict(contract))
orderStatus = OrderStatus(
orderId=orderId, status=orderState.status)
trade = Trade(contract, order, orderStatus, [], [])
self.trades[key] = trade
self._logger.info(f'openOrder: {trade}')
self.permId2Trade.setdefault(order.permId, trade)
results = self._results.get('openOrders')
if results is None:
self.ib.openOrderEvent.emit(trade)
else:
# response to reqOpenOrders or reqAllOpenOrders
results.append(order)
# make sure that the client issues order ids larger then any
# order id encountered (even from other clients) to avoid
# "Duplicate order id" error
self.ib.client.updateReqId(orderId + 1)
def openOrderEnd(self):
self._endReq('openOrders')
def completedOrder(
self, contract: Contract, order: Order, orderState: OrderState):
contract = Contract.create(**dataclassAsDict(contract))
orderStatus = OrderStatus(
orderId=order.orderId, status=orderState.status)
trade = Trade(contract, order, orderStatus, [], [])
self._results['completedOrders'].append(trade)
if order.permId not in self.permId2Trade:
self.trades[order.permId] = trade
self.permId2Trade[order.permId] = trade
def completedOrdersEnd(self):
self._endReq('completedOrders')
def orderStatus(
self, orderId: int, status: str, filled: float, remaining: float,
avgFillPrice: float, permId: int, parentId: int,
lastFillPrice: float, clientId: int, whyHeld: str,
mktCapPrice: float = 0.0):
key = self.orderKey(clientId, orderId, permId)
trade = self.trades.get(key)
if trade:
msg: Optional[str]
oldStatus = trade.orderStatus.status
new = dict(
status=status, filled=filled,
remaining=remaining, avgFillPrice=avgFillPrice,
permId=permId, parentId=parentId,
lastFillPrice=lastFillPrice, clientId=clientId,
whyHeld=whyHeld, mktCapPrice=mktCapPrice)
curr = dataclassAsDict(trade.orderStatus)
isChanged = curr != {**curr, **new}
if isChanged:
dataclassUpdate(trade.orderStatus, **new)
msg = ''
elif (status == 'Submitted' and trade.log
and trade.log[-1].message == 'Modify'):
# order modifications are acknowledged
msg = 'Modified'
else:
msg = None
if msg is not None:
logEntry = TradeLogEntry(self.lastTime, status, msg)
trade.log.append(logEntry)
self._logger.info(f'orderStatus: {trade}')
self.ib.orderStatusEvent.emit(trade)
trade.statusEvent.emit(trade)
if status != oldStatus:
if status == OrderStatus.Filled:
trade.filledEvent.emit(trade)
elif status == OrderStatus.Cancelled:
trade.cancelledEvent.emit(trade)
else:
self._logger.error(
'orderStatus: No order found for '
'orderId %s and clientId %s', orderId, clientId)
def execDetails(
self, reqId: int, contract: Contract, execution: Execution):
"""
This wrapper handles both live fills and responses to
reqExecutions.
"""
self._logger.info(f'execDetails {execution}')
if execution.orderId == UNSET_INTEGER:
# bug in TWS: executions of manual orders have unset value
execution.orderId = 0
trade = self.permId2Trade.get(execution.permId)
if not trade:
key = self.orderKey(
execution.clientId, execution.orderId, execution.permId)
trade = self.trades.get(key)
if trade and contract == trade.contract:
contract = trade.contract
else:
contract = Contract.create(**dataclassAsDict(contract))
execId = execution.execId
isLive = reqId not in self._futures
time = self.lastTime if isLive else execution.time
fill = Fill(contract, execution, CommissionReport(), time)
if execId not in self.fills:
# first time we see this execution so add it
self.fills[execId] = fill
if trade:
trade.fills.append(fill)
logEntry = TradeLogEntry(
time,
trade.orderStatus.status,
f'Fill {execution.shares}@{execution.price}')
trade.log.append(logEntry)
if isLive:
self._logger.info(f'execDetails: {fill}')
self.ib.execDetailsEvent.emit(trade, fill)
trade.fillEvent(trade, fill)
if not isLive:
self._results[reqId].append(fill)
def execDetailsEnd(self, reqId: int):
self._endReq(reqId)
def commissionReport(self, commissionReport: CommissionReport):
if commissionReport.yield_ == UNSET_DOUBLE:
commissionReport.yield_ = 0.0
if commissionReport.realizedPNL == UNSET_DOUBLE:
commissionReport.realizedPNL = 0.0
fill = self.fills.get(commissionReport.execId)
if fill:
report = dataclassUpdate(fill.commissionReport, commissionReport)
self._logger.info(f'commissionReport: {report}')
trade = self.permId2Trade.get(fill.execution.permId)
if trade:
self.ib.commissionReportEvent.emit(trade, fill, report)
trade.commissionReportEvent.emit(trade, fill, report)
else:
# this is not a live execution and the order was filled
# before this connection started
pass
else:
# commission report is not for this client
pass
def orderBound(self, reqId: int, apiClientId: int, apiOrderId: int):
pass
def contractDetails(self, reqId: int, contractDetails: ContractDetails):
self._results[reqId].append(contractDetails)
bondContractDetails = contractDetails
def contractDetailsEnd(self, reqId: int):
self._endReq(reqId)
def symbolSamples(
self, reqId: int, contractDescriptions: List[ContractDescription]):
self._endReq(reqId, contractDescriptions)
def marketRule(
self, marketRuleId: int, priceIncrements: List[PriceIncrement]):
self._endReq(f'marketRule-{marketRuleId}', priceIncrements)
def marketDataType(self, reqId: int, marketDataId: int):
self.reqId2MarketDataType[reqId] = marketDataId
def realtimeBar(
self, reqId: int, time: int, open_: float, high: float, low: float,
close: float, volume: float, wap: float, count: int):
dt = datetime.fromtimestamp(time, timezone.utc)
bar = RealTimeBar(dt, -1, open_, high, low, close, volume, wap, count)
bars = self.reqId2Subscriber.get(reqId)
if bars is not None:
bars.append(bar)
self.ib.barUpdateEvent.emit(bars, True)
bars.updateEvent.emit(bars, True)
def historicalData(self, reqId: int, bar: BarData):
results = self._results.get(reqId)
if results is not None:
bar.date = parseIBDatetime(bar.date) # type: ignore
results.append(bar)
def historicalDataEnd(self, reqId, _start: str, _end: str):
self._endReq(reqId)
def historicalDataUpdate(self, reqId: int, bar: BarData):
bars = self.reqId2Subscriber.get(reqId)
if not bars:
return
bar.date = parseIBDatetime(bar.date) # type: ignore
lastDate = bars[-1].date
if bar.date < lastDate:
return
hasNewBar = len(bars) == 0 or bar.date > lastDate
if hasNewBar:
bars.append(bar)
elif bars[-1] != bar:
bars[-1] = bar
else:
return
self.ib.barUpdateEvent.emit(bars, hasNewBar)
bars.updateEvent.emit(bars, hasNewBar)
def headTimestamp(self, reqId: int, headTimestamp: str):
try:
dt = parseIBDatetime(headTimestamp)
self._endReq(reqId, dt)
except ValueError as exc:
self._endReq(reqId, exc, False)
def historicalTicks(
self, reqId: int, ticks: List[HistoricalTick], done: bool):
result = self._results.get(reqId)
if result is not None:
result += ticks
if done:
self._endReq(reqId)
def historicalTicksBidAsk(
self, reqId: int, ticks: List[HistoricalTickBidAsk], done: bool):
result = self._results.get(reqId)
if result is not None:
result += ticks
if done:
self._endReq(reqId)
def historicalTicksLast(
self, reqId: int, ticks: List[HistoricalTickLast], done: bool):
result = self._results.get(reqId)
if result is not None:
result += ticks
if done:
self._endReq(reqId)
# additional wrapper method provided by Client
def priceSizeTick(
self, reqId: int, tickType: int, price: float, size: float):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
self._logger.error(f'priceSizeTick: Unknown reqId: {reqId}')
return
# https://interactivebrokers.github.io/tws-api/tick_types.html
if tickType in (1, 66):
if price == ticker.bid and size == ticker.bidSize:
return
if price != ticker.bid:
ticker.prevBid = ticker.bid
ticker.bid = price
if size != ticker.bidSize:
ticker.prevBidSize = ticker.bidSize
ticker.bidSize = size
elif tickType in (2, 67):
if price == ticker.ask and size == ticker.askSize:
return
if price != ticker.ask:
ticker.prevAsk = ticker.ask
ticker.ask = price
if size != ticker.askSize:
ticker.prevAskSize = ticker.askSize
ticker.askSize = size
elif tickType in (4, 68):
if price != ticker.last:
ticker.prevLast = ticker.last
ticker.last = price
if size != ticker.lastSize:
ticker.prevLastSize = ticker.lastSize
ticker.lastSize = size
elif tickType in (6, 72):
ticker.high = price
elif tickType in (7, 73):
ticker.low = price
elif tickType in (9, 75):
ticker.close = price
elif tickType in (14, 76):
ticker.open = price
elif tickType == 15:
ticker.low13week = price
elif tickType == 16:
ticker.high13week = price
elif tickType == 17:
ticker.low26week = price
elif tickType == 18:
ticker.high26week = price
elif tickType == 19:
ticker.low52week = price
elif tickType == 20:
ticker.high52week = price
elif tickType == 35:
ticker.auctionPrice = price
elif tickType == 37:
ticker.markPrice = price
elif tickType == 50:
ticker.bidYield = price
elif tickType == 51:
ticker.askYield = price
elif tickType == 52:
ticker.lastYield = price
if price or size:
tick = TickData(self.lastTime, tickType, price, size)
ticker.ticks.append(tick)
ticker.marketDataType = self.reqId2MarketDataType.get(reqId, 0)
self.pendingTickers.add(ticker)
def tickSize(self, reqId: int, tickType: int, size: float):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
self._logger.error(f'tickSize: Unknown reqId: {reqId}')
return
price = -1.0
# https://interactivebrokers.github.io/tws-api/tick_types.html
if tickType in (0, 69):
if size == ticker.bidSize:
return
price = ticker.bid
ticker.prevBidSize = ticker.bidSize
ticker.bidSize = size
elif tickType in (3, 70):
if size == ticker.askSize:
return
price = ticker.ask
ticker.prevAskSize = ticker.askSize
ticker.askSize = size
elif tickType in (5, 71):
price = ticker.last
if isNan(price):
return
if size != ticker.lastSize:
ticker.prevLastSize = ticker.lastSize
ticker.lastSize = size
elif tickType in (8, 74):
ticker.volume = size
elif tickType == 21:
ticker.avVolume = size
elif tickType == 27:
ticker.callOpenInterest = size
elif tickType == 28:
ticker.putOpenInterest = size
elif tickType == 29:
ticker.callVolume = size
elif tickType == 30:
ticker.putVolume = size
elif tickType == 34:
ticker.auctionVolume = size
elif tickType == 36:
ticker.auctionImbalance = size
elif tickType == 61:
ticker.regulatoryImbalance = size
elif tickType == 86:
ticker.futuresOpenInterest = size
elif tickType == 87:
ticker.avOptionVolume = size
elif tickType == 89:
ticker.shortableShares = size
if price or size:
tick = TickData(self.lastTime, tickType, price, size)
ticker.ticks.append(tick)
ticker.marketDataType = self.reqId2MarketDataType.get(reqId, 0)
self.pendingTickers.add(ticker)
def tickSnapshotEnd(self, reqId: int):
self._endReq(reqId)
def tickByTickAllLast(
self, reqId: int, tickType: int, time: int, price: float,
size: float, tickAttribLast: TickAttribLast,
exchange, specialConditions):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
self._logger.error(f'tickByTickAllLast: Unknown reqId: {reqId}')
return
if price != ticker.last:
ticker.prevLast = ticker.last
ticker.last = price
if size != ticker.lastSize:
ticker.prevLastSize = ticker.lastSize
ticker.lastSize = size
tick = TickByTickAllLast(
tickType, self.lastTime, price, size, tickAttribLast,
exchange, specialConditions)
ticker.tickByTicks.append(tick)
self.pendingTickers.add(ticker)
def tickByTickBidAsk(
self, reqId: int, time: int, bidPrice: float, askPrice: float,
bidSize: float, askSize: float,
tickAttribBidAsk: TickAttribBidAsk):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
self._logger.error(f'tickByTickBidAsk: Unknown reqId: {reqId}')
return
if bidPrice != ticker.bid:
ticker.prevBid = ticker.bid
ticker.bid = bidPrice
if bidSize != ticker.bidSize:
ticker.prevBidSize = ticker.bidSize
ticker.bidSize = bidSize
if askPrice != ticker.ask:
ticker.prevAsk = ticker.ask
ticker.ask = askPrice
if askSize != ticker.askSize:
ticker.prevAskSize = ticker.askSize
ticker.askSize = askSize
tick = TickByTickBidAsk(
self.lastTime, bidPrice, askPrice, bidSize, askSize,
tickAttribBidAsk)
ticker.tickByTicks.append(tick)
self.pendingTickers.add(ticker)
def tickByTickMidPoint(self, reqId: int, time: int, midPoint: float):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
self._logger.error(f'tickByTickMidPoint: Unknown reqId: {reqId}')
return
tick = TickByTickMidPoint(self.lastTime, midPoint)
ticker.tickByTicks.append(tick)
self.pendingTickers.add(ticker)
def tickString(self, reqId: int, tickType: int, value: str):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
return
try:
if tickType == 47:
# https://interactivebrokers.github.io/tws-api/fundamental_ratios_tags.html
d = dict(t.split('=') # type: ignore
for t in value.split(';') if t) # type: ignore
for k, v in d.items():
with suppress(ValueError):
if v == '-99999.99':
v = 'nan'
d[k] = float(v)
d[k] = int(v)
ticker.fundamentalRatios = FundamentalRatios(**d)
elif tickType in (48, 77):
# RT Volume or RT Trade Volume string format:
# price;size;ms since epoch;total volume;VWAP;single trade
# example:
# 701.28;1;1348075471534;67854;701.46918464;true
priceStr, sizeStr, rtTime, volume, vwap, _ = value.split(';')
if volume:
if tickType == 48:
ticker.rtVolume = float(volume)
elif tickType == 77:
ticker.rtTradeVolume = float(volume)
if vwap:
ticker.vwap = float(vwap)
if rtTime:
ticker.rtTime = datetime.fromtimestamp(
int(rtTime) / 1000, timezone.utc)
if priceStr == '':
return
price = float(priceStr)
size = float(sizeStr)
if price and size:
if ticker.prevLast != ticker.last:
ticker.prevLast = ticker.last
ticker.last = price
if ticker.prevLastSize != ticker.lastSize:
ticker.prevLastSize = ticker.lastSize
ticker.lastSize = size
tick = TickData(self.lastTime, tickType, price, size)
ticker.ticks.append(tick)
elif tickType == 59:
# Dividend tick:
# https://interactivebrokers.github.io/tws-api/tick_types.html#ib_dividends
# example value: '0.83,0.92,20130219,0.23'
past12, next12, nextDate, nextAmount = value.split(',')
ticker.dividends = Dividends(
float(past12) if past12 else None,
float(next12) if next12 else None,
parseIBDatetime(nextDate) if nextDate else None,
float(nextAmount) if nextAmount else None)
self.pendingTickers.add(ticker)
except ValueError:
self._logger.error(
f'tickString with tickType {tickType}: '
f'malformed value: {value!r}')
def tickGeneric(self, reqId: int, tickType: int, value: float):
ticker = self.reqId2Ticker.get(reqId)
if not ticker:
return
try:
value = float(value)
except ValueError:
self._logger.error(f'genericTick: malformed value: {value!r}')
return
if tickType == 23:
ticker.histVolatility = value
elif tickType == 24:
ticker.impliedVolatility = value
elif tickType == 31:
ticker.indexFuturePremium = value
elif tickType == 49:
ticker.halted = value
elif tickType == 54:
ticker.tradeCount = value
elif tickType == 55:
ticker.tradeRate = value
elif tickType == 56:
ticker.volumeRate = value
elif tickType == 58:
ticker.rtHistVolatility = value
tick = TickData(self.lastTime, tickType, value, 0)
ticker.ticks.append(tick)
self.pendingTickers.add(ticker)
def tickReqParams(
self, reqId: int, minTick: float, bboExchange: str,
snapshotPermissions: int):
pass
def mktDepthExchanges(
self, depthMktDataDescriptions: List[DepthMktDataDescription]):
self._endReq('mktDepthExchanges', depthMktDataDescriptions)
def updateMktDepth(
self, reqId: int, position: int, operation: int, side: int,
price: float, size: float):
self.updateMktDepthL2(
reqId, position, '', operation, side, price, size)
def updateMktDepthL2(
self, reqId: int, position: int, marketMaker: str, operation: int,
side: int, price: float, size: float, isSmartDepth: bool = False):
# operation: 0 = insert, 1 = update, 2 = delete
# side: 0 = ask, 1 = bid
ticker = self.reqId2Ticker[reqId]
dom = ticker.domBids if side else ticker.domAsks
if operation == 0:
dom.insert(position, DOMLevel(price, size, marketMaker))
elif operation == 1:
dom[position] = DOMLevel(price, size, marketMaker)
elif operation == 2:
if position < len(dom):
level = dom.pop(position)
price = level.price
size = 0
tick = MktDepthData(
self.lastTime, position, marketMaker, operation, side, price, size)
ticker.domTicks.append(tick)
self.pendingTickers.add(ticker)
def tickOptionComputation(
self, reqId: int, tickType: int, tickAttrib: int,
impliedVol: float, delta: float, optPrice: float, pvDividend:
float, gamma: float, vega: float, theta: float, undPrice: float):
comp = OptionComputation(
tickAttrib, impliedVol, delta, optPrice, pvDividend,
gamma, vega, theta, undPrice)
ticker = self.reqId2Ticker.get(reqId)
if ticker:
# reply from reqMktData
# https://interactivebrokers.github.io/tws-api/tick_types.html
if tickType in (10, 80):
ticker.bidGreeks = comp
elif tickType in (11, 81):
ticker.askGreeks = comp
elif tickType in (12, 82):
ticker.lastGreeks = comp
elif tickType in (13, 83):
ticker.modelGreeks = comp
self.pendingTickers.add(ticker)
elif reqId in self._futures:
# reply from calculateImpliedVolatility or calculateOptionPrice
self._endReq(reqId, comp)
else:
self._logger.error(
f'tickOptionComputation: Unknown reqId: {reqId}')
def deltaNeutralValidation(self, reqId: int, dnc: DeltaNeutralContract):
pass
def fundamentalData(self, reqId: int, data: str):
self._endReq(reqId, data)
def scannerParameters(self, xml: str):
self._endReq('scannerParams', xml)
def scannerData(
self, reqId: int, rank: int,
contractDetails: ContractDetails,
distance: str, benchmark: str, projection: str,
legsStr: str):
data = ScanData(
rank, contractDetails, distance, benchmark, projection, legsStr)
dataList = self.reqId2Subscriber.get(reqId)
if dataList is None:
dataList = self._results.get(reqId)
if dataList is not None:
if rank == 0:
dataList.clear()
dataList.append(data)
def scannerDataEnd(self, reqId: int):
dataList = self._results.get(reqId)
if dataList is not None:
self._endReq(reqId)
else:
dataList = self.reqId2Subscriber.get(reqId)
if dataList is not None:
self.ib.scannerDataEvent.emit(dataList)
dataList.updateEvent.emit(dataList)
def histogramData(self, reqId: int, items: List[HistogramData]):
result = [HistogramData(item.price, item.count) for item in items]
self._endReq(reqId, result)
def securityDefinitionOptionParameter(
self, reqId: int, exchange: str, underlyingConId: int,
tradingClass: str, multiplier: str, expirations: List[str],
strikes: List[float]):
chain = OptionChain(
exchange, underlyingConId, tradingClass, multiplier,
expirations, strikes)
self._results[reqId].append(chain)
def securityDefinitionOptionParameterEnd(self, reqId: int):
self._endReq(reqId)
def newsProviders(self, newsProviders: List[NewsProvider]):
newsProviders = [
NewsProvider(code=p.code, name=p.name)
for p in newsProviders]
self._endReq('newsProviders', newsProviders)
def tickNews(
self, _reqId: int, timeStamp: int, providerCode: str,
articleId: str, headline: str, extraData: str):
news = NewsTick(
timeStamp, providerCode, articleId, headline, extraData)
self.newsTicks.append(news)
self.ib.tickNewsEvent.emit(news)
def newsArticle(self, reqId: int, articleType: int, articleText: str):
article = NewsArticle(articleType, articleText)
self._endReq(reqId, article)
def historicalNews(
self, reqId: int, time: str, providerCode: str,
articleId: str, headline: str):
dt = parseIBDatetime(time)
dt = cast(datetime, dt)
article = HistoricalNews(dt, providerCode, articleId, headline)
self._results[reqId].append(article)
def historicalNewsEnd(self, reqId, _hasMore: bool):
self._endReq(reqId)
def updateNewsBulletin(
self, msgId: int, msgType: int, message: str,
origExchange: str):
bulletin = NewsBulletin(msgId, msgType, message, origExchange)
self.msgId2NewsBulletin[msgId] = bulletin
self.ib.newsBulletinEvent.emit(bulletin)
def receiveFA(self, _faDataType: int, faXmlData: str):
self._endReq('requestFA', faXmlData)
def currentTime(self, time: int):
dt = datetime.fromtimestamp(time, timezone.utc)
self._endReq('currentTime', dt)
def tickEFP(
self, reqId: int, tickType: int, basisPoints: float,
formattedBasisPoints: str, totalDividends: float,
holdDays: int, futureLastTradeDate: str, dividendImpact: float,
dividendsToLastTradeDate: float):
pass
def error(self, reqId: int, errorCode: int, errorString: str):
# https://interactivebrokers.github.io/tws-api/message_codes.html
warningCodes = {165, 202, 399, 404, 434, 492, 10167}
isWarning = errorCode in warningCodes or 2100 <= errorCode < 2200
msg = (
f'{"Warning" if isWarning else "Error"} '
f'{errorCode}, reqId {reqId}: {errorString}')
contract = self._reqId2Contract.get(reqId)
if contract:
msg += f', contract: {contract}'
if isWarning:
self._logger.info(msg)
else:
self._logger.error(msg)
if reqId in self._futures:
# the request failed
if self.ib.RaiseRequestErrors:
error = RequestError(reqId, errorCode, errorString)
self._endReq(reqId, error, success=False)
else:
self._endReq(reqId)
elif (self.clientId, reqId) in self.trades:
# something is wrong with the order, cancel it
trade = self.trades[(self.clientId, reqId)]
if not trade.isDone():
status = trade.orderStatus.status = OrderStatus.Cancelled
logEntry = TradeLogEntry(
self.lastTime, status, msg, errorCode)
trade.log.append(logEntry)
self._logger.warning(f'Canceled order: {trade}')
self.ib.orderStatusEvent.emit(trade)
trade.statusEvent.emit(trade)
trade.cancelledEvent.emit(trade)
if errorCode == 165:
# for scan data subscription there are no longer matching results
dataList = self.reqId2Subscriber.get(reqId)
if dataList:
dataList.clear()
dataList.updateEvent.emit(dataList)
elif errorCode == 317:
# Market depth data has been RESET
ticker = self.reqId2Ticker.get(reqId)
if ticker:
# clear all DOM levels
ticker.domTicks += [MktDepthData(
self.lastTime, 0, '', 2, 0, level.price, 0)
for level in ticker.domAsks]
ticker.domTicks += [MktDepthData(
self.lastTime, 0, '', 2, 1, level.price, 0)
for level in ticker.domBids]
ticker.domAsks.clear()
ticker.domBids.clear()
self.pendingTickers.add(ticker)
elif errorCode == 10225:
# Bust event occurred, current subscription is deactivated.
# Please resubscribe real-time bars immediately
bars = self.reqId2Subscriber.get(reqId)
if isinstance(bars, RealTimeBarList):
self.ib.client.cancelRealTimeBars(reqId)
self.ib.client.reqRealTimeBars(
reqId, bars.contract, bars.barSize, bars.whatToShow,
bars.useRTH, bars.realTimeBarsOptions)
elif isinstance(bars, BarDataList):
self.ib.client.cancelHistoricalData(reqId)
self.ib.client.reqHistoricalData(
reqId, bars.contract, bars.endDateTime,
bars.durationStr, bars.barSizeSetting, bars.whatToShow,
bars.useRTH, bars.formatDate, bars.keepUpToDate,
bars.chartOptions)
self.ib.errorEvent.emit(reqId, errorCode, errorString, contract)
def tcpDataArrived(self):
self.lastTime = datetime.now(timezone.utc)
for ticker in self.pendingTickers:
ticker.ticks = []
ticker.tickByTicks = []
ticker.domTicks = []
self.pendingTickers = set()
def tcpDataProcessed(self):
self.ib.updateEvent.emit()
if self.pendingTickers:
for ticker in self.pendingTickers:
ticker.time = self.lastTime
ticker.updateEvent.emit(ticker)
self.ib.pendingTickersEvent.emit(self.pendingTickers)