CTP(Comprehensive Transaction Platform)直连数据 Skill,支持行情订阅、交易下单、持仓资金查询,兼容实盘穿透式环境和 SimNow 仿真环境。
# 安装 CTP Python 封装库(openctp 或 vnpy-ctp)
pip install openctp
# 数据存储与推送依赖
pip install pandas sqlalchemy pymongo redis pyzmq websocket-client
# 可选:异步支持
pip install asyncio aiohttp
创建 config/ctp_config.yaml:
# 环境选择: simnow / production
environment: simnow
# SimNow 仿真环境
simnow:
front_address: "tcp://180.168.146.187:10131" # 行情前置
trade_front: "tcp://180.168.146.187:10130" # 交易前置
broker_id: "9999"
user_id: "${SIMNOW_USER_ID}" # 从环境变量读取
password: "${SIMNOW_PASSWORD}" # 从环境变量读取
auth_code: "${SIMNOW_AUTH_CODE}" # 从环境变量读取(16位)
app_id: "simnow_client_test"
# 实盘穿透式环境
production:
front_address: "tcp://your_broker_md_front:port"
trade_front: "tcp://your_broker_trade_front:port"
broker_id: "${PROD_BROKER_ID}"
user_id: "${PROD_USER_ID}"
password: "${PROD_PASSWORD}"
auth_code: "${PROD_AUTH_CODE}"
app_id: "${PROD_APP_ID}"
# 数据输出配置
output:
# 本地文件
file:
enabled: true
path: "./data"
format: "csv" # csv / json / parquet
# 数据库存储
database:
enabled: false
type: "sqlite" # sqlite / mysql / mongodb
connection: "sqlite:///ctp_data.db"
# 实时推送
streaming:
enabled: false
type: "zmq" # zmq / websocket / redis
address: "tcp://127.0.0.1:5555"
# 订阅合约
subscriptions:
- "cu2506" # 沪铜
- "al2506" # 沪铝
- "ni2506" # 沪镍
- "CF509" # 棉花
> ⚠️ 安全提示:配置文件中的敏感信息(密码、认证码)建议使用环境变量 ${ENV_VAR} 格式,避免明文存储。
from ctp_connector import CTPMarketData
# 初始化行情接口
md = CTPMarketData(config_path="config/ctp_config.yaml")
# 连接并登录
md.connect()
md.login()
# 订阅合约
md.subscribe(["cu2506", "al2506", "ni2506"])
# 运行(阻塞模式)
md.run()
from ctp_connector import CTPTrader
# 初始化交易接口
trader = CTPTrader(config_path="config/ctp_config.yaml")
trader.connect()
trader.login()
# 查询账户信息
account = trader.query_account()
print(f"可用资金: {account.available}")
# 查询持仓
positions = trader.query_positions()
for pos in positions:
print(f"{pos.instrument}: 多头{pos.long_vol}, 空头{pos.short_vol}")
# 下单
order = trader.insert_order(
instrument="cu2506",
direction="buy", # buy / sell
offset="open", # open / close / close_today
price=70000,
volume=1,
order_type="limit" # limit / market / fok / fak
)
# 撤单
trader.cancel_order(order.order_id)
# 查询委托
orders = trader.query_orders()
# 查询成交
trades = trader.query_trades()
class CTPMarketData:
"""CTP 行情数据接口"""
def connect(self) -> bool
def login(self) -> bool
def subscribe(self, instruments: List[str]) -> bool
def unsubscribe(self, instruments: List[str]) -> bool
def run(self) -> None # 阻塞运行
def stop(self) -> None
# 回调函数(可重写)
def on_tick(self, tick: TickData) -> None
def on_depth(self, depth: DepthData) -> None
def on_error(self, error: str) -> None
class CTPTrader:
"""CTP 交易接口"""
def connect(self) -> bool
def login(self) -> bool
# 查询类
def query_account(self) -> AccountData
def query_positions(self) -> List[PositionData]
def query_orders(self) -> List[OrderData]
def query_trades(self) -> List[TradeData]
# 交易类
def insert_order(self, ...) -> OrderData
def cancel_order(self, order_id: str) -> bool
# 回调函数
def on_order(self, order: OrderData) -> None
def on_trade(self, trade: TradeData) -> None
def on_position(self, position: PositionData) -> None
@dataclass
class TickData:
instrument: str # 合约代码
datetime: datetime # 时间戳
last_price: float # 最新价
volume: int # 成交量
open_interest: float # 持仓量
bid_price_1: float # 买一档价
bid_volume_1: int # 买一档量
ask_price_1: float # 卖一档价
ask_volume_1: int # 卖一档量
upper_limit: float # 涨停价
lower_limit: float # 跌停价
@dataclass
class AccountData:
account_id: str
balance: float # 权益
available: float # 可用资金
margin: float # 保证金
frozen: float # 冻结资金
@dataclass
class PositionData:
instrument: str
direction: str # long / short
volume: int # 总持仓
available: int # 可平持仓
open_price: float # 开仓均价
position_profit: float # 持仓盈亏
@dataclass
class OrderData:
order_id: str
instrument: str
direction: str # buy / sell
offset: str # open / close / close_today
price: float
volume: int
traded: int
status: str # pending / partial / filled / cancelled / rejected
insert_time: datetime
@dataclass
class TradeData:
trade_id: str
order_id: str
instrument: str
direction: str
offset: str
price: float
volume: int
trade_time: datetime
from ctp_connector.storage import CSVStorage
storage = CSVStorage(path="./data", format="csv")
storage.save_tick(tick_data)
from ctp_connector.storage import SQLStorage
storage = SQLStorage(connection="sqlite:///ctp_data.db")
storage.save_tick(tick_data)
storage.save_trade(trade_data)
from ctp_connector.storage import MongoStorage
storage = MongoStorage(uri="mongodb://localhost:27017", db="ctp_data")
storage.save_tick(tick_data)
from ctp_connector.streaming import ZMQPublisher
publisher = ZMQPublisher(address="tcp://127.0.0.1:5555")
publisher.publish(tick_data)
from ctp_connector.streaming import WebSocketServer
server = WebSocketServer(host="0.0.0.0", port=8765)
server.start()
server.broadcast(tick_data)
from ctp_connector.backtest import DataRecorder
# 录制行情数据
recorder = DataRecorder(
config_path="config/ctp_config.yaml",
instruments=["cu2506", "al2506"],
duration="1d" # 录制时长
)
recorder.run()
# 数据回放
from ctp_connector.backtest import DataPlayback
playback = DataPlayback(data_path="./data/cu2506_20250427.csv")
for tick in playback:
strategy.on_tick(tick)
md = CTPMarketData(config_path="config/ctp_config.yaml")
md.connect()
md.login()
# 订阅主力合约
instruments = ["cu2506", "al2506", "ni2506", "CF509"]
md.subscribe(instruments)
# 自定义回调
class MyMarketData(CTPMarketData):
def on_tick(self, tick: TickData):
# 自定义处理逻辑
if tick.instrument == "cu2506":
print(f"铜价更新: {tick.last_price}")
# 调用父类默认存储逻辑
super().on_tick(tick)
md = MyMarketData(config_path="config/ctp_config.yaml")
from ctp_connector import CTPMarketData
md = CTPMarketData(
config_path="config/ctp_config.yaml",
auto_reconnect=True,
reconnect_interval=5 # 秒
)
md.run()
from ctp_connector import MultiAccountManager
manager = MultiAccountManager()
# 添加多个账户
manager.add_account("account1", "config/account1.yaml")
manager.add_account("account2", "config/account2.yaml")
# 批量查询
accounts = manager.query_all_accounts()
positions = manager.query_all_positions()
SimNow 是 CTP 提供的仿真交易环境,无需真实资金即可测试。
| 环境 | 行情前置 | 交易前置 | 时间段 |
|---|---|---|---|
| ------ | ---------- | ---------- | -------- |
| 第一组 | 180.168.146.187:10131 | 180.168.146.187:10130 | 交易时段 |
| 第二组 | 180.168.146.187:10111 | 180.168.146.187:10101 | 交易时段 |
| 电信 | 218.202.237.33:10113 | 218.202.237.33:10103 | 交易时段 |
| 移动 | 114.80.55.135:10113 | 114.80.55.135:10103 | 交易时段 |
app_id 和 auth_code 由期货公司提供broker_id 为期货公司代码(如中信期货为 6666)检查以下几点:
contracts = trader.query_all_contracts()
for contract in contracts:
print(f"{contract.code}: {contract.name}")
ctp-connector/
├── SKILL.md # 本文件
├── scripts/
│ ├── __init__.py # 包初始化,导出核心接口
│ ├── ctp_market.py # 行情接口
│ ├── ctp_trade.py # 交易接口
│ ├── models.py # 数据模型
│ ├── storage.py # 数据存储
│ ├── streaming.py # 实时推送
│ ├── backtest.py # 回测支持
│ └── utils.py # 工具函数
├── tests/ # 单元测试
│ ├── __init__.py
│ ├── test_models.py
│ ├── test_utils.py
│ └── test_ctp_market.py
├── references/
│ └── CTP_API_Documentation.pdf
└── assets/
└── architecture.png
本 Skill 仅供学习和研究使用,不构成任何投资建议。使用 CTP 接口进行交易前,请确保您已充分了解相关风险。
cp .env.example .env
# SimNow 仿真环境
SIMNOW_USER_ID=your_simnow_userid
SIMNOW_PASSWORD=your_simnow_password
SIMNOW_AUTH_CODE=your_16char_auth_code # 替换为真实16位认证码
# 实盘环境
PROD_BROKER_ID=your_broker_id
PROD_USER_ID=your_user_id
PROD_PASSWORD=your_password
PROD_AUTH_CODE=your_auth_code
# .gitignore 已包含
echo ".env" >> .gitignore
password=**(仅显示4位)ctp_connector/
├── SKILL.md # 本文档
├── requirements.txt # 完整依赖(含可选)
├── requirements-core.txt # 核心依赖
├── requirements-optional.txt # 可选依赖
├── .env.example # 环境变量模板
└── scripts/
├── __init__.py # 包初始化,导出核心接口
├── models.py # 数据模型(318行)
├── utils.py # 工具函数,含日志脱敏(513行)
├── ctp_market.py # 行情接口(460行)
├── ctp_trade.py # 交易接口(731行)
├── storage.py # 数据存储(可选)
├── streaming.py # 实时推送(可选)
└── backtest.py # 回测引擎(可选)
| 模块 | 依赖 | 说明 |
|---|---|---|
| ------ | ------ | ------ |
| core | 必须 | CTPMarketData, CTPTrader, models, utils |
| storage | 可选 | DataRecorder, DataPlayback |
| streaming | 可选 | ZeroMQ/WebSocket/Redis 推送 |
| backtest | 可选 | 回测引擎 |
# 最小安装(仅核心功能)
pip install -r requirements-core.txt
# 完整安装(含所有功能)
pip install -r requirements.txt
# 按需添加可选功能
pip install sqlalchemy pymongo pyzmq
>= 改为 == 固定版本号.env 环境变量配置支持共 2 个版本