丽水市城市建设投资有限责任公司网站免费seo公司
文章目录
- SQLAlchemy
- 介绍
- SQLAlchemy入门
- 使用原生sql
- 使用orm
- 外键关系
- 一对多关系
- 多对多关系
- 基于scoped_session实现线程安全
- 简单表操作
- 实现方案
- CRUD
- Flask 集成 sqlalchemy
SQLAlchemy
介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
pip3 install sqlalchemy
组成部分:
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Pythonmysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>pymysqlmysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]MySQL-Connectormysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>cx_Oracleoracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
扩充:
orm 框架,除django内置的 和 sqlalchemy 外,还有 peewee 轻量级框架,这些都是同步 orm 框架
搭配异步web 框架 fastapi sanic 的异步orm框架有 peewee-async
虽然异步框架搭配 同步 orm 框架也能用,不过局限性比较大。一般来讲,一旦用了异步,后续所有都需要用异步
SQLAlchemy入门
使用原生sql
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from urllib import parse
import threadinguser = "root"
password = "xxx@000"
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "127.0.0.1:"
# 第一步: 创建engine
engine = create_engine(f"mysql+pymysql://{user}:{pwd}@{host}3306/test1?charset=utf8",max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 池中没有线程最多等待的时间,否则报错pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)# 第二步:使用
def task():conn = engine.raw_connection() # 从连接池中取一个连接cursor = conn.cursor()sql = "select * from signer"cursor.execute(sql)print(cursor.fetchall())if __name__ == '__main__':for i in range(20):t = threading.Thread(target=task)t.start()
使用orm
import datetime
from sqlalchemy.ext.declarative import declarative_base
from model import engine # 用的简单使用里面的engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase = declarative_base() # 基类# 表模型
class Users(Base):__tablename__ = 'users' # 数据库表名称, 必须写id = Column(Integer, primary_key=True) # id 主键name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空email = Column(String(32), unique=True)# datetime.datetime.now不能加括号,加了括号,以后永远是当前时间ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = ( # 可选UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一Index('ix_id_name', 'name', 'email'), # 索引)def init_db():"""根据类创建数据库表:return:"""Base.metadata.create_all(engine)def drop_db():"""根据类删除数据库表:return:"""Base.metadata.drop_all(engine)if __name__ == '__main__':init_db() # 创建表# drop_db() # 删除表
总结:
- 生成一个基类,所有表模型都要继承这个基类(base)
- 继承父类写表模型,写字段
- 迁移通过表模型生成表
注意:
- 不管是 sqlalchemy 还是 django 的orm 都不支持对数据库的操作,只能对表进行操作
- sqlalchemy 只支持 创建和删除表,不支持修改表(django orm支持)。sqlalchemy 需要借助第三方实现
外键关系
外键关系,实际上就是一个 ForeignKey ,指定 一对一,一对多,多对多三种关系
一对多关系
class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)caption = Column(String(50), default='篮球')class Person(Base):__tablename__ = 'person'nid = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=True)# hobby指的是tablename而不是类名hobby_id = Column(Integer, ForeignKey("hobby.id"))# 跟数据库无关,不会新增字段,只用于快速链表操作# 类名,backref用于反向查询 参数 uselist=False , 设置就变成了一对一,其他和一对多一样hobby=relationship('Hobby',backref='pers')# hobby=relationship('Hobby',backref='pers', uselist=False) # 一对一关系
多对多关系
class Boy2Girl(Base):__tablename__ = 'boy2girl'id = Column(Integer, primary_key=True, autoincrement=True)girl_id = Column(Integer, ForeignKey('girl.id'))boy_id = Column(Integer, ForeignKey('boy.id'))class Girl(Base):__tablename__ = 'girl'id = Column(Integer, primary_key=True)name = Column(String(64), unique=True, nullable=False)class Boy(Base):__tablename__ = 'boy'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64), unique=True, nullable=False)# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以servers = relationship('Girl', secondary='boy2girl', backref='boys')'''
girl_id = Column(Integer, ForeignKey("hobby.id", ondelete='SET NULL')) # 一般用SET NULL
外键约束1. RESTRICT:若子表中有父表对应的关联数据,删除父表对应数据,会阻止删除。默认项2. NO ACTION:在MySQL中,同RESTRICT。3. CASCADE:级联删除。4. SET NULL:父表对应数据被删除,子表对应数据项会设置为NULL。
'''
扩充
在 django 中,外键管理有个参数 db_contraint=False
用来在逻辑上关联表,但实体不建立约束。同样在SQLAlchemy
中也可以通过配值 relationship
参数来实现同样的效果
class Boy2Girl(Base):__tablename__ = 'boy2girl'id = Column(Integer, primary_key=True, autoincrement=True)girl_id = Column(Integer) # 不用ForeignKeyboy_id = Column(Integer)gitl = db.relationship("Girl",# uselist=False, # 一对一设置backref=backref("to_course", uselist=False), # backref用于反向查询 uselist 作用同上lazy="subquery", # 懒加载 用来指定sqlalchemy 什么时候加载数据primaryjoin="Girl.id==Boy2Girl.girl_id", # 指定对应关系foreign_keys="Boy2Girl.girl_id" # 指定表的外键字段)
'''
lazy 可选值select:就是访问到属性的时候,就会全部加载该属性的数据 默认值joined:对关联的两个表使用联接subquery:与joined类似,但使用子子查询dynamic:不加载记录,但提供加载记录的查询,也就是生成query对象
'''
基于scoped_session实现线程安全
简单表操作
基于sqlalchemy orm 来简单实现一条数据的添加
from sqlalchemy.orm import sessionmakerfrom model import engine
from db_model import Users# 定义一个 session, 以后操作数据都用 session 来执行
Session = sessionmaker(bind=engine)
session = Session()
# 创建User对象
usr = Users(name="yxh", email="152@11.com", extra="xxx")
# 通过 user对象 添加到session中
session.add(usr)
# 提交,才会刷新到数据库中,不提交不会执行
session.commit()
从上面demo中,可以发现一点,session 如果是一个全局对象。那么在多线程的情况下,并发使用同一个变量 session 是不安全的,解决方案如下:
- 将session定义在局部,每一个view函数都定义一个session。 代码冗余,不推荐
- 基于scoped_session 实现线程安全。原理同 request对象,g对象一致。也是基于local,给每一个线程创造一个session
实现方案
from sqlalchemy.orm import sessionmaker, scoped_session
from model import engine
from db_model import Users# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的
# 创建User对象
usr = Users(name="yxh", email="152@11.com", extra="xxx")
# 通过 user对象 添加到session中
session.add(usr)
# 提交,才会刷新到数据库中,不提交不会执行
session.commit()
CRUD
基础
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.sql import text
from model import engine
from db_model import Users# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的# 1 增加操作
obj1 = Users(name="yxh003")
session.add(obj1)
# 增加多个,不同对象
session.add_all([Users(name="yxh009"),Users(name="yxh008"),
])
session.commit()# 2 删除操作---》查出来再删---》
session.query(Users).filter(Users.id > 2).delete()
session.commit()# 3 修改操作--》查出来改 传字典
session.query(Users).filter(Users.id > 0).update({"name": "yxh"})
# 类似于django的F查询
# 字符串加
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# 数字加
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()# 4 查询操作----》
r1 = session.query(Users).all() # 查询所有
# 只取age列,把name重命名为xx
# select name as xx,age from user;
r2 = session.query(Users.name.label('xx'), Users.email).all()# filter传的是表达式,filter_by传的是参数
r3 = session.query(Users).filter(Users.name == "yxh").all()
r3 = session.query(Users).filter(Users.id >= 1).all()
r4 = session.query(Users).filter_by(name='yxh').all()
r5 = session.query(Users).filter_by(name='yxh').first()
# :value 和:name 相当于占位符,用params传参数
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='yxh').order_by(Users.id).all()
# 自定义查询sql
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='yxh').all()# 执行原生sql
# 查询
cursor = session.execute("select * from users")
result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)', params={"value": 'yxh'})
session.commit()
print(cursor.lastrowid)
进阶
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.sql import text, func
from sqlalchemy import and_, or_
from model import engine
from db_model import Users, Person, Favor# 定义一个 session
Session = sessionmaker(bind=engine)
# session = Session()
session = scoped_session(Session) # 后续使用这个session就是线程安全的# 条件
# select * form user where name =lqz
ret = session.query(Users).filter_by(name='lqz').all()# 表达式,and条件连接
# select * from user where id >1 and name = lqz
ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()# select * from user where id between 1,3 and name = lqz
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'lqz').all()# # 注意下划线
# select * from user where id in (1,3,4)
ret = session.query(Users).filter(Users.id.in_([1, 3, 4])).all()
# # ~非,除。。外# select * from user where id not in (1,3,4)
ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()# # 二次筛选
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz'))).all()# # or_包裹的都是or条件,and_包裹的都是and条件
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()# 通配符,以e开头,不以e开头
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()# # 限制,用于分页,区间
ret = session.query(Users)[1:2]# # 排序,根据name降序排列(从大到小)
ret = session.query(Users).order_by(Users.id.desc()).all()
# # 第一个条件重复后,再按第二个条件升序排
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组
# select * from user group by user.extra;
ret = session.query(Users).group_by(Users.extra).all()
# # 分组之后取最大id,id之和,最小id
# select max(id),sum(id),min(id) from user group by name ;
ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()# haviing筛选
# select max(id),sum(id),min(id) from user group by name having min(id)>2;
ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()# select max(id),sum(id),min(id) from user where id >=1 group by name having min(id)>2;
ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).filter(Users.id >= 1).group_by(Users.name).having(func.min(Users.id) > 2).all()# 连表(默认用orm中forinkey关联)# select * from user,favor where user.id=favor.id
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()# join表,默认是inner join
# select * from Person inner join favor on person.favor=favor.id;
ret = session.query(Person).join(Favor).all()
# isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
ret = session.query(Person).join(Favor, isouter=True).all()
ret = session.query(Favor).join(Person, isouter=True).all()# 打印原生sql
aa = session.query(Person).join(Favor, isouter=True)
# print(aa)# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from person left join favor on person.id=favor.id;
ret = session.query(Person).join(Favor, Person.id == Favor.id, isouter=True).all()# 组合 UNION 操作符用于合并两个或多个 SELECT 语句的结果集 多用于分表后 上下连表
# union和union all union 去重, union all 不去重
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
Flask 集成 sqlalchemy
flask 中集成 sqlalchemy 可以选择直接配置第三方库,来的更方便 比如:
-
flask_sqlalchemy 用来操作数据库
-
flask_migrate 用来实现表迁移(类似django)
'''
flask_migrate 中使用了flask_sqlalchemy 下载时,会自动帮你下载flask_sqlalchemy
flask_migrate 3.0之前和之后使用方法有区别。这里以2.x 做演示
'''
# flask_migrate使用步骤
from flask_sqlalchemy import SQLAlchemy
# Flask_SQLAlchemy给你包装了基类,和session,以后拿到db
db = SQLAlchemy() # 全局SQLAlchemy,就是线程安全的,内部就是上述那么实现的
app = Flask(__name__)
# SQLAlchemy 连接数据库配置是在 config 配置字典中获取的,所以需要我们将配置添加进去
app.config.from_object('DevelopmentConfig')
'''
基本写这些就够了
"SQLALCHEMY_DATABASE_URI"
"SQLALCHEMY_POOL_SIZE"
"SQLALCHEMY_POOL_TIME"
"SQLALCHEMY_POOL_RECYCLE"
"SQLALCHEMY_TRACK_MODIFICATIONS"
"SQLALCHEMY_ENGINE_OPTIONS"
'''
# 将db注册到app中,加载配置文件,flask-session,用一个类包裹一下app
db.init_app(app)
# 下面三句会创建出两个命令:runserver db 命令(flask_migrate)
manager=Manager(app)
Migrate(app, db)
manager.add_command('db', MigrateCommand ) # 添加一个db命令使用命令:1. python xxx.py db init # 初始化,刚开始干,生成一个migrate文件夹2. python xxx.py db migrate # 同django makemigartions3. python xxx.py db upgrade # 同django migrate
补充:
如果使用flask_sqlalchemy, 那么建表orm 的继承 db.Model 即可(和django越来越像了)