1. ORM

object-relation mapping,即兑现关系映射, 实现Python 模型对象对关系型数据库的映射

  • 优点

    • 只用 Python 面向对象操作,不需要用 SQL 语言

    • 实现了数据模型与数据库的解耦,实现了不同数据库相同操作

  • 缺点

    • Python 语言与 SQL 语言相互转换,性能慢

    • 不同的 ORM 之间操作不同

2. Flask-SQLAlchemy

是一个关系型数据库框架,提供了高层 ORM 与底层的原生数据库操作。

2.1 安装

flask-sqlalchemy 安装(清华源)

pip install flask-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple --user


如果使用 MySQL 数据库,需要安装 MySQLdb 驱动

pip install flask-mysqldb -i https://pypi.tuna.tsinghua.edu.cn/simple


2.2 数据库连接设置

class Config(object):
    DEBUG = True
    # 数据库连接配置
    # SQLALCHEMY_DATABASE_URI = "数据库类型://数据库账号:密码@数据库地址:端口/数据库名称?charset=utf8mb4"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)


创建数据库

mysql -uroot -p123
create database students charset=utf8mb4;


2.3 SQLAlchemy字段类型 和约束类型

字段类型

  • integer 普通整型

  • smallinteger

  • biginteger

  • float

  • numeric

  • string

  • text

  • unicode

  • unicodetext

  • boolean

  • date

  • time

  • largebinary

约束类型

  • primary_key

  • unique

  • index

  • nullable

  • default

2.4 数据表的操作

1 创建表

from flask import Flask
app = Flask(__name__)

class Config(object):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4"
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)


"""模型类定义"""
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app=app)

# 所有的模型必须直接或间接继承于db.Model
class Student(db.Model):
    """学生信息模型"""
    # 声明与当前模型绑定的数据表名称
    __tablename__ = "db_students"
    # 字段定义
    id = db.Column(db.Integer, primary_key=True,comment="主键")
    name = db.Column(db.String(15), comment="姓名")
    age = db.Column(db.SmallInteger, comment="年龄")
    sex = db.Column(db.Boolean, default=True, comment="性别")
    email = db.Column(db.String(128), unique=True, comment="邮箱地址")
    money = db.Column(db.Numeric(10,2), default=0.0, comment="钱包")

    def __repr__(self):
        return f"{self.name}<Student>"

class Course(db.Model):
    """课程数据模型"""
    __tablename__ = "db_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="课程")
    price = db.Column(db.Numeric(7, 2))
    # repr()方法类似于django的__str__,用于打印模型对象时显示的字符串信息
    def __repr__(self):
        return f'{self.name}<Course>'

class Teacher(db.Model):
    """老师数据模型"""
    __tablename__ = "db_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="姓名")
    option = db.Column(db.Enum("讲师", "助教", "班主任"), default="讲师")

    def __repr__(self):
        return f"{self.name}< Teacher >"

@app.route("/")
def index():
    return "ok!"

if __name__ == '__main__':
    with app.app_context():
        # 检测数据库中是否存在和模型匹配的数据表。
        # 如果没有,则根据模型转换的建表语句进行建表。
        # 如果找到,则不会进行额外处理
        db.create_all()
    app.run(debug=True)
 with app.app_context():
    # create_all()方法执行的时候,需要放在模型的后面
    # 检测数据库中是否存在和模型匹配的数据表。
    # 如果没有,则根据模型转换的建表语句进行建表。
    # 如果找到,则不会进行额外处理
    db.create_all()

2 删除表

db.drop_all()

if __name__ == '__main__':
    with app.app_context():
        # db.drop_all()
        # 检测数据库中是否存在和模型匹配的数据表。
        # 如果没有,则根据模型转换的建表语句进行建表。
        # 如果找到,则不会进行额外处理
        db.create_all()
    app.run(debug=True)

2.5 数据操作

1 创建一条数据

方法 1:

# 在 student 类中执行
 @classmethod
    def add(cls):
        student = cls(name="小明", sex=True, age=17, email="123456@qq.com", money=100)
        db.session.add(student)
        db.session.commit()
        return student

方法 2:Student.add()

@app.route("/add")
def add():
    """添加一条数据"""
    student = Student.add()
    print(student)

    student = Student(name="小红",age=18,sex=False, email="xiaohong@qq.com", money=1000)
    db.session.add(student)
    db.session.commit()
    return "ok!"

2 创建多条数据

  • add_all() 方法

