用同一个Flask-SQLAlchemy模型使用多个POSTGRES数据库和模式[英] Using multiple POSTGRES databases and schemas with the same Flask-SQLAlchemy model

本文是小编为大家收集整理的关于用同一个Flask-SQLAlchemy模型使用多个POSTGRES数据库和模式的处理方法,想解了用同一个Flask-SQLAlchemy模型使用多个POSTGRES数据库和模式的问题怎么解决?用同一个Flask-SQLAlchemy模型使用多个POSTGRES数据库和模式问题的解决办法?那么可以参考本文帮助大家快速定位并解决问题。

问题描述

我将在这里非常具体,因为已经提出了类似的问题,但没有一个解决方案适用于这个问题.

我正在开发一个有四个 postgres 数据库的项目,但为了简单起见,我们假设有 2 个.即 A &乙

A,B代表两个地理位置,但是数据库中的表和架构是相同的.

示例模型:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base

db = SQLAlchemy()
Base = declarative_base()

class FRARecord(Base):
    __tablename__ = 'tb_fra_credentials'

    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

这个模型在两个数据库中都有复制,但是使用不同的模式,所以要让它在 A 中工作,我需要这样做:

__table_args__ = {'schema' : 'A_schema'}

我想使用一个单独的内容提供者,该内容提供者可以访问数据库,但具有相同的方法:

class ContentProvider():
    def __init__(self, database):
        self.database = database

    def get_fra_list():
        logging.debug("Fetching fra list")
        fra_list = db.session.query(FRARecord.fra_code)

两个问题是,我如何决定指向哪个 db,以及如何不为不同的模式复制模型代码(这是 postgres 特有的问题)

这是我迄今为止尝试过的:

1) 我为每个模型制作了单独的文件并继承了它们,所以:

class FRARecordA(FRARecord):
    __table_args__ = {'schema' : 'A_schema'}

这似乎不起作用,因为我收到错误:

"Can't place __table_args__ on an inherited class with no table."

意味着在 db.Model(在其父级中)已经声明后我无法设置该参数

2) 所以我尝试对多重继承做同样的事情,

class FRARecord():
    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

class FRARecordA(Base, FRARecord):
    __tablename__ = 'tb_fra_credentials'
    __table_args__ = {'schema' : 'A_schema'}

但得到了可预测的错误:

"CompileError: Cannot compile Column object until its 'name' is assigned."

显然我无法将 Column 对象移动到 FRARecordA 模型,而不必为 B 重复它们(实际上有 4 个数据库和更多模型).

3) 最后,我正在考虑进行某种分片(这似乎是正确的方法),但我找不到如何进行此操作的示例.我的感觉是我只会使用这样的单个对象:

class FRARecord(Base):
    __tablename__ = 'tb_fra_credentials'

    @declared_attr
    def __table_args__(cls):
        #something where I go through the values in bind keys like
        for key, value in self.db.app.config['SQLALCHEMY_BINDS'].iteritems():
            # Return based on current session maybe? And then have different sessions in the content provider?

    recnr = Column(db.Integer, primary_key = True)
    fra_code = Column(db.Integer)
    fra_first_name = Column(db.String)

为了清楚起见,我访问不同数据库的意图如下:

app.config['SQLALCHEMY_DATABASE_URI']='postgresql://%(user)s:\
%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_A

app.config['SQLALCHEMY_BINDS']={'B':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_B,
                                  'C':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_C,
                                  'D':'postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_D
                                 }

POSTGRES 字典包含连接数据的所有键

我假设对于继承的对象,我只需像这样连接到正确的对象(这样 sqlalchemy 查询会自动知道):

class FRARecordB(FRARecord):
    __bind_key__ = 'B'
    __table_args__ = {'schema' : 'B_schema'}

推荐答案

终于找到了解决办法.

基本上,我没有为每个数据库创建新类,我只是为每个数据库使用不同的数据库连接.

这种方法本身很常见,棘手的部分(我找不到示例)是处理架构差异.我最终这样做了:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Session = sessionmaker()

class ContentProvider():

    db = None
    connection = None
    session = None

    def __init__(self, center):
        if center == A:
            self.db = create_engine('postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_A, echo=echo, pool_threadlocal=True)
            self.connection = self.db.connect()
            # It's not very clean, but this was the extra step. You could also set specific connection params if you have multiple schemas
            self.connection.execute('set search_path=A_schema')
        elif center == B:
            self.db = create_engine('postgresql://%(user)s:%(pw)s@%(host)s:%(port)s/%(db)s' % POSTGRES_B, echo=echo, pool_threadlocal=True)
            self.connection = self.db.connect()
            self.connection.execute('set search_path=B_schema')

    def get_fra_list(self):
        logging.debug("Fetching fra list")
        fra_list = self.session.query(FRARecord.fra_code)
        return fra_list

本文地址:https://www.itbaoku.cn/post/1763892.html