简单说明:
此模块儿常用来把关系数据库的表结构映射到对象上,允许开发人员首先设计数据模型,并能决定稍候可视化数据的方式(CLI/WEB/GUI),和以往的先绝对如何在框架允许的范围内使用数据模型的开发方法完全相反,它兼容众多数据库(SQLite/MySQL/Postgres/Oracle/MS-SQL/SQLServer/Firebird)等(http://www.sqlalchemy.org/organizations.html)
会话实例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建引擎
from sqlalchemy import create_engine
# 创建会话类
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
# mysqldb是数据库引擎,root:root@127.0.0.1分别为帐号:密码@主机,xmdevops为数据库,charset为字符集
sql_connect_str = ('mysql+mysqldb://root:root@127.0.0.1/'
'xmdevops?charset=utf8')
# 通过连接字符串创建一个引擎,echo=True会通过logging模块儿来输出日志
sql_engine = create_engine(sql_connect_str, echo=True)
# 基于数据库引擎创建会话类
SessionCls = sessionmaker(bind=sql_engine)
# 实例化一个会话,其实创建了默认为5个连接的连接池
sql_session = SessionCls()
s.execute(clause, params=None, mapper=None, bind=None, **kw) -> ResultProxy
说明: 执行SQL语句
s.commit() -> None
说明:提交
s.rollback()
说明:回滚
s.close() -> None
说明:关闭内存中的当前会话
s.close_all() -> None
说明:关闭内存中的所有会话
游标实例:
r.keys() -> list
说明:列表形式返回当前解决集的所有列字段
r.closed -> bool
说明:判断游标是否关闭
r.close() -> None
说明:关闭游标
r.fetchall() -> list
说明:游标获取所有结果集
r.fetchone() -> list
说明:游标获取一行结果集
r.fetchmany(size=None) -> list
说明:游标获取多行结果集
r.first() -> list
说明:游标获取结果集第一行
r.rowcount -> int
说明:返回游标结果集行数
r.__dict__
说明:返回游标记过集的字典形式
定义映射:
1.经典映射模式,需要先描述这个表,例如要描述如下表结构
CREATE TABLE [users] ([id] INTEGER PRIMARY KEY,
[name] TEXT NOT NULL,
[fullname] TEXT NOT NULL,
[password] TEXT NOT NULL);
第一步骤:描述表结构
from sqlalchemy.orm import mapper
from sqlalchemy import (Table, MetaData, Column,
CHAR, Text,
VARCHAR, Integer, String)
metadata = MetaData()
user = Table('users',
metadata,
Column('u_id', Integer, primary_key=True),
Column('u_name', Text),
Column('u_fullname', Text),
Column('u_password', Text))
第二步骤:定义映射类
class User(object):
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
第三步骤:绑映射描述
mapper(User, user)
说明:上面的方式是SB的原始方式,当大家都在乐此不疲的定义描述表,定义类再实现ORM的时候,SQLAlchemy团队搞出更加简单的映射方法,就是现代模式,只需要定义映射类就可以一次性完成所有的任务
2.现代映射模式
1.为了定义的类能够被SQLAlchemy管理,所以引入第三方类Declarative,我们所有定义的映射类都必须是它的子类
第一步骤:生成公共基类
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
说明:一个程序内,基类最好的唯一的,建议存储在全局变量如上Base中供所有映射类使用,通过
基类可以定义N多的映射子类,这些子类都能被SQLAlchemy Declarative管理
第二步骤:定义映射类
from sqlalchemy import (Column, CHAR, VARCHAR,
Integer, String, Text)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
说明:如上代码就完成了先前经典模式所需三步,代码比原先更简洁和容易管理,同刚才经典模式的Table定义的Column,这个代表数据库表中的列,Text/Integer代表数据库表的字段类型,这样User类就建立起与数据库表的映射,真实的名字可以使用__tablename__指明,然后是表列的集合,包括id,name,fullname,password,通过primary_key=True已经指明id为主键,ORM为了能够实际映射表需要至少一个列被定义为主键列,多列当然也支持,其实上面的魔术方法__init__()/__repr__()都是可选的,可以手动加入任意方法或属性,唯一需要注意的是必须继承Base基类,保证SQLAlchary Declarative可管理这些映射类和数据库表.随着User映射类通过Declaractive_base系统构造成功,我们就拥有了相关的定义信息,例如经典模式中的Table()描述,也包含映射到表中的类,User自身,可以通过User.__table__来获取表描述,User.__mapper__来获取映射对象,Base.metedata来获取metedata对象,如果想将定义好数据结构实现为实体数据库,只需要Base.metedata.create_all(engine)即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 获取经典模式下的Table()表描述信息
pprint.pprint(repr(User.__table__))
# 获取经典模式下mapper()表和类映射关系
pprint.pprint(repr(User.__mapper__))
# 将表结构写入数据库
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
sql_engine = create_engine(sql_conn_str, echo=True)
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
main()
会话创建:
1.创建类的实例,declearative会自动检测子类构造方法,如果没有定义会自动定义,主键默认为None,因为数据对象未持久化存储
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
# 创建基类
Base = declarative_base()
# 定义表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
""" 可以省略__init__()构造方法,因为Base基类会检测,缺少构造方法会自动补上,
但其提供的构造方法建议使用键值对的参数访问方式,包含我们用Column定义映射的列
"""
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
# 实例映射类(id也可以传入,通常意义上这类主键由系统自动维护,无需为其赋值)
user_manman = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# id为None是因为没有对对象持久化存储
print user_manman.u_id
print user_manman.u_name
print user_manman.u_fullname
print user_manman.u_password
if __name__ == '__main__':
main()
2.ORM的操作句柄被称为会话SESSION,为了使用会话需要首先创建会话实例(先建立引擎然基于引擎创建会话,其实先建立会话再绑定引擎也是可以滴)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
# 方法一: 先创建引擎再基于引擎创建会话
sql_engine = create_engine(sql_conn_str)
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
# 方法二: 先创建会话再绑定引擎
SQLSession = sessionmaker()
SQLSession.configure(bind=sql_engine)
sql_session = SQLSession()
if __name__ == '__main__':
main()
说明:到此就获取了由engine维护的数据库连接池,并且会维持内存中的映射数据直到commit提交或是更改或是关闭会话对象
添加对象:
1.要想将映射到实体表中的映射类如User持久化,需要将这个User类建立的对象实例添加到上面创建好的会话实例中
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
orm_session.add(usr_lmm)
# 一次添加多个对象
orm_session.add_all([
User(u_name='lmm1', u_fullname='limanman1', u_password='lm_521314_lz1'),
User(u_name='lmm2', u_fullname='limanman2', u_password='lm_521314_lz2'),
User(u_name='lmm3', u_fullname='limanman3', u_password='lm_521314_lz3')
])
# 查询测试LazyLoad策略
orm_session.query(User).filter_by(u_name='lmm').first()
说明:SQLAlchemy采用Lazyload策略,add到orm_session此时usr_lmm只是被标记为Pending准备状态,并没有执行任何可能导致数据库变化的SQL语句,只有查询对象,对象的一个属性或显示调用flush时才会将Pending状态的数据写入数据表,如下我们用Query对象结合日志来说明,Query对象返回的是一个User实例,就是我们之前持久化的.由于我们指定的echo=True,如下的调试信息中可以看到其实先Insert然后再Select,也就是说你在实际操作持久化数据时才会由延迟加载真正触发数据库操作
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine ('lmm', 'limanman', 'lm_521314_lz')
2016-04-22 19:35:58,396 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,397 INFO sqlalchemy.engine.base.Engine ('lmm1', 'limanman1', 'lm_521314_lz1')
2016-04-22 19:35:58,400 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,401 INFO sqlalchemy.engine.base.Engine ('lmm2', 'limanman2', 'lm_521314_lz2')
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine ('lmm3', 'limanman3', 'lm_521314_lz3')
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine SELECT users.u_id AS users_u_id, users.u_name AS users_u_name, users.u_fullname AS users_u_fullname, users.u_password AS users_u_password
FROM users
WHERE users.u_name = %s
LIMIT %s
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine ('lmm', 1)
<User(lmm, limanman, lm_521314_lz)>
2.如果要将orm缓存数据显式的提交反馈到数据库里,需要调用orm_session.commit()告诉session目前的添加和改动,操作完成后session引用的数据库连接资源被回收到连接池,接下来对于这个session的操作会触发一个新的事物申请新的连接池连接资源
3.session是作为事务来工作,所以我们可以回滚先前所做的更改,查询时首先会在标识映射表中查询,查询不到才会去实体数据库中查找
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# 将usr_lmm在标识映射表中标识为Pending状态
orm_session.add(usr_lmm)
# 更改实例属性
usr_lmm.u_name = 'lzz'
# 依据名称查询,触发持久化操作,将Pending状态的对象写入数据库,但此时并没commit,所以实体数据库中必然没有记录
print '------------------------------------------------------'
print orm_session.query(User).filter_by(u_name='lzz').first()
print '------------------------------------------------------'
# 回滚之前的更改
orm_session.rollback()
# 回滚后数据未持久化,此时查询的数据来自于实体数据库
print orm_session.query(User).filter_by(u_name='lmm').first()
多表关联:
1.实际生产中数据存储多表联查,常用的1-N(一对多)关系(再创建一个邮件表,每个用户可以关联一个或是多个邮件地址)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine, func, ForeignKey
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 定义Address表结构
class Address(Base):
__tablename__ = 'addresses'
a_id = Column(Integer, primary_key=True)
a_email = Column(Text, nullable=False)
u_id = Column(Integer, ForeignKey('users.u_id'))
"""
通过relationship实现Address.users获取用户实例集,back_populates='addresses'
必须双向指定,backref='addresses'作用相同但是可以单向指定,双向引用,addresses任意指定
主要是通过User.addresses获取邮件实例集
"""
users = relationship('User', backref='addresses')
def __repr__(self):
return '<Address(%s)>' % (self.a_email)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# 通过正反向关联获取用户信息
last_user = orm_session.query(User).filter(User.u_name=='lmm').all()[-1]
print 'user: %s' % (last_user.u_name)
for cur_a_email in last_user.addresses:
print cur_a_email.a_email
2.实际生产中数据存储多表联查,常用的N-N(多对多)关系(创建一个主机表和主机组表,一个主机可以属于多个表,一个主机组可以包含多个主机)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import (Text, Integer, String, Table,
Column, create_engine, func, ForeignKey)
Base = declarative_base()
# 主机-2-主机组类
# host_id + group_id 唯一不重复(一个主机可以属于多个主机组),关联另外两个表
host_2_group = Table('host_2_group', Base.metadata,
Column('host_id', Integer, ForeignKey('host.id'),
autoincrement=False, primary_key=True),
Column('group_id', Integer, ForeignKey('group.id'),
autoincrement=False, primary_key=True))
# 主机类
class Host(Base):
__tablename__ = 'host'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
hostname = Column(String(32),
nullable=False)
haddress = Column(String(128),
nullable=False)
hostport = Column(Integer,
nullable=False)
password = Column(String(64),
nullable=True)
auth_key = Column(String(256),
nullable=True)
# User基于host_2_group来关联group表,backref反向引用,则Group基于host_2_group来关联host表
groups = relationship('Group', backref='hosts', secondary=host_2_group)
def __repr__(self):
return ('<Host(id: %s hostname: %s haddress: %s password: %s'
'hostport: %s auth_key: %s)>') % ( self.id, self.hostname,
self.haddress, self.hostport,
self.password, self.auth_key)
# 主机组类
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
groupname = Column(String(32),
nullable=False)
def __repr__(self):
return '<Group(id: %s groupname: %s)>' % (self.id, self.groupname)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加主机组
#xm_newnatserver = Group(groupname='newnatserver')
#xm_statushelper = Group(groupname='statushelper')
#orm_session.add_all([
#xm_newnatserver,
#xm_statushelper
#])
# 添加主机
#group_instances = orm_session.query(Group).all()
#for grp_instance in group_instances:
#for i in xrange(1, 3):
#h_hostname = 'xm-server-%s' % (i)
#cur_host = Host(hostname=h_hostname,
#haddress='127.0.0.1',
#hostport=22,
#password='lm_521314_lz',
#auth_key=None)
## 关联主机和主机组
#cur_host.groups = [grp_instance,]
#orm_session.add(cur_host)
# 查询每个主机组中的主机
for cur_grp in orm_session.query(Group).all():
grp_hosts = cur_grp.hosts
print '%s' % (cur_grp),
for cur_host in grp_hosts:
print '''
%s''' % (cur_host),
# 查询每个主机所属主机组
for cur_host in orm_session.query(Host).all():
usr_group = cur_host.groups
print '%s -> %s' % (cur_host, usr_group)
orm_session.commit()
增删查改:
1.Query对象其实返回的是上面添加到session里面的可迭代的实例对象列表,Query对象的每一步操作依然返回Query对象,体现了它串联强大特性
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
for cur_instance in orm_session.query(User).order_by(User.u_id):
print '''%s
u_id : %s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance, cur_instance.u_id,
cur_instance.u_name,
cur_instance.u_fullname, cur_instance.u_password)
# 增加记录项 - add(instance)/add_all([instance1,...,instancen])
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加多项记录
orm_session.add_all([
User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm'),
User(u_name='lzz', u_fullname='liuzhen', u_password='lz_521314_lz'),
])
orm_session.commit()
# 删除记录项 - filter+delete删除过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 删除符合条件的记录
from sqlalchemy import or_
orm_session.query(User).filter(or_(User.u_name == 'lmm',
User.u_name == 'lzz',)).delete()
orm_session.commit()
# 查询指定列 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for (cur_inst,
u_name,
u_fullname,
u_password) in orm_session.query(User, User.u_name,
User.u_fullname, User.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_inst, u_name, u_fullname, u_password)
# 设置列别名 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for u_name in orm_session.query(User.u_name.label('name')).all():
print u_name
# 设置类别名 - 多次引用起别名
from sqlalchemy.orm import aliased
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 起别名
u = aliased(User, name='u')
# 查询指定的列时返回的是元组
for (u_inst,
u_name,
u_fullname,
u_password) in orm_session.query(u, u.u_name,
u.u_fullname,
u.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (u, u_name, u_fullname, u_password)
# 排序限制 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 结合order_by一起使用,数组分片实现
for cur_instance in orm_session.query(User).order_by(User.u_id)[1:]:
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance,
cur_instance.u_name,
cur_instance.u_fullname,
cur_instance.u_password)
# 返回所有行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# 返回首行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).first())
# 筛选过滤 - filter_by关键词参数过滤/filter支持PY自身操作符,更强大
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 相等测试
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').all())
# 不等测试
pprint.pprint(orm_session.query(User).filter(User.u_name!='lmm').all())
# like测试
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# in 测试
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# not in 测试
pprint.pprint(orm_session.query(User).filter(~User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# is null测试
pprint.pprint(orm_session.query(User).filter(User.u_name == None).all())
# is not null测试
pprint.pprint(orm_session.query(User).filter(User.u_name != None).all())
# and 测试
# 方法一: and_
from sqlalchemy import and_
pprint.pprint(orm_session.query(User).filter(and_(User.u_name=='lmm', User.u_fullname=='limanman')).all())
# 方法二: 再次filter Query对象
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').filter(User.u_fullname=='limanman').all())
# or 测试
from sqlalchemy import or_
pprint.pprint(orm_session.query(User).filter(or_(User.u_name=='lmm', User.u_name.in_(['lmm1', 'lmm2', 'lmm3']))).all())
# 原义SQL - SQL查询字符串作为查询参数,还支持绑定参数
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于字符串的SQL指派,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).filter('u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# 极端SQL - 直接使用SQL语句脱离orm框架,不建议使用
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于SQL语句,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).from_statement('select * from users where u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# SQL计数 - 返回整型类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).count())
或
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(func.count(User.u_id)).scalar())
# 分组统计 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 按照每个用户分组,统计用户数
pprint.pprint(orm_session.query(User.u_name, func.count(User.u_name).label('u_count')).group_by(User.u_name).all())
# 左右联查 - inner join + group_by
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# inner join - 引用外键表,自动找外键
pprint.pprint(orm_session.query(User).join(User.addresses).all())
# inner join - 手动指定表,自动找外键
pprint.pprint(orm_session.query(User).join(Address).all())
# 连接 - 排序 - 分组
pprint.pprint(orm_session.query(User.u_name, func.count(Address.a_email)).join(Address).filter(User.u_id==Address.u_id).group_by(User.u_name).all())
# 修改记录项 - update更新过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 修改记录项
orm_session.query(User).filter(User.u_name=='lmm').update({
'u_password': 'lz_521314_lz',
})
orm_session.commit()
reference:
https://my.oschina.net/pydevops/blog/664872
此模块儿常用来把关系数据库的表结构映射到对象上,允许开发人员首先设计数据模型,并能决定稍候可视化数据的方式(CLI/WEB/GUI),和以往的先绝对如何在框架允许的范围内使用数据模型的开发方法完全相反,它兼容众多数据库(SQLite/MySQL/Postgres/Oracle/MS-SQL/SQLServer/Firebird)等(http://www.sqlalchemy.org/organizations.html)
会话实例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建引擎
from sqlalchemy import create_engine
# 创建会话类
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
# mysqldb是数据库引擎,root:root@127.0.0.1分别为帐号:密码@主机,xmdevops为数据库,charset为字符集
sql_connect_str = ('mysql+mysqldb://root:root@127.0.0.1/'
'xmdevops?charset=utf8')
# 通过连接字符串创建一个引擎,echo=True会通过logging模块儿来输出日志
sql_engine = create_engine(sql_connect_str, echo=True)
# 基于数据库引擎创建会话类
SessionCls = sessionmaker(bind=sql_engine)
# 实例化一个会话,其实创建了默认为5个连接的连接池
sql_session = SessionCls()
s.execute(clause, params=None, mapper=None, bind=None, **kw) -> ResultProxy
说明: 执行SQL语句
s.commit() -> None
说明:提交
s.rollback()
说明:回滚
s.close() -> None
说明:关闭内存中的当前会话
s.close_all() -> None
说明:关闭内存中的所有会话
游标实例:
r.keys() -> list
说明:列表形式返回当前解决集的所有列字段
r.closed -> bool
说明:判断游标是否关闭
r.close() -> None
说明:关闭游标
r.fetchall() -> list
说明:游标获取所有结果集
r.fetchone() -> list
说明:游标获取一行结果集
r.fetchmany(size=None) -> list
说明:游标获取多行结果集
r.first() -> list
说明:游标获取结果集第一行
r.rowcount -> int
说明:返回游标结果集行数
r.__dict__
说明:返回游标记过集的字典形式
定义映射:
1.经典映射模式,需要先描述这个表,例如要描述如下表结构
CREATE TABLE [users] ([id] INTEGER PRIMARY KEY,
[name] TEXT NOT NULL,
[fullname] TEXT NOT NULL,
[password] TEXT NOT NULL);
第一步骤:描述表结构
from sqlalchemy.orm import mapper
from sqlalchemy import (Table, MetaData, Column,
CHAR, Text,
VARCHAR, Integer, String)
metadata = MetaData()
user = Table('users',
metadata,
Column('u_id', Integer, primary_key=True),
Column('u_name', Text),
Column('u_fullname', Text),
Column('u_password', Text))
第二步骤:定义映射类
class User(object):
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
第三步骤:绑映射描述
mapper(User, user)
说明:上面的方式是SB的原始方式,当大家都在乐此不疲的定义描述表,定义类再实现ORM的时候,SQLAlchemy团队搞出更加简单的映射方法,就是现代模式,只需要定义映射类就可以一次性完成所有的任务
2.现代映射模式
1.为了定义的类能够被SQLAlchemy管理,所以引入第三方类Declarative,我们所有定义的映射类都必须是它的子类
第一步骤:生成公共基类
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
说明:一个程序内,基类最好的唯一的,建议存储在全局变量如上Base中供所有映射类使用,通过
基类可以定义N多的映射子类,这些子类都能被SQLAlchemy Declarative管理
第二步骤:定义映射类
from sqlalchemy import (Column, CHAR, VARCHAR,
Integer, String, Text)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
说明:如上代码就完成了先前经典模式所需三步,代码比原先更简洁和容易管理,同刚才经典模式的Table定义的Column,这个代表数据库表中的列,Text/Integer代表数据库表的字段类型,这样User类就建立起与数据库表的映射,真实的名字可以使用__tablename__指明,然后是表列的集合,包括id,name,fullname,password,通过primary_key=True已经指明id为主键,ORM为了能够实际映射表需要至少一个列被定义为主键列,多列当然也支持,其实上面的魔术方法__init__()/__repr__()都是可选的,可以手动加入任意方法或属性,唯一需要注意的是必须继承Base基类,保证SQLAlchary Declarative可管理这些映射类和数据库表.随着User映射类通过Declaractive_base系统构造成功,我们就拥有了相关的定义信息,例如经典模式中的Table()描述,也包含映射到表中的类,User自身,可以通过User.__table__来获取表描述,User.__mapper__来获取映射对象,Base.metedata来获取metedata对象,如果想将定义好数据结构实现为实体数据库,只需要Base.metedata.create_all(engine)即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 获取经典模式下的Table()表描述信息
pprint.pprint(repr(User.__table__))
# 获取经典模式下mapper()表和类映射关系
pprint.pprint(repr(User.__mapper__))
# 将表结构写入数据库
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
sql_engine = create_engine(sql_conn_str, echo=True)
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
main()
会话创建:
1.创建类的实例,declearative会自动检测子类构造方法,如果没有定义会自动定义,主键默认为None,因为数据对象未持久化存储
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
# 创建基类
Base = declarative_base()
# 定义表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
""" 可以省略__init__()构造方法,因为Base基类会检测,缺少构造方法会自动补上,
但其提供的构造方法建议使用键值对的参数访问方式,包含我们用Column定义映射的列
"""
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
# 实例映射类(id也可以传入,通常意义上这类主键由系统自动维护,无需为其赋值)
user_manman = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# id为None是因为没有对对象持久化存储
print user_manman.u_id
print user_manman.u_name
print user_manman.u_fullname
print user_manman.u_password
if __name__ == '__main__':
main()
2.ORM的操作句柄被称为会话SESSION,为了使用会话需要首先创建会话实例(先建立引擎然基于引擎创建会话,其实先建立会话再绑定引擎也是可以滴)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
# 方法一: 先创建引擎再基于引擎创建会话
sql_engine = create_engine(sql_conn_str)
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
# 方法二: 先创建会话再绑定引擎
SQLSession = sessionmaker()
SQLSession.configure(bind=sql_engine)
sql_session = SQLSession()
if __name__ == '__main__':
main()
说明:到此就获取了由engine维护的数据库连接池,并且会维持内存中的映射数据直到commit提交或是更改或是关闭会话对象
添加对象:
1.要想将映射到实体表中的映射类如User持久化,需要将这个User类建立的对象实例添加到上面创建好的会话实例中
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
orm_session.add(usr_lmm)
# 一次添加多个对象
orm_session.add_all([
User(u_name='lmm1', u_fullname='limanman1', u_password='lm_521314_lz1'),
User(u_name='lmm2', u_fullname='limanman2', u_password='lm_521314_lz2'),
User(u_name='lmm3', u_fullname='limanman3', u_password='lm_521314_lz3')
])
# 查询测试LazyLoad策略
orm_session.query(User).filter_by(u_name='lmm').first()
说明:SQLAlchemy采用Lazyload策略,add到orm_session此时usr_lmm只是被标记为Pending准备状态,并没有执行任何可能导致数据库变化的SQL语句,只有查询对象,对象的一个属性或显示调用flush时才会将Pending状态的数据写入数据表,如下我们用Query对象结合日志来说明,Query对象返回的是一个User实例,就是我们之前持久化的.由于我们指定的echo=True,如下的调试信息中可以看到其实先Insert然后再Select,也就是说你在实际操作持久化数据时才会由延迟加载真正触发数据库操作
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine ('lmm', 'limanman', 'lm_521314_lz')
2016-04-22 19:35:58,396 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,397 INFO sqlalchemy.engine.base.Engine ('lmm1', 'limanman1', 'lm_521314_lz1')
2016-04-22 19:35:58,400 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,401 INFO sqlalchemy.engine.base.Engine ('lmm2', 'limanman2', 'lm_521314_lz2')
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine ('lmm3', 'limanman3', 'lm_521314_lz3')
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine SELECT users.u_id AS users_u_id, users.u_name AS users_u_name, users.u_fullname AS users_u_fullname, users.u_password AS users_u_password
FROM users
WHERE users.u_name = %s
LIMIT %s
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine ('lmm', 1)
<User(lmm, limanman, lm_521314_lz)>
2.如果要将orm缓存数据显式的提交反馈到数据库里,需要调用orm_session.commit()告诉session目前的添加和改动,操作完成后session引用的数据库连接资源被回收到连接池,接下来对于这个session的操作会触发一个新的事物申请新的连接池连接资源
3.session是作为事务来工作,所以我们可以回滚先前所做的更改,查询时首先会在标识映射表中查询,查询不到才会去实体数据库中查找
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# 将usr_lmm在标识映射表中标识为Pending状态
orm_session.add(usr_lmm)
# 更改实例属性
usr_lmm.u_name = 'lzz'
# 依据名称查询,触发持久化操作,将Pending状态的对象写入数据库,但此时并没commit,所以实体数据库中必然没有记录
print '------------------------------------------------------'
print orm_session.query(User).filter_by(u_name='lzz').first()
print '------------------------------------------------------'
# 回滚之前的更改
orm_session.rollback()
# 回滚后数据未持久化,此时查询的数据来自于实体数据库
print orm_session.query(User).filter_by(u_name='lmm').first()
多表关联:
1.实际生产中数据存储多表联查,常用的1-N(一对多)关系(再创建一个邮件表,每个用户可以关联一个或是多个邮件地址)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine, func, ForeignKey
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 定义Address表结构
class Address(Base):
__tablename__ = 'addresses'
a_id = Column(Integer, primary_key=True)
a_email = Column(Text, nullable=False)
u_id = Column(Integer, ForeignKey('users.u_id'))
"""
通过relationship实现Address.users获取用户实例集,back_populates='addresses'
必须双向指定,backref='addresses'作用相同但是可以单向指定,双向引用,addresses任意指定
主要是通过User.addresses获取邮件实例集
"""
users = relationship('User', backref='addresses')
def __repr__(self):
return '<Address(%s)>' % (self.a_email)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# 通过正反向关联获取用户信息
last_user = orm_session.query(User).filter(User.u_name=='lmm').all()[-1]
print 'user: %s' % (last_user.u_name)
for cur_a_email in last_user.addresses:
print cur_a_email.a_email
2.实际生产中数据存储多表联查,常用的N-N(多对多)关系(创建一个主机表和主机组表,一个主机可以属于多个表,一个主机组可以包含多个主机)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import (Text, Integer, String, Table,
Column, create_engine, func, ForeignKey)
Base = declarative_base()
# 主机-2-主机组类
# host_id + group_id 唯一不重复(一个主机可以属于多个主机组),关联另外两个表
host_2_group = Table('host_2_group', Base.metadata,
Column('host_id', Integer, ForeignKey('host.id'),
autoincrement=False, primary_key=True),
Column('group_id', Integer, ForeignKey('group.id'),
autoincrement=False, primary_key=True))
# 主机类
class Host(Base):
__tablename__ = 'host'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
hostname = Column(String(32),
nullable=False)
haddress = Column(String(128),
nullable=False)
hostport = Column(Integer,
nullable=False)
password = Column(String(64),
nullable=True)
auth_key = Column(String(256),
nullable=True)
# User基于host_2_group来关联group表,backref反向引用,则Group基于host_2_group来关联host表
groups = relationship('Group', backref='hosts', secondary=host_2_group)
def __repr__(self):
return ('<Host(id: %s hostname: %s haddress: %s password: %s'
'hostport: %s auth_key: %s)>') % ( self.id, self.hostname,
self.haddress, self.hostport,
self.password, self.auth_key)
# 主机组类
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
groupname = Column(String(32),
nullable=False)
def __repr__(self):
return '<Group(id: %s groupname: %s)>' % (self.id, self.groupname)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加主机组
#xm_newnatserver = Group(groupname='newnatserver')
#xm_statushelper = Group(groupname='statushelper')
#orm_session.add_all([
#xm_newnatserver,
#xm_statushelper
#])
# 添加主机
#group_instances = orm_session.query(Group).all()
#for grp_instance in group_instances:
#for i in xrange(1, 3):
#h_hostname = 'xm-server-%s' % (i)
#cur_host = Host(hostname=h_hostname,
#haddress='127.0.0.1',
#hostport=22,
#password='lm_521314_lz',
#auth_key=None)
## 关联主机和主机组
#cur_host.groups = [grp_instance,]
#orm_session.add(cur_host)
# 查询每个主机组中的主机
for cur_grp in orm_session.query(Group).all():
grp_hosts = cur_grp.hosts
print '%s' % (cur_grp),
for cur_host in grp_hosts:
print '''
%s''' % (cur_host),
# 查询每个主机所属主机组
for cur_host in orm_session.query(Host).all():
usr_group = cur_host.groups
print '%s -> %s' % (cur_host, usr_group)
orm_session.commit()
增删查改:
1.Query对象其实返回的是上面添加到session里面的可迭代的实例对象列表,Query对象的每一步操作依然返回Query对象,体现了它串联强大特性
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
for cur_instance in orm_session.query(User).order_by(User.u_id):
print '''%s
u_id : %s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance, cur_instance.u_id,
cur_instance.u_name,
cur_instance.u_fullname, cur_instance.u_password)
# 增加记录项 - add(instance)/add_all([instance1,...,instancen])
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加多项记录
orm_session.add_all([
User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm'),
User(u_name='lzz', u_fullname='liuzhen', u_password='lz_521314_lz'),
])
orm_session.commit()
# 删除记录项 - filter+delete删除过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 删除符合条件的记录
from sqlalchemy import or_
orm_session.query(User).filter(or_(User.u_name == 'lmm',
User.u_name == 'lzz',)).delete()
orm_session.commit()
# 查询指定列 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for (cur_inst,
u_name,
u_fullname,
u_password) in orm_session.query(User, User.u_name,
User.u_fullname, User.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_inst, u_name, u_fullname, u_password)
# 设置列别名 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for u_name in orm_session.query(User.u_name.label('name')).all():
print u_name
# 设置类别名 - 多次引用起别名
from sqlalchemy.orm import aliased
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 起别名
u = aliased(User, name='u')
# 查询指定的列时返回的是元组
for (u_inst,
u_name,
u_fullname,
u_password) in orm_session.query(u, u.u_name,
u.u_fullname,
u.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (u, u_name, u_fullname, u_password)
# 排序限制 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 结合order_by一起使用,数组分片实现
for cur_instance in orm_session.query(User).order_by(User.u_id)[1:]:
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance,
cur_instance.u_name,
cur_instance.u_fullname,
cur_instance.u_password)
# 返回所有行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# 返回首行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).first())
# 筛选过滤 - filter_by关键词参数过滤/filter支持PY自身操作符,更强大
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 相等测试
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').all())
# 不等测试
pprint.pprint(orm_session.query(User).filter(User.u_name!='lmm').all())
# like测试
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# in 测试
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# not in 测试
pprint.pprint(orm_session.query(User).filter(~User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# is null测试
pprint.pprint(orm_session.query(User).filter(User.u_name == None).all())
# is not null测试
pprint.pprint(orm_session.query(User).filter(User.u_name != None).all())
# and 测试
# 方法一: and_
from sqlalchemy import and_
pprint.pprint(orm_session.query(User).filter(and_(User.u_name=='lmm', User.u_fullname=='limanman')).all())
# 方法二: 再次filter Query对象
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').filter(User.u_fullname=='limanman').all())
# or 测试
from sqlalchemy import or_
pprint.pprint(orm_session.query(User).filter(or_(User.u_name=='lmm', User.u_name.in_(['lmm1', 'lmm2', 'lmm3']))).all())
# 原义SQL - SQL查询字符串作为查询参数,还支持绑定参数
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于字符串的SQL指派,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).filter('u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# 极端SQL - 直接使用SQL语句脱离orm框架,不建议使用
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于SQL语句,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).from_statement('select * from users where u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# SQL计数 - 返回整型类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).count())
或
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(func.count(User.u_id)).scalar())
# 分组统计 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 按照每个用户分组,统计用户数
pprint.pprint(orm_session.query(User.u_name, func.count(User.u_name).label('u_count')).group_by(User.u_name).all())
# 左右联查 - inner join + group_by
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# inner join - 引用外键表,自动找外键
pprint.pprint(orm_session.query(User).join(User.addresses).all())
# inner join - 手动指定表,自动找外键
pprint.pprint(orm_session.query(User).join(Address).all())
# 连接 - 排序 - 分组
pprint.pprint(orm_session.query(User.u_name, func.count(Address.a_email)).join(Address).filter(User.u_id==Address.u_id).group_by(User.u_name).all())
# 修改记录项 - update更新过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 修改记录项
orm_session.query(User).filter(User.u_name=='lmm').update({
'u_password': 'lz_521314_lz',
})
orm_session.commit()
reference:
https://my.oschina.net/pydevops/blog/664872