@app.route("/add_all")
def add_all():
    """添加多条数据"""
    st1 = Student(name='wang', email='wang@163.com', age=22, money=1000, sex=True)
    st2 = Student(name='zhang', email='zhang@189.com', age=22, money=1000, sex=True)
    st3 = Student(name='chen', email='chen@126.com', age=22, money=1000, sex=True)
    st4 = Student(name='zhou', email='zhou@163.com', age=22, money=1000, sex=True)
    st5 = Student(name='tang', email='tang@163.com', age=22, money=1000, sex=True)
    st6 = Student(name='wu', email='wu@gmail.com', age=22, money=1000, sex=True)
    st7 = Student(name='qian', email='qian@gmail.com', age=22, money=1000, sex=True)
    st8 = Student(name='liu', email='liu@163.com', age=22, money=1000, sex=True)
    st9 = Student(name='li', email='li@163.com', age=22, money=1000, sex=True)
    st10 = Student(name='sun', email='sun@163.com', age=22, money=1000, sex=True)
    db.session.add_all([st1,st2,st3,st4,st5,st6,st7,st8,st9,st10])
    db.session.commit()

    return "ok"

3 删除数据

@app.route("/delete")
def delete():
    """删除数据"""
    # 先查询出来
    student = Student.query.first()
    print(student)
    # 再进行删除
    db.session.delete(student)
    db.session.commit()

    # 直接条件删除[ 性能更好!!! ]
    Student.query.filter(Student.id > 5).delete()
    db.session.commit()
    return "ok"

4 更新数据

@app.route("/update")
def update():
    """更新数据"""
    # 先查询数据,然后进行更新,2条语句
    stu = Student.query.first()
    stu.name = 'dong'
    db.session.commit()

    # 直接根据条件更新,一条语句[乐观锁]
    Student.query.filter(Student.name == 'chen').update({'money': 1998})
    db.session.commit()

    # 字段引用[利用当前一条数据的字典值进行辅助操作,实现类似django里面F函数的效果]
    Student.query.filter(Student.name == "zhang").update({"money":Student.money+1000 * Student.age})
    db.session.commit()
    return "ok"

总代码

from flask import Flask
app = Flask(__name__)

class Config(object):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    SQLALCHEMY_ECHO = True
app.config.from_object(Config)

"""模型类定义"""
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app=app)


class Student(db.Model):
    __tablename__ = "db_students"
   
    id = db.Column(db.Integer, primary_key=True,comment="主键")
    name = db.Column(db.String(15), comment="姓名")
    age = db.Column(db.SmallInteger, comment="年龄")
    sex = db.Column(db.Boolean, default=True, comment="性别")
    email = db.Column(db.String(128), unique=True, comment="邮箱地址")
    money = db.Column(db.Numeric(10,2), default=0.0, comment="钱包")

    def __repr__(self):
        return f"{self.name}<Student>"

    @classmethod
    def add(cls):
        student = cls(name="小明", sex=True, age=17, email="123456@qq.com", money=100)
        db.session.add(student)
        db.session.commit()
        return student

# 所有的模型必须直接或间接继承于db.Model
class Course(db.Model):
    """课程数据模型"""
    __tablename__ = "db_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="课程")
    price = db.Column(db.Numeric(7, 2))
    # repr()方法类似于django的__str__,用于打印模型对象时显示的字符串信息
    def __repr__(self):
        return f'{self.name}<Course>'

class Teacher(db.Model):
    """老师数据模型"""
    __tablename__ = "db_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="姓名")
    option = db.Column(db.Enum("讲师", "助教", "班主任"), default="讲师")

    def __repr__(self):
        return f"{self.name}< Teacher >"

@app.route("/add")
def add():
    """添加一条数据"""
    # student = Student.add()
    # print(student)

    # student = Student(name="小红",age=18,sex=False, email="xiaohong@qq.com", money=1000)
    # db.session.add(student)
    # db.session.commit()
    return "ok!"

@app.route("/add_all")
def add_all():
    """添加多条数据"""
    st1 = Student(name='wang', email='wang@163.com', age=22, money=1000, sex=True)
    st2 = Student(name='zhang', email='zhang@189.com', age=22, money=1000, sex=True)
    st3 = Student(name='chen', email='chen@126.com', age=22, money=1000, sex=True)
    st4 = Student(name='zhou', email='zhou@163.com', age=22, money=1000, sex=True)
    st5 = Student(name='tang', email='tang@163.com', age=22, money=1000, sex=True)
    st6 = Student(name='wu', email='wu@gmail.com', age=22, money=1000, sex=True)
    st7 = Student(name='qian', email='qian@gmail.com', age=22, money=1000, sex=True)
    st8 = Student(name='liu', email='liu@163.com', age=22, money=1000, sex=True)
    st9 = Student(name='li', email='li@163.com', age=22, money=1000, sex=True)
    st10 = Student(name='sun', email='sun@163.com', age=22, money=1000, sex=True)
    db.session.add_all([st1,st2,st3,st4,st5,st6,st7,st8,st9,st10])
    db.session.commit()

    return "ok"

