一个测试文章 包含PYTHON代码
- VNPY中持仓查询的方法,可以根据以下以下原理灵活变通:1.在CTP_gateway中定义了计时器处理函数,
def process_timer_event(self, event):
它的功能是每2秒查询一次账户持仓(CTP_gateway里也有个队列)。
- 在CTP_gateway中向CTP柜台请求查询账户持仓后,CTP柜台响应、用
def onRspQryInvestorPosition()
来处理响应,这函数调用on_position,把查询到的结果推入事件引擎队列中。 - main_engine从队列中读持仓数据,用
def process_position_event()
函数来处理查询到的持仓数据,在这个函数中,把查询的持仓数据缓存到了self.positions这个字典中 - 那么在CTA策略中如果想查看账户持仓数据,那就调用主引擎的get_position或者get_all_position(),就可看到持仓数据,也就实现了你想要的“调用”。
我们来看一下 def onRspQryInvestorPosition 具体内容
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool): """""" if not data: return # Check if contract data received if data["InstrumentID"] in symbol_exchange_map: # Get buffered position object key = f"{data['InstrumentID'], data['PosiDirection']}" position = self.positions.get(key, None) if not position: position = PositionData( symbol=data["InstrumentID"], exchange=symbol_exchange_map[data["InstrumentID"]], direction=DIRECTION_CTP2VT[data["PosiDirection"]], gateway_name=self.gateway_name ) self.positions[key] = position # For SHFE and INE position data update if position.exchange in [Exchange.SHFE, Exchange.INE]: if data["YdPosition"] and not data["TodayPosition"]: position.yd_volume = data["Position"] # For other exchange position data update else: position.yd_volume = data["Position"] - data["TodayPosition"] # Get contract size (spread contract has no size value) size = symbol_size_map.get(position.symbol, 0) # Calculate previous position cost cost = position.price * position.volume * size # Update new position volume position.volume += data["Position"] position.pnl += data["PositionProfit"] # Calculate average position price if position.volume and size: cost += data["PositionCost"] position.price = cost / (position.volume * size) # Get frozen volume if position.direction == Direction.LONG: position.frozen += data["ShortFrozen"] else: position.frozen += data["LongFrozen"] if last: for position in self.positions.values(): self.gateway.on_position(position) self.positions.clear()