精华内容
下载资源
问答
  • flask sql查询 分页加查询总数 apply_obj = ApplyInfo.query.order_by(ApplyInfo.add_time).paginate(int(page_index), per_page=int(page_size), error_out=False) #模型对象+排序(对时间)第几页,每页显示几...

    flask sql查询

    分页加查询总数

     apply_obj = ApplyInfo.query.order_by(ApplyInfo.add_time).paginate(int(page_index), per_page=int(page_size),
                                                                              error_out=False)
              #模型对象+排序(对时间)第几页,每页显示几条
    page_total = apply_obj.total #获取总数
            data = [
                {'apply_id': i.apply_id, 'name': i.name, 'start_time': i.start_time.strftime('%Y-%m-%d'),
                 'end_time': i.end_time.strftime('%Y-%m-%d')} for i in apply_obj.items]
                 #对象取值使用. 时间格式化
    oday_page_view = PageInfoRecord.query.with_entities(func.count('*'),
                                                                 func.count(PageInfoRecord.user_id.distinct())).filter(
                and_(PageInfoRecord.into_time > today_time, PageInfoRecord.apply_id == apply_id)).first()
    
                #按时间取值 计算数量 计算去重后user_id的数量 多条件查询
     crop_type = '%{}%'.format(crop_type).replace(' ', '') #对like语句进行封装
     
     query_list = [UserBaseInfo.apply_id == apply_id] #封装sql语句 ==表名.apply_id=apply_id
     query = UserBaseInfo.phone.like("%" + user_msg + "%") #对like语句进行封装
    
    

    原生SQL中的 DAY HOURS YEARS MOUNH 使用 对时间进行进行分组查询需要数据

    page_view_size = db.session.execute(
                    'SELECT HOUR ( a.add_time ) AS days,count( a.page_id ) AS count FROM apply_page_record a WHERE a.add_time>= :start_time and a.add_time<=:end_time and a.apply_id=:apply_id AND DATE_FORMAT( add_time, "%y-%M-%d" ) GROUP BY days',
                    {'start_time': start_time, 'end_time': end_time, 'apply_id': apply_id})
                    
                    # HOUR 代表小时 查询 page_id出现的次数   DATE_FORMAT() 对时间进行格式化
    
    page_view_size = db.session.execute( # 按天进行查找
                    'SELECT DAY ( a.add_time ) AS days,count( a.page_id ) AS count FROM apply_page_record a WHERE a.add_time>= :start_time and a.add_time<=:end_time and a.apply_id=:apply_id AND DATE_FORMAT( add_time, "%y-%M-%d" ) GROUP BY days',
                    {'start_time': start_time, 'end_time': end_time, 'apply_id': apply_id})
                    
                    
     page_view_size = db.session.execute(# 按周去显示时间
                    'SELECT WEEK ( a.add_time ) AS days,count( a.page_id ) AS count FROM apply_page_record a WHERE a.add_time>= :start_time and a.add_time<=:end_time and a.apply_id=:apply_id AND DATE_FORMAT( add_time, "%y-%M-%d" ) GROUP BY days',
                    {'start_time': start_time, 'end_time': end_time, 'apply_id': apply_id})
                    
    

    批量插入数据

                apply_info = ApplyInfo(**args) 
                db.session.add(apply_info) 
                db.session.flush() #获取到新插入数据的id 并未生成到数据库
                apply_id = apply_info.apply_id 
                banner_img = ApplyBanner(
                    info_award=info_banner_url, apply_award=apply_banner_url, apply_id=apply_id)
                db.session.add(banner_img)
                award_list = pack_award_message(apply_award, info_award, apply_id) # 使用pack_award_message生成sql对象
                db.session.bulk_insert_mappings(ApplyAwardInfo, award_list) #bulk_insert_mappings进行批量插入 第一个对象为 插入的实例模型,第二个必须为列表
                db.session.commit()
    
    展开全文
  • SQLAlchemy 高级查询 # 先排序,再限制条数 News.query.order_by(News.clicks.desc()).limit(10 ) filter = [1,2,3 ] # 先过滤器,再排序(时间),最后分页paginate(当前页码,一页多少条,是否抛出异常) ...
    源码上是这么说的:
      对于每个修改服务器上内容的请求,你应该使用一次性令牌,并存储在 cookie里,并且在发送表单数据的同时附上它。在服务器再次接收数据之后,你要比较两个令牌,并确保它们相等
    1, 后端生成,csrf_token,跨域请求针对的是所有的请求!
    from flask_wtf.csrf import generate_csrf
        # 调用函数生成 csrf_token
        csrf_token = generate_csrf()
    
        # 请求钩子,每次请求都会设置 csrf_token值
        @app.after_request
        def after_request(responses):
            # 生成 csrf_token值 存储到 cookie 之中
            csrf_token = generate_csrf()
            # wtf 扩展会自动将 csrf_token 存在 redis 之中
            responses.set_cookie('csrf_token', csrf_token)
            return responses
    
        app.add_template_filter(do_index_class, 'db')
    2,前端请求后端视图,后端将生成的 csrf_token 设置到 cookie 之中
    前端会取出浏览器的的csrf_token 值,并且以 ajax的形式发送到后端!
    $.ajax({
                url: "/passport/register",
                type: "post",
                data: JSON.stringify(params),
                contentType: "application/json",
                headers: {
                    "X-CSRFToken": getCookie("csrf_token")
                },
                success: function (resp) {
                    if (resp.errno == "0") {
                        // 刷新当前界面
                        location.reload()
                    } else {
                        $("#register-password-err").html(resp.errmsg)
                        $("#register-password-err").show()
                    }
                }
            })
    3,后端会进行csrf_token 对比,对比通过说明是当前网站访问
    CSRFProtect(app)

     二,SQLAlchemy 高级查询

    # 先排序,再限制条数
    News.query.order_by(News.clicks.desc()).limit(10)
    filter = [1,2,3]
    # 先过滤器,再排序(时间),最后分页paginate(当前页码,一页多少条,是否抛出异常)
    News.query.filter(*filter).order_by(News.create_time.desc()).paginate(page, per_page, False)

     

    转载于:https://www.cnblogs.com/shi-qi/articles/9114281.html

    展开全文
  • Flask-sqlalchemy高级机制之多表查询 一、外键关联查询 1.数据关联 2.关联查询 对于一对多关系, 外键在多的一方, 即 从表 中 sqalchemy 中也可以通过 外键字段 实现数据关联 及 关联查询 1.数据关联 数据关联步骤...

    Flask-sqlalchemy高级机制之多表查询

    一、外键关联查询
    1.数据关联
    2.关联查询

    • 对于一对多关系, 外键在多的一方, 即 从表 中
    • sqalchemy 中也可以通过 外键字段 实现数据关联 及 关联查询

    在这里插入图片描述
    1.数据关联
    数据关联步骤:

    • 从表模型类中 定义外键字段
    • 从表模型对象 的外键字段 记录 主表主键
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 相关配置
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_ECHO'] = False
    
    # 创建组件对象
    db = SQLAlchemy(app)
    
    
    # 用户表  主表(一)   一个用户可以有多个地址
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20))
    
    
    # 地址表   从表(多)
    class Address(db.Model):
        __tablename__ = 't_adr'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer)  # 定义外键
    
    
    @app.route('/')
    def index():
        """添加并关联数据"""
        user1 = User(name='张三')
        db.session.add(user1)
        db.session.flush()  # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败
        # db.session.commit()  # 有些场景下, 为了保证数据操作的原子性不能分成多个事务进行操作
    
        adr1 = Address(detail='中关村3号', user_id=user1.id)
        adr2 = Address(detail='华强北5号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        return "index"
    
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
    
        app.run(debug=True)
    

    2.关联查询
    关联查询步骤: (以主查从为例)
    1.先查询主表数据
    2.再通过外键字段查询关联的从表数据

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 相关配置
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_ECHO'] = False
    
    # 创建组件对象
    db = SQLAlchemy(app)
    
    
    # 用户表  一   一个用户可以有多个地址
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20))
    
    
    # 地址表   多
    class Address(db.Model):
        __tablename__ = 't_adr'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer)  # 定义外键
    
    
    @app.route('/demo')
    def demo():
        """查询多表数据  需求: 查询姓名为"张三"的所有地址信息"""
    
        # 1.先根据姓名查找到主表主键
        user1 = User.query.filter_by(name='张三').first()
    
        # 2.再根据主键到从表查询关联地址
        adrs = Address.query.filter_by(user_id=user1.id).all()
        for adr in adrs:
            print(adr.detail)
    
        return "demo"
    
    
    @app.route('/')
    def index():
        """添加并关联数据"""
    
        user1 = User(name='张三')
        db.session.add(user1)
        db.session.flush()  
        adr1 = Address(detail='中关村3号', user_id=user1.id)
        adr2 = Address(detail='华强北5号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        return "index"
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
    
        app.run(debug=True)
    

    注意点:

    • 普通字段就可以作为外键使用, 只要其记录了对应的主表主键就可以关联查询
    • 如果需要 主从表数据 在同一个事务中添加并关联, 则可能需要 先手动调用session.flush方法 来执行插入操作并生成主键, 否则无法关联

    二、关系属性

    • 关系属性是 sqlalchemy 封装的一套查询关联数据的语法, 其目的为 让开发者使用 面向对象的形式 方便快捷的获取关联数据
    • 关系属性的 本质仍是外键
    • 关系属性使用步骤:
    1. 定义关系属性
    关系属性名 = db.relationship('关联数据所在的模型类')
    
    1. 外键字段设置外键参数
    外键字段 = db.Column(字段类型, db.ForeignKey(主表名.主键名))
    
    1. 通过关系属性获取关联数据
    模型对象.关系属性
    

    代码示例:

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 相关配置
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_ECHO'] = False
    
    # 创建组件对象
    db = SQLAlchemy(app)
    
    
    # 用户表  一   一个用户可以有多个地址
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20))
        addresses = db.relationship('Address')  # 1.定义关系属性 relationship("关联数据所在的模型类")
    
    
    # 地址表   多
    class Address(db.Model):
        __tablename__ = 't_adr'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        # 2. 外键字段设置外键参数  db.ForeignKey('主表名.主键')
        user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))  
    
    
    @app.route('/')
    def index():
        """添加数据"""
        user1 = User(name='张三')
        db.session.add(user1)
        db.session.flush()
        adr1 = Address(detail='中关村3号', user_id=user1.id)
        adr2 = Address(detail='华强北5号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        """查询多表数据  需求: 查询姓名为"张三"的所有地址信息"""
        # 先根据姓名查找用户主键
        user1 = User.query.filter_by(name='张三').first()
    
        # 3.使用关系属性获取关系数据
        for address in user1.addresses:
            print(address.detail)
    
        return "index"
    
    
    
    if __name__ == '__main__':
        # 重置所有继承自db.Model的表
        db.drop_all()
        db.create_all()
    
        app.run(debug=True)
    

    注意点:

    • 关系属性的本质还是外键, 所以数据关联还是通过外键来完成
    • 设置了外键参数的外键字段, 在创建表时会自动生成 外键约束
    • 对多关系属性返回值为列表, 元素为关联的模型对象
    • 关系属性和外键关联都可以查询关联数据, 但是 关系属性会取出关联表中的所有字段(效率较低), 所以实际开发中 推荐使用外键关联查询

    三、连接查询

    • 开发中有联表查询需求时, 一般会使用 join连接查询
    • sqlalchemy 也提供了对应的查询语法
    db.session.query(主表模型字段1, 主表模型字段2, 从表模型字段1, xx.. ).join(从表模型类, 主表模型类.主键 == 从表模型类.外键)
    
    • join语句 属于查询过滤器, 返回值也是 BaseQuery 类型对象
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 相关配置
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_ECHO'] = False
    
    # 创建组件对象
    db = SQLAlchemy(app)
    
    
    # 用户表  一   
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20))
    
    
    # 地址表   多
    class Address(db.Model):
        __tablename__ = 't_adr'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer)  # 定义外键
    
    
    @app.route('/demo')
    def demo():
        """查询多表数据  需求: 查询姓名为"张三"的用户id和地址信息"""
    
        # sqlalchemy的join查询
        data = db.session.query(User.id, Address.detail).join(Address, User.id == Address.user_id).filter(User.name == '张三').all()
        for item in data:
            print(item.detail, item.id)
    
        return "demo"
    
    
    @app.route('/')
    def index():
        """添加数据"""
        user1 = User(name='张三')
        db.session.add(user1)
        db.session.flush()
        adr1 = Address(detail='中关村3号', user_id=user1.id)
        adr2 = Address(detail='华强北5号', user_id=user1.id)
        db.session.add_all([adr1, adr2, user1])
        db.session.commit()
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
    
        app.run(debug=True)
    

    关联查询的性能优化:

    • 可以发现 无论使用 外键 还是 关系属性 查询关联数据, 都需要查询两次, 一次查询用户数据, 一次查询地址数据
    • 两次查询就需要发送两次请求给数据库服务器, 如果数据库和web应用不在一台服务器中, 则 网络IO会对查询效率产生一定影响
    • 我们可以考虑使用 连接查询 join 使用一条语句就完成关联数据的查询
      例:
    # 使用join语句优化关联查询
    adrs = Address.query.join(User, Address.user_id == User.id).filter(User.name == '张三').all()  # 列表中包含地址模型对象
    

    关于拆分join语句:

    • 使用 JOIN连接查询 还是 拆分join为多条简单语句 并不是一个可以一概而论的问题, 查询性能受 联表数量、索引设计、缓存处理、数据库拆分、SQL优化器等多方面因素影响, 每个场景不尽相同
    • 一般可以先考虑使用 JOIN连接查询, 在发现性能问题后再考虑进一步的优化尝试
    展开全文
  • falsk_sqlalchemy高级

    2019-07-09 11:04:24
    关联查询 基本查询 from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 配置数据库的连接地址 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1...

     

     关联查询

    • 基本查询
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 配置数据库的连接地址
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
    
    # 配置是否监听数据库变化, 性能较差, 不属于sqlalchemy本体
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    # 一旦开启, 可以显示底层执行的SQL语句
    app.config['SQLALCHEMY_ECHO'] = True
    
    # 创建数据库连接
    db = SQLAlchemy(app)
    
    ​
    
    # 用户表  一
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20), unique=True)
    
    
    # 地址表  多
    class Address(db.Model):
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer)  #  逻辑外键
    
    
    @app.route('/')
    def index():
        # 添加数据
        user1 = User(name='zs')
        db.session.add(user1)
        db.session.flush()  # 主动发送sql
        # 关联数据
        adr1 = Address(detail='上海中心', user_id=user1.id)
        adr2 = Address(detail='陆家嘴1号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
         
        # 关联查询
        user1 = User.query.filter(User.name=='zs').first()
        adrs = Address.query.filter(Address.user_id==user1.id).all()
        for adr in adrs:
            print(adr.detail)
        
        return 'index'
    
    if __name__ == '__main__'
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 关系属性(代码中配置部分这里偷懒不写)
    # 用户表  一
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20), unique=True)
        # 2. 定义关系属性  relationship('关联的表对应的类名')
        addresses = db.relationship('Address')
    
    # 地址表  多
    class Address(db.Model)
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20)
        # 1. 设置外键参数
        user_id = db.Column(db.Integer, db.ForeignKey('t_user')  # 逻辑外键, 测试用
       
    @app.route('/')
    def index():
        # 添加数据
        user1 = User(name='zs')
        db.session.add(user1)
        db.session.flush()  # 主动发sql
        # 关联数据
        adr1 = Address(detail='上海中心', user_id=user1.id)
        adr2 = Address(detail='陆家嘴1号', user_id=user.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        # 使用关系属性来关联查询 1> 定义外键参数 2> 定义关系属性  本质还是通过外键查询
        user1 = User.query.filter(User.name == 'zs').first()
        # adrs = Address.query.filter(Address.user_id==user1.id).all()
        for adr in user1.addresses:
            print(adr.detail)
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 反向引用
    # 用户表 一
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20), unique=True)
        # 关系属性  可以设置backref='反向属性名' 来代替反向关系属性
        addresses = db.relationship('Address', backref='user')
    
    
    # 地址表 多
    class Address(db.Model):
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))  # 逻辑外键
        # user = db.relationship('User')  # 反向关系属性
    
    
    @app.route('/')
    def index():
        # 添加数据
        user1 = User(name='zs')
        db.session.add(user1)
        db.session.flush()
        # 关联数据
        adr1 = Address(detail='中关村1号', user_id=user1.id)
        adr2 = Address(detail='陆家嘴1号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        # 使用关系属性来关联查询
        # user1= User.query.filter(User.name=='zs').first()
    
        # for adr in user1.addresses:
        #     print(adr.detail)
    
        # 反向查询
        print(adr1.user.name)
        # print(user1)
        # print(user1.addresses)
        # print(type(user1))
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 动态查询
    # 用户表 一
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20), unique=True)
        # 关系属性  设置lazy='dynamic'开启动态查询, 关系属性不再返回列表, 而是返回AppenderBaseQuery对象
        # AppenderBaseQuery对象: 1> 类似BaseQuery, 可以续接查询条件  2> 保留了列表的特性 支持遍历和索引取值
        addresses = db.relationship('Address', backref='user', lazy='dynamic')
    
    
    # 地址表 多
    class Address(db.Model):
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))  # 逻辑外键
    
    
    @app.route('/')
    def index():
        # 添加数据
        user1 = User(name='zs')
        db.session.add(user1)
        db.session.flush()
        # 关联数据
        adr1 = Address(detail='中关村1号', user_id=user1.id)
        adr2 = Address(detail='陆家嘴1号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        # 使用关系属性来关联查询
        user1 = User.query.filter(User.name == 'zs').first()
        # ret = user1.addresses.filter(Address.detail.startswith('中关村')).all()
    
        # 通过遍历索引取值  __iter__
        # for address in user1.addresses:
        #     print(address)
        adr = user1.addresses[0]
        print(adr.detail)
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 优化关联查询
    from sqlalchemy.orm import Load
    
    
    # 用户表 一
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(20), unique=True)
        # 关系属性  设置lazy='dynamic'开启动态查询, 关系属性不再返回列表, 而是返回AppenderBaseQuery对象
        # AppenderBaseQuery对象: 1> 类似BaseQuery, 可以续接查询条件  2> 保留了列表的特性 支持遍历和索引取值
        addresses = db.relationship('Address', backref='user', lazy='dynamic')
    
    
    # 地址表 多
    class Address(db.Model):
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)
        detail = db.Column(db.String(20))
        user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))  # 逻辑外键
    
    
    @app.route('/')
    def index():
        # 添加数据
        user1 = User(name='zs')
        db.session.add(user1)
        db.session.flush()
        # 关联查询
        adr1 = Address(detail='中关村1号', user_id=user1.id)
        adr2 = Address(detail='陆家嘴1号', user_id=user1.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
    
        # sqlalchemy默认就采用懒查询机制, 不使用关系属性, 不会主动查询关联数据, 优点: 减少不必要的查询, 优化性能
        # user1 = User.query.filter(User.name == 'zs').first()
        # 使用关系属性时, 才查询关联数据 (一共查询了两次), 第一次查询用户数据, 第二次是查用户关联的地址数据
        # print(user1.addresses)
    
        # 需求: 直接查询出关联数据 join关联查询 select t_user.name, t_adr.detail from t_user join t_adr on t_adr.user_id=t_user.id where t_user.name='zs'
        # 一次join的性能一般会比分成两次查询的效率高, 网络IO会影响性能
        # ret = db.session.query(User, Address).join(Address, Address.user_id == User.id).filter(User.name == 'zs').all()
    
        # 只查询用户和地址的指定字段
        ret = db.session.query(User, Address).options(Load(User).load_only(User.id), Load(Address).load_only(Address.detail)).join(Address, Address.user_id==User.id).filter(User.name=='zs').all()
        print(ret)
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 一对一关系
    # 用户表 一
    class UserAccount(db.Model):
        __tablename__ = 't_useraccount'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(40), unique=True)
        # 关系属性  关系属性默认是基于一对多关系生成的, 返回的是列表
        # 如果是对一关系, 可以设置uselist=False, 则直接返回模型对象
        info = db.relationship('UserInfo', uselist=False)
    
    
    # 用户信息表  一
    class UserInfo(db.Model):
        __tablename__ = 't_userinfo'
        id = db.Column(db.Integer, primary_key=True)
        age = db.Column(db.Integer)
        height = db.Column(db.Float)
        user_id = db.Column(db.Integer, db.ForeignKey('t_useraccount.id'))  # 外键
    
    
    @app.route('/')
    def index():
        user1 = UserAccount(name='zs')
        db.session.add(user1)
        db.session.flush()
        # 关联数据
        info1 = UserInfo(age=20, height=1.9, user_id=user1.id)
        db.session.add(info1)
        db.session.commit()
    
        # 关联查询
        user_obj = UserAccount.query.filter(UserAccount.name == 'zs')
        info = user1.info
        print(info)  # 关系属性中添加uselist=False后, 返回的是对象
        print(info.age, info.height)
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 同表的多个关系
    # 用户表  用户表和地址表有两个关联, 一个用户有一个家庭地址, 还有一个工作地址
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)  # 主键
        name = db.Column(db.String(40), unique=True)
        work_adr_id = db.Column(db.Integer, db.ForeignKey('t_address.id'))
        home_adr_id = db.Column(db.Integer, db.ForeignKey('t_address.id'))
    
        # 关系属性  两个表如果有多个关系, 需要通过primaryjoin来进行区分
        work_adr = db.relationship('Address', primaryjoin='User.work_adr_id==Address.id')
        home_adr = db.relationship('Address', primaryjoin='User.home_adr_id==Address.id')
    
    
    # 地址表
    class Address(db.Model):
        __tablename__ = 't_address'
        id = db.Column(db.Integer, primary_key=True)  # 主键
        detail = db.Column(db.String(40), unique=True)
    
    
    @app.route('/')
    def index():
        adr1 = Address(detail='陆家嘴1号院')
        adr2 = Address(detail='万科翡翠滨江')
        db.session.add_all([adr1, adr2])
        db.session.flush()
        # 关联数据
        user1 = User(name='zs', work_adr_id=adr1.id, home_adr_id=adr2.id)
        db.session.add(user1)
        db.session.commit()
    
        # 关联查询
        print(user1.work_adr.detail, user1.home_adr.detail)
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 多对多关系
    # 定义关系表, 记录关联
    class StudentCourse(db.Model):
        __tablename__ = 't_stu_cur'
        id = db.Column(db.Integer, primary_key=True)
        stu_id = db.Column(db.Integer, db.ForeignKey('t_student.id'))  # 记录学生的主键
        cur_id = db.Column(db.Integer, db.ForeignKey('t_course.id'))  # 记录课程的主键
    
    
    # 学生表 多
    class Student(db.Model):
        __tablename__ = 't_student'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(40), unique=True)
        # 定义关系属性, 多对多关系属性也需要设置primarilyjoin来指定关联的字段
        courses = db.relationship('StudentCourse', primaryjoin='StudentCourse.stu_id==Student.id')
    
    
    # 课程表  多
    class Course(db.Model):
        __tablename__ = 't_course'
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(40), unique=True)
    
    
    @app.route('/')
    def index():
        stu1 = Student(name='zs')
        cur1 = Course(name='python')
        cur2 = Course(name='java')
        db.session.add_all([stu1, cur1, cur2])
        db.session.flush()
        # 关联数据
        sc1 = StudentCourse(stu_id=stu1.id, cur_id=cur1.id)
        sc2 = StudentCourse(stu_id=stu1.id, cur_id=cur2.id)
        db.session.add_all([sc1, sc2])
        db.session.commit()
    
        # 关联查询
        # print(stu1.courses)
        # 一次join(优化)
        ret = db.session.query(StudentCourse, Student, Course).join(Student, Student.id == StudentCourse.stu_id).join(
            Course, Course.id == StudentCourse.cur_id).filter(Student.name == 'zs').all()
        print(ret)
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 自关联一对多关系
    # 评论表  一条评论可以有多条子评论
    class Comment(db.Model):
        __tablename__ = 'info_comment'
        id = db.Column(db.Integer, primary_key=True)  # 评论编号
        content = db.Column(db.Text, nullable=False)  # 评论内容
        parent_id = db.Column(db.Integer, db.ForeignKey('info_comment.id'))  # 父评论id
        # 定义关系属性
        # 为了区分自关联一对多关系属性, 需要给多对一的关系属性设置remote_side=[主键]参数
        children = db.relationship('Comment')  # 取所有的子评论, 一对多关系
        parent = db.relationship('Comment', remote_side=[id])  # 取父评论  多对一关系
    
        # 反向引用版
        # children = db.relationship('Comment', db.backref('parent', remote_side=[id]))
    
    
    @app.route('/')
    def index():
        comment1 = Comment()
        comment1.content = '哈哈'
        comment2 = Comment()
        comment2.content = '子评论'
        # 关联数据
        comment2.parent = comment1
        db.session.add_all([comment1, comment2])
        db.session.commit()
    
        # 关联查询
        # print(comment1.children)  # 获取所有的子评论
        # print(comment2.parent)  # 获取父评论
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')
    
    • 事务
    # 用户表
    class User(db.Model):
        __tablename__ = 't_user'
        id = db.Column(db.Integer, primary_key=True)  # 主键
        name = db.Column(db.String(20), unique=True)
        age = db.Column(db.Integer)
    
    
    @app.route('/')
    def index():
        """
        1. sqlalchemy会自动创建隐式事务, 将sql操作添加到事务中
        2. 事务提交失败, 会自动回滚
        3. 也可以手动回滚, 也可以设置mysql的锁
        """
        # 需求: 当数据量<10时, 才添加数据, 否则回滚
        # 添加数据
        user1 = User(name='zs', age=20)
        user2 = User(name='ls', age=20)
        user3 = User(name='ws', age=20)
        user4 = User(name='ys', age=20)
        db.session.add_all([user1, user2, user3, user4])
        """
        会先插入数据, 然后查询, 将查询结果进行比较,如果不符合条件则会滚, 符合则commit
        """
        # with_for_update()  设置排它锁  with_for_update(read=True) 设置共享锁
        if User.query.with_for_update().count() < 3:
    
            ret = User.query.count()
            print(ret)
    
            db.session.commit()
        else:
    
            db.session.rollback()  # 不满足条件, 主动回滚
    
        return 'index'
    
    
    if __name__ == '__main__':
        db.drop_all()
        db.create_all()
        app.run(debug=True, host='0.0.0.0')

     

    展开全文
  • Flask-Sqlalchemy—聚合、高级查询

    千次阅读 2019-07-17 15:39:25
    Flask-Sqlalchemy—聚合、高级查询 文章目录Flask-Sqlalchemy—聚合、高级查询聚合高级查询sqlalchemy写法flask-sqlalchemy写法 本文中语法均为本人结合flask-sqlalchemysqlalchemy文档总结,如果有更简洁的语法...
  • 关联查询 sys_user_list = SysPermission.query.join(OrgRolePermission, OrgRolePermission.sys_permission_id == SysPermission.id).filter(OrgRolePermission.role_name == user_cache["role"]).all() ...
  • order_by,filter的语法。 用久了才会熟悉。 Session = sessionmaker(bind=engine) session = Session() ...print(session.query(Cookie.cookie_name, Cookie.quantity).first()) ...for cookie in session.query...
  • Flask-sqlalchemy高级机制之刷新数据flush Session机制 被设计为数据操作的执行者, 会先将操作产生的数据保存到内存中 在执行 flush刷新操作后, 数据操作才会同步到数据库(内存)中 有两种情况下会 隐式(自动)...
  • sqlalchemy中的排序,就是在查询的后面加上.order_by(依据排序的字段) ,默认排序是升序。倒序就是在排序字段的后面加上 .desc() 或者在排序字段的前面加上负号-。如果不想在查询的时候排序,可以在模型里面设置, ...
  • # 高级查询操作,厉害了哦 #老规矩 from my_create_table import User,engine from sqlalchemy.orm import sessionmaker Session = sessionmaker(engine) db_session = Session() # 查询数据表操作 # ...
  • 它允许我们使用原始的 SQL 执行查询,同时也提供了高级的方法来查询和更新数据库。 本文仅简要介绍使用原生 SQL 执行查询部分。 使用 SQLAlchemy 查询 MySQL 的数据 首先需要安装 sqlalchemy 库和 pymysql 库; ...
  • # 切片 articles = session.query(Article).all()[2:5] for article in articles: print(article) 二、高级查询 测试数据 2.1、group_by 根据某个字段进行分组。比如想要根据性别进行分组,来统计每个分组...

空空如也

空空如也

1 2 3 4 5 6
收藏数 115
精华内容 46
关键字:

sqlalchemy高级查询