@app.route("/delete")
def delete():
    """删除数据"""
    # 先查询出来
    student = Student.query.first()
    print(student)
    # 再进行删除
    db.session.delete(student)
    db.session.commit()

    # 直接条件删除[ 性能更好!!! ]
    Student.query.filter(Student.id > 5).delete()
    db.session.commit()
    return "ok"


@app.route("/update")
def update():
    """更新数据"""
    # 先查询数据,然后进行更新,2条语句
    stu = Student.query.first()
    stu.name = 'dong'
    db.session.commit()

    # 直接根据条件更新,一条语句[乐观锁]
    Student.query.filter(Student.name == 'chen').update({'money': 1998})
    db.session.commit()

    # 字段引用[利用当前一条数据的字典值进行辅助操作,实现类似django里面F函数的效果]
    Student.query.filter(Student.name == "zhang").update({"money":Student.money+1000 * Student.age})
    db.session.commit()
    return "ok"


@app.route("/get")
def get():
    # 根据主键获取一条数据
    student = Student.query.get(4)
    print(student)

    # # 返回所有结果数据数据
    # student_list = Student.query.all()
    # print(student_list)

    # # 返回第一个结果数据
    # first_student = Student.query.filter(Student.id<5).first()
    # print(first_student)

    # 返回结果的数量
    ret = Student.query.filter(Student.id<5).count()
    print(ret)
    return "ok"
    
if __name__ == '__main__':
    with app.app_context():
        # 检测数据库中是否存在和模型匹配的数据表。
        # 如果没有,则根据模型转换的建表语句进行建表。
        # 如果找到,则不会进行额外处理

        db.create_all()
    app.run(debug=True)

2.6 数据的查询

查询过滤器

过滤器说明
filter()把过滤器添加到原查询上,返回一个新查询
filter_by()把等值过滤器添加到原查询上,返回一个新查询
limit()使用指定的值限定原查询返回的结果数量
offset()设置结果范围的开始位置,偏移原查询返回的结果,返回一个新查询
order_by()根据指定条件对原查询结果进行排序,返回一个新查询
group_by()根据指定条件对原查询结果进行分组,返回一个新查询

查询结果的方法

方法说明
all()以列表形式返回查询的所有结果
first()返回查询的第一个结果,如果未查到,返回None
first_or_404()返回查询的第一个结果,如果未查到,返回404
get()返回指定主键对应的行,如不存在,返回None
get_or_404()返回指定主键对应的行,如不存在,返回404
count()返回查询结果的数量
paginate()返回一个Paginate分页器对象,它包含指定范围内的结果
having返回结果中符合条件的数据,必须跟在group by后面,其他地方无法使用。
@app.route("/get")
def get():
    # 根据主键获取一条数据
    student = Student.query.get(4)
    print(student)

    # 返回所有结果数据数据
    student_list = Student.query.all()
    print(student_list)

    # 返回第一个结果数据
    first_student = Student.query.filter(Student.id<5).first()
    print(first_student)

    # 返回结果的数量
    ret = Student.query.filter(Student.id<5).count()
    print(ret)

    return "ok"
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

class Config(object):
    DEBUG = True
    # 数据库连接配置
    # SQLALCHEMY_DATABASE_URI = "数据库类型://数据库账号:密码@数据库地址:端口/数据库名称?charset=utf8mb4"
    SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4"
    # 动态追踪修改设置,如未设置只会提示警告
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    # 查询时会显示原始SQL语句
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)


"""模型类定义"""
db = SQLAlchemy(app=app)
# 等同于
# db = SQLAlchemy()
# db.init_app(app)

class Student(db.Model):
    """学生信息模型"""
    # 声明与当前模型绑定的数据表名称
    __tablename__ = "db_students"
    # 字段定义
    """
    create table db_student(
      id int primary key auto_increment comment="主键",
      name varchar(15) comment="姓名",
    )
    """
    id = db.Column(db.Integer, primary_key=True,comment="主键")
    name = db.Column(db.String(15), comment="姓名")
    age = db.Column(db.SmallInteger, comment="年龄")
    sex = db.Column(db.Boolean, default=True, comment="性别")
    email = db.Column(db.String(128), unique=True, comment="邮箱地址")
    money = db.Column(db.Numeric(10,2), default=0.0, comment="钱包")

    def __repr__(self):
        return f"{self.name}<Student>"

    @classmethod
    def add(cls):
        student = cls(name="小明", sex=True, age=17, email="123456@qq.com", money=100)
        db.session.add(student)
        db.session.commit()
        return student

