一个测试文章 包含PYTHON代码

  • VNPY中持仓查询的方法,可以根据以下以下原理灵活变通:1.在CTP_gateway中定义了计时器处理函数,def process_timer_event(self, event): 它的功能是每2秒查询一次账户持仓(CTP_gateway里也有个队列)。
  • 在CTP_gateway中向CTP柜台请求查询账户持仓后,CTP柜台响应、用def onRspQryInvestorPosition() 来处理响应,这函数调用on_position,把查询到的结果推入事件引擎队列中。
  • main_engine从队列中读持仓数据,用def process_position_event()函数来处理查询到的持仓数据,在这个函数中,把查询的持仓数据缓存到了self.positions这个字典中
  • 那么在CTA策略中如果想查看账户持仓数据,那就调用主引擎的get_position或者get_all_position(),就可看到持仓数据,也就实现了你想要的“调用”。

我们来看一下 def onRspQryInvestorPosition 具体内容

    def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool):
        """"""
        if not data:
            return

        # Check if contract data received
        if data["InstrumentID"] in symbol_exchange_map:
            # Get buffered position object
            key = f"{data['InstrumentID'], data['PosiDirection']}"
            position = self.positions.get(key, None)
            if not position:
                position = PositionData(
                    symbol=data["InstrumentID"],
                    exchange=symbol_exchange_map[data["InstrumentID"]],
                    direction=DIRECTION_CTP2VT[data["PosiDirection"]],
                    gateway_name=self.gateway_name
                )
                self.positions[key] = position

            # For SHFE and INE position data update
            if position.exchange in [Exchange.SHFE, Exchange.INE]:
                if data["YdPosition"] and not data["TodayPosition"]:
                    position.yd_volume = data["Position"]
            # For other exchange position data update
            else:
                position.yd_volume = data["Position"] - data["TodayPosition"]

            # Get contract size (spread contract has no size value)
            size = symbol_size_map.get(position.symbol, 0)

            # Calculate previous position cost
            cost = position.price * position.volume * size

            # Update new position volume
            position.volume += data["Position"]
            position.pnl += data["PositionProfit"]

            # Calculate average position price
            if position.volume and size:
                cost += data["PositionCost"]
                position.price = cost / (position.volume * size)

            # Get frozen volume
            if position.direction == Direction.LONG:
                position.frozen += data["ShortFrozen"]
            else:
                position.frozen += data["LongFrozen"]

        if last:
            for position in self.positions.values():
                self.gateway.on_position(position)

            self.positions.clear()