基本查询
# 查询所有用户数据User.query.all() # 返回列表, 元素为模型对象# 查询有多少个用户User.query.count()# 查询第1个用户User.query.first() # 返回模型对象/None# 查询id为4的用户[3种方式]User.query.get(4) # 根据id查询,返回模型对象/None# BaseQuery对象可以续接具体查询条件 all/count/firstUser.query.filter_by(id=4).all() # 简单查询器 关键字实参设置字段值 返回BaseQuery对象User.query.filter(User.id == 4).first() # 复杂查询器 参数为比较运算/函数引用等 返回BaseQuery对象# 查询名字结尾字符为g的所有用户[开始 / 包含]User.query.filter(User.name.endswith("g")).all()User.query.filter(User.name.startswith("g")).all()User.query.filter(User.name.contains("n")).all()User.query.filter(User.name.like("w%n%g")).all() # 模糊查询# 查询名字和邮箱都以li开头的所有用户[2种方式]User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()from sqlalchemy import and_User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()# 查询age是25 或者 `email`以`itheima.com`结尾的所有用户from sqlalchemy import or_User.query.filter(or_(User.age==25, User.email.endswith("itheima.com"))).all()# 查询名字不等于wang的所有用户[2种方式]from sqlalchemy import not_User.query.filter(not_(User.name == 'wang')).all()User.query.filter(User.name != 'wang').all()# 查询id为[1, 3, 5, 7, 9]的用户User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()# 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个User.query.order_by(User.age, User.id.desc()).limit(5).all()# 查询年龄从小到大第2-5位的数据 2 3 4 5User.query.order_by(User.age).offset(1).limit(4).all()# 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数)pn = User.query.paginate(2, 3)pn.pages 总页数 pn.page 当前页码 pn.items 当前页的数据 pn.total 总条数# 查询每个年龄的人数 # select age, count(name) from t_user group by age 分组聚合from sqlalchemy import funcdb.session.query(User.age, func.count(User.id)).group_by(User.age).all()# 只查询所有人的姓名和邮箱 优化查询 默认使用select *from sqlalchemy.orm import load_onlyUser.query.options(load_only(User.name, User.email)).all()for item in data:print(item.name, item.email)##原生sqlalchemy查询db.session.query(User.name, User.email).all()# 聚合查询,并且打上标签from sqlalchemy import funcdata = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()for item in data:print(item[0], item[1])print(item.age, item.count) # 建议根据字段名取值
数据更新
# 更新方式1: 先查询后更新# 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)# 先查询数据good = Good.query.filter(Good.name == '方便面').first()if good.count > 0:# 再更新字段值good.count = good.count - 1# 提交会话db.session.commit()# 更新方式2: update子查询 可以避免超卖和更新丢失问题 update t_good set count = count - 1 where name = '方便面' and count > 0;Good.query.filter(Good.name == '方便面', Good.count > 0).update({
'count': Good.count - 1})# 提交会话db.session.commit()
数据删除
方式一:good = Good.query.filter(Good.name == '方便面').first()# 删除数据db.session.delete(good)# 提交会话 增删改都要提交会话db.session.commit()方式二:Good.query.filter(Good.name == '方便面').delete()db.session.commit()
数据增加
good = Good(name='方便面', count=1)db.session.add(good)db.session.commit()
注意
- 对Session的增删改操作只是将数据变化保存到了内存中, 只有执行了flush操作数据库才会进行同步
db.session.flush()
- session.commit和查询操作内部都会自动执行flush操作
多表查询
user1 = User(name='张三')db.session.add(user1)db.session.flush() # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败# db.session.commit() # 有写场景下, 为了保证数据操作的原子性不能分成多个事务进行操作adr1 = Address(detail='中关村3号', user_id=user1.id)adr2 = Address(detail='华强北5号', user_id=user1.id)db.session.add_all([adr1, adr2, user1])db.session.commit()"""查询多表数据 需求: 查询姓名为"张三"的所有地址信息"""# 先根据姓名查找用户主键user1 = User.query.filter_by(name='张三').first()# 再根据主键到从表查询关联地址adrs = Address.query.filter_by(user_id=user1.id).all()for adr in adrs:print(adr.detail)# sqlalchemy的join查询data = db.session.query(Address.detail, User.id).join(User, Address.user_id == User.id).filter(User.name == '张三').all()for item in data:print(item.detail, item.id)