# 所有的模型必须直接或间接继承于db.Model
class Course(db.Model):
    """课程数据模型"""
    __tablename__ = "db_course"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="课程")
    price = db.Column(db.Numeric(7, 2))
    # repr()方法类似于django的__str__,用于打印模型对象时显示的字符串信息
    def __repr__(self):
        return f'{self.name}<Course>'

class Teacher(db.Model):
    """老师数据模型"""
    __tablename__ = "db_teacher"
    id = db.Column(db.Integer, primary_key=True, comment="主键")
    name = db.Column(db.String(64), unique=True, comment="姓名")
    option = db.Column(db.Enum("讲师", "助教", "班主任"), default="讲师")

    def __repr__(self):
        return f"{self.name}< Teacher >"

@app.route("/filter")
def filter():
    # 模糊查询
    # 使用163邮箱的所有用户
    student_list = Student.query.filter(Student.email.endswith("@163.com")).all()
    print(student_list)

    # 姓名以"zh"开头的
    student_list = Student.query.filter(Student.name.startswith("zh")).all()
    print(student_list)

    # 名字中带有"a"字母的数据
    student_list = Student.query.filter(Student.name.contains("a")).all()
    print(student_list)


    # 条件比较
    student_list = Student.query.filter(Student.age>18).all()
    print(student_list)

    # 多条件比较
    # 要求多个条件都要满足
    student_list = Student.query.filter(Student.age>18, Student.sex==True).all()
    print(student_list)

    return "ok"

@app.route("/filter_by")
def filter_by():
    """filter_by判断值相等情况"""
    # 单条件
    student_list = Student.query.filter_by(age=22).all()
    print(student_list)

    # 多条件
    student_list = Student.query.filter_by(age=22,sex=True).all()
    print(student_list)
    return "ok"


from sqlalchemy import and_,or_,not_
@app.route("/")
def multi():
    """filter多条件运算方法"""
    """and_ 与,并且的意思,必须所有条件都满足则返回结果"""
    # and_(条件1,条件2,....)  等价于  filter(条件1,条件2,.....)
    # age > 18 and email like "%163.com"
    # student_list = Student.query.filter(Student.age > 18, Student.email.endswith("163.com")).all()

    student_list = Student.query.filter(
        and_(
            Student.age > 18,
            Student.email.endswith("163.com")
        )
    ).all()

    """or_ 或,或者的意思,多个条件只有一个满足则返回结果"""
    # 查询性别为True,或者年龄大于18
    sex = 1 or age > 18
    student_list = Student.query.filter(
        or_(
            Student.sex==True,
            Student.age>18
        )
    ).all()
    print(student_list)



    # 复合条件的查询情况
    # 查询18岁的女生或者22岁的男生
    # (age=18 and sex=0) or (age = 22 and sex=1)
    student_list = Student.query.filter(
        or_(
            and_(Student.age==18, Student.sex==False),
            and_(Student.age==22, Student.sex==True),
        )
    ).all()

    print( student_list )

    # not_ 非,结果取反
    # 查询年龄不等于22
    student_list = Student.query.filter(Student.age != 22).all()
    print(student_list)
    student_list = Student.query.filter(not_(Student.age==22)).all()
    print(student_list)

    return "ok"

def filter():
    """范围查询"""

    # 查询id是 1 3 5 的学生信息
    student_list = Student.query.filter(Student.id.in_([1, 3, 5])).all()
    print(student_list)

    # # 查询id不是 1 3 5 的学生信息
    student_list = Student.query.filter(not_(Student.id.in_([1, 3, 5]))).all()
    print( student_list )

    """排序"""
    # 倒序[值从大到小]
    student_list = Student.query.order_by(Student.id.desc()).all()
     # 升序[值从小到大]
    student_list = Student.query.order_by(Student.id.asc()).all()
    
    # 多字段排序[第一个字段值一样时,比较第二个字段,进行排序]
    student_list = Student.query.order_by(Student.age.desc(), Student.id.asc() ).all()
    print(student_list)

    """限制结果数量"""
    student_list = Student.query.limit(2).all()
    print(student_list)

    """结果返回的开始下标位置,从0开始"""
    student_list = Student.query.offset(0).limit(2).all()
    print(student_list)
    student_list = Student.query.limit(2).offset(2).all()
    print(student_list)
    return "ok"

if __name__ == '__main__':
    with app.app_context():
        # 检测数据库中是否存在和模型匹配的数据表。
        # 如果没有,则根据模型转换的建表语句进行建表。
        # 如果找到,则不会进行额外处理

        db.create_all()
    app.run(debug=True)

参考链接:https://juejin.cn/post/6966490555027030047