完整技术教程见这里
有很多人建立了自己本地的行情数据库,希望能够从本地数据库将数据发到backtrader,供策略使用。一个通用的方法是将数据库的行情数据读到pandas dataframe里,然后将这个数据帧的数据传给backtrader的pandas feed数据对象,这样策略就能够使用了。
但是有些同学不想通过pandas dataframe中转,而是想直接从数据库将数据喂给backtrader的数据馈送对象,这就需要针对数据库开发专门的data feed类了。
上一篇我们介绍了如何开发针对MySQL数据库的data feed,本篇介绍针对sqlite数据库的data feed开发。
以下就是backtrader社区提供的一个从sqlite数据库读取数据的feed类SQLiteData,大家可以试一试。
import datetime as dt
from backtrader import TimeFrame
from backtrader.feed import DataBase
from backtrader import date2num


class SQLiteData(DataBase):
    '''
    Fetch bars from a SQLite database and expose them as a feed consumable
    by cerebro.

    The ``database`` parameter is the path of the SQLite file; it is turned
    into the SQLAlchemy URL ``sqlite:///{database}``.

    This implementation assumes a single table (``historical_data``) holding
    all prices, conforming to a schema similar to the following::

        symbol TEXT,
        date   TEXT (YYYY-mm-dd HH:MM:SS),
        open   REAL,
        high   REAL,
        low    REAL,
        close  REAL,
        volume INTEGER,
        unique (symbol, date)

    If your database is laid out differently, override ``_preload()`` (which
    holds the query) and/or the column-index parameters below.

    Params:
      - ``database``: path of the SQLite database file
      - ``symbol``: value matched against the ``symbol`` column
      - ``tick_value``: only used to build the feed's ``_dataname``
      - ``timeframe`` / ``compression``: stored on the feed at construction
      - ``fromdate`` / ``todate``: inclusive bounds of the ``date`` column
      - ``datetime`` ... ``openinterest``: index of each field in a result
        row; a negative index means the column is not present
    '''

    params = (
        ('database', None),
        ('symbol', 'XBTUSD'),
        ('tick_value', 0.01),
        # NOTE: the key must be exactly 'timeframe' (no trailing space),
        # otherwise self.p.timeframe does not pick up this default.
        ('timeframe', TimeFrame.Minutes),
        ('compression', 1),
        ('fromdate', dt.datetime(1900, 1, 1)),
        ('todate', dt.datetime.max),
        # parameterized column indices for ease of overwriting/re-implementing
        ('datetime', 0),
        ('open', 1),
        ('high', 2),
        ('low', 3),
        ('close', 4),
        ('volume', 5),
        ('openinterest', -1),
    )

    # Textual layout of the ``date`` column; used both to format the query
    # bounds and to parse the rows that come back.
    _DTFORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        self._timeframe = self.p.timeframe
        self._compression = self.p.compression
        self._dataname = '{0}-{1:.2f}'.format(self.p.symbol, self.p.tick_value)

    def start(self):
        '''Reset the row iterator and the preloaded flag.'''
        super(SQLiteData, self).start()
        self.biter = None
        self.preloaded = False

    def _preload(self):
        '''Run the query and keep an iterator over the fetched rows.'''
        from sqlalchemy import text

        engine = self._connect_db()
        # Bound parameters instead of string concatenation: symbols that
        # contain quotes work and nothing can be injected into the SQL.
        query = text(
            "SELECT `date`,`open`,`high`,`low`,`close`,`volume` "
            "FROM `historical_data` "
            "WHERE `symbol` = :symbol "
            "AND `date` BETWEEN :fromdate AND :todate "
            "ORDER BY `date` ASC"
        )
        bounds = {
            'symbol': self.p.symbol,
            'fromdate': self.p.fromdate.strftime(self._DTFORMAT),
            'todate': self.p.todate.strftime(self._DTFORMAT),
        }
        try:
            # Engine.execute() no longer exists in SQLAlchemy 2.0; an
            # explicit connection works on old and new versions and is
            # always closed by the context manager.
            with engine.connect() as conn:
                dbars = conn.execute(query, bounds).fetchall()
        finally:
            # The engine is created per query, so release it right away.
            engine.dispose()

        self.biter = iter(dbars)

    def preload(self):
        '''Load every bar up front, then rewind the feed.'''
        if self.biter is None:
            self._preload()

        while self.load():
            pass

        self._last()
        self.home()

        self.biter = None
        self.preloaded = True

    def _load(self):
        '''
        Copy the next fetched row into the lines.

        Returns ``False`` when the data was already preloaded or when the
        rows are exhausted, ``True`` after a bar has been delivered.
        '''
        if self.preloaded:
            return False

        if self.biter is None:
            self._preload()

        try:
            bar = next(self.biter)
        except StopIteration:
            return False

        for field in self.getlinealiases():
            if field == 'datetime':
                stamp = dt.datetime.strptime(
                    bar[self.p.datetime], self._DTFORMAT)
                self.lines.datetime[0] = date2num(stamp)
            elif field == 'volume':
                self.lines.volume[0] = bar[self.p.volume]
            else:
                # get the column index
                colidx = getattr(self.params, field)
                if colidx < 0:
                    # column not present -- skip
                    continue

                # get the line to be set
                line = getattr(self.lines, field)
                line[0] = float(bar[colidx])

        return True

    def _connect_db(self):
        '''Create a SQLAlchemy engine for the configured SQLite file.'''
        from sqlalchemy import create_engine

        url = 'sqlite:///{0}'.format(self.p.database)
        engine = create_engine(url, echo=False)
        return engine
发布于 3 小时前