在 Snowflake 中更新混合和嵌套对象
我有一个带有一个变体列的雪花表(raw). 此表中的每个行都是复杂的(词典和数组)和嵌套(多个层次结构). 我想做的是能够在某些数组中更新特定项目. 使用示例更容易理解它,因此将其视为表中的一行: { "id": "1234" "x_id": [ { "y_id": "790437306684007491", "y_state": "some_state" } ], "comments": { "1": [ { "comment_id": "bb288743-3b73-4423-b76b-f26b8c37f7d4", "comment_timestamp": "2021-02-10 14:53:25.667564", "comment_text": "Hey" }, { "comment_id": "7
18 2024-04-22
编程技术问答社区
NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:snowflake
我已经安装了所有必要的软件包: pip install --upgrade snowflake-sqlalchemy 我正在从雪花文档中运行此测试代码: from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account}/'.format( user='', password='', account='', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(resul
20 2024-04-22
编程技术问答社区
利用负指示加速 sqlalchemy 或 m 的动态关系切分
我具有以下SQLA模型和关系.我每秒记录每个通道的测量值,因此DB中有很多测量值. class Channel( Model ) : __tablename__ = 'channel' id = Column( Integer, primary_key=True ) #! --- Relationships --- measurements = relationship( 'Measurement', back_populates='channel', lazy='dynamic' ) class Measurement( Model ) : __tablename__ = 'measurement' id = Column( Integer, primary_key=True ) timestamp = Column( DateTime, nul
32 2024-04-22
编程技术问答社区
在延迟的 Celery 任务中处理 SQLAlchemy 会话
我正在使用SQLalchemy使用关系数据库.我想产生一份使用芹菜来处理数据库的工作.有一个代码: from sqlalchemy.orm.session import Session from celery.task import task from myapp.user import User @task def job(user): # job... session = Session.object_session(user) with user.begin(): user.value = result_value def ordinary_web_request_handler(uid): assert isinstance(session, Session) user = session.query(User).get(int(uid)) # deals with user... job.dela
10 2024-04-22
编程技术问答社区
FastAPI 中的会话
我想用fastapi和jinja作为我的模板建造购物车 因此,我需要在会话中为每个匿名用户保存数据. Django和Flask具有一个内置的会话功能,我们可以轻松地做到这一点. 其中一种解决方案可以使用SQLalchemy会话,但是SQLalchemy会话不支持匿名用户,我们必须为每个会话分开创建令牌.然后,我们应该使用存储的令牌保存每个数据. 还有其他类似于Django和Flask的内置功能的方法吗? 解决方案 首先,我们应该在购物车应用程序中创建购物车文件,然后使用所需功能构建购物车类. secret_key='cart' class Cart(object): def __init__(self, request,db): self.session = request.session cart = self.session.get(secret_key) if not cart:
34 2024-04-22
编程技术问答社区
使用 flask alchemy 在 SQLlite 中无法设置外键约束
我在SQLalchemy中定义的表格架构下面,其中User_id是外键参考用户的USER_ID表 class Manufacturer(Model): __tablename__='Manufacturer' Manufacturer_id=Column(Integer,primary_key=True,autoincrement=True) name=Column(String(length=100),nullable=False) description=Column(String(length=1000),nullable=False) user_id=Column(Integer,ForeignKey(User.user_id),nullable=False) 但是,当在sqllite db外键创建表时未添加到表,即使用户表不具有用户表的值 我搜索了此问题,并找到了我们需要设置的这个问题 PRAGMA foreign_keys = ON; 但是我无法弄清楚如何使
8 2024-04-22
编程技术问答社区
为什么 Sqlalchemy Session.close 不能记录 "回滚"?
# I've set echo=True when doing create_engine, so I can see all the sql stmt # DBSession is ScopeSession(thread_local) and autocommit is False session = DBSession() session.add(somemodel) # try: session.flush() raise Exception() session.commit() except SQLAlchemyError as e: session.rollback() finally: session.close() 对sqlalchemy文档的影响: The close() method issues a expunge_all(), and releases any transactional/connection r
18 2024-04-22
编程技术问答社区
SQLAlchemy插入时返回多个值的主键
我正在尝试将sqlalchemy的插入和执行方法与词典列表作为值,并恢复插入的ID.数据库是Postgres,它应该支持返回. 代码看起来或多或少类似: table = MyThing.__table__ insert_stmt = table.insert().values({"name": bindparam("name_val")}).returning(table.c.id) values = [{"name_val": "a"}, {"name_val": "b"}] result = session.execute(insert_stmt, values).fetchall() (修剪为可读性和一些混淆) 当值列表大于一个(如上示例中)时,我会得到: sqlalchemy.exc.programmingerror :( psycopg2.programmingerror)没有结果 但是,当列表只有一个值时,新ID被正确返回. 我知道我可
6 2024-04-05
编程技术问答社区
使用 bulk_insert_mappings
我正在尝试大量插入该表格的大量字典: results = [{'attribute': u'SEX', 'value_d': 0.0, 'value_s': u'M', 'sid': 1L}, {'attribute': u'SEX', 'value_d': 0.0, 'value_s': u'M', 'sid': 2L}, {'attribute': u'SEX', 'value_d': 0.0, 'value_s': u'M', 'sid': 3L}, {'attribute': u'SEX', 'value_d': 0.0, 'value_s': u'M', 'sid': 4L}, ... ] 阅读了有关"执行人"和bulk_insert_mappings后,我决定以后尝试,因为它看起来更简单. 这是执行此操作的3行代码,使用幼稚的假设,即由于我有一个字典列表,因此
4 2024-04-05
编程技术问答社区
SQLAlchemy原始sql与表达式语言语句的对比
通过SQLA-Expression-Language语句在MySQL-DB中插入多行时,F.E. Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}]) 与执行同一任务的" RAW" SQL语句相比,这是非常慢的,即. engine.execute("insert into foo (bar) values (1),(2),(3)") 这样做的原因是什么? SQLA可以生成单个批量插入语句并因此执行多个插入物吗?由于ORM的速度限制,我需要一种快速的方法来一次添加数千行,但是SQLA-Expression-Language-version太慢了.那么,我需要一个人写下原始的SQL吗?该文档对此不太清楚. 我使用ORM插入物进行了速度测试,带有预签名的PK和SQLA散装插件的ORM(请参阅 SQLA散装插入速度"( https://gist.github.com/3341940 ):
6 2024-04-05
编程技术问答社区
如何使用SQLAlchemy将查询表达式中的SQL文件转储到DBMS中批量插入?
当我解释问题时,请忍受我,我如何尝试解决问题, 我关于如何改进它的问题是 我有一个100,000线CSV文件,来自离线批处理作业,我需要 将其插入数据库中作为适当的模型.通常,如果这是一个相当简单的负载,则可以通过仅弹出CSV文件以适合模式来琐碎地加载.但是,我必须进行一些需要查询的外部处理,并且使用SQLalchemy来生成我想要的数据更加方便. 我想要的数据是3个模型,代表3个预审阅表 在数据库中,每个后续模型都取决于上一个模型. 例如: Model C --> Foreign Key --> Model B --> Foreign Key --> Model A 因此,模型必须以A,B和C的顺序插入. 采用生产者/消费者的方法: - instantiate a multiprocessing.Process which contains a threadpool of 50 persister threads that have a threadlo
4 2024-04-05
编程技术问答社区
python sqlalchemy在一个元组数据结构中插入多行数据
我一直在研究如何插入一个约500个元组(行)列表,该列表将有7个元素(列)插入数据库中.我已经阅读了有关Stackoverflow以及其他论坛的各种帖子.我找到了以下内容,建议使用"执行人()"方法,但对我来说并不清楚.我需要掩盖从元组到字典的对象吗?问题是我没有名称:数据结构的值类型. 如何使用SQLalchemy将SQL文件从查询表达式转储到批量插入到DBMS中? Here is an example: engine = create_engine('sqlite:///:memory:', echo=True) metadata = MetaData() hockey= Table('hockey', metadata, Column('team', String(16), primary_key=True), Column('jersey_colour', String(16)), Column('stadium', String(32))
14 2024-04-05
编程技术问答社区
sqlalchemy批量插入比构建原始SQL要慢
我正在通过INSERT INTO mytable (col1, col2, col3) VALUES (1,2,3), (4,5,6) ..... --- up to 1000 of these 和此RAW SQL的插入物类似于: MySession.execute(''' insert into MyTable (e, l, a) values {} '''.format(",".join(my_insert_str))) 使用这种方法,我在10-11秒内提高了50倍+的性能+次. 这是使用Build-In-In Lib的方法的代码. class MyClass(Base): __tablename__ = "MyTable" e = Column(String(256), primary_key=True) l = Column(String(6)) a = Column(String(20), primary_key=Tr
6 2024-04-05
编程技术问答社区
Sqlalchemy在MySQL中的批量更新工作非常缓慢
我正在使用SQLAlchemy 1.0.0,并且想制作一些UPDATE ONLY(如果匹配的主键更新,则无需执行任何操作). . 我进行了一些实验,发现散装更新看起来比散装插入或散装upsert 慢得多. 您能帮助我指出为什么它如此慢,或者有其他方法/想法来制作BULK UPDATE (not BULK UPSERT) with SQLAlchemy? 下面是mysql中的表格: CREATE TABLE `test` ( `id` int(11) unsigned NOT NULL, `value` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; 和测试代码: from sqlalchemy import create_engine, text import time
8 2024-04-05
编程技术问答社区
SQLAlchemy:防止自动关闭
我需要通过sqlalchemy插入/更新散装行.并插入行. 我尝试使用session.execute: >>> posts = db.session.execute(Post.__table__.insert(), [{'title': 'dfghdfg', 'content': 'sdfgsdf', 'topic': topic}]*2) >>> posts.fetchall() ResourceClosedError Traceback (most recent call last) 和引擎: In [17]: conn = db.engine.connect() In [18]: result = conn.execute(Post.__table__.insert(), [{'title': 'title', 'content': 'content', 'topic': topic}]*2) In
10 2024-04-05
编程技术问答社区
内存效率高的大数据集流向S3
我正在尝试使用SQL炼金术在S3大型数据集(大于RAM)上复制. 我的约束是: 我需要使用sqlalchemy 我需要将记忆压力保持最低 我不想将本地档案系统用作中介步骤将数据发送到S3 我只想以记忆有效的方式从DB到S3 的管道数据. 我可以通过数据集(使用以下逻辑)进行正常工作,但是在较大的数据集中,我遇到了一个缓冲区问题. 我解决的第一个问题是执行查询通常会在内存中缓冲结果.我使用fetchmany()方法. engine = sqlalchemy.create_engine(db_url) engine.execution_options(stream_results=True) results=engine.execute('SELECT * FROM tableX;') while True: chunk = result.fetchmany(10000) if not chunk: break 在另一侧,我有一个使用F
8 2024-04-04
编程技术问答社区
用Boto3进行IAM认证的SQLAlchemy可刷新凭证
我使用boto3生成的auth-token与Sqlalchemy连接到Amazon RDS: self.client = boto3.client("rds", region_name="eu-central-1") self.token = self.client.generate_db_auth_token( self.endpoint, self.port, self.user ) connection_string = ( f"postgresql://{urlquote(self.user)}:" f"{urlquote(self.token)}@{self.endpoint}:{self.port}/dummy" ) self.engine = db.create_engine(connection_string) 问题在于,提供的令牌仅在15分钟内有效(我想使用临时凭据!),我现在没有如何告诉SQLalchemy在旧的验证量到期时自动创建一个
22 2024-04-03
编程技术问答社区
SQLAlchemy: 使用`and`和`or`时出现意外结果
我有一个声明的基类News: class News(Base): __tablename__ = "news" id = Column(Integer, primary_key = True) title = Column(String) author = Column(String) url = Column(String) comments = Column(Integer) points = Column(Integer) label = Column(String) 我也有一个函数f(title),它将获得一个字符串并返回3个字符串变体之一:'good',''也许'或"从不". 我尝试获得过滤的行: rows = s.query(News).filter(News.label == None and f(News.title) == 'good').all() 但是程序失败,引起了此错误:
10 2024-04-01
编程技术问答社区
为存储在SQLAlchemy LargeBinary列中的图像提供服务
我想上传文件并将其存储在数据库中.我创建了一个大型列. logo = db.Column(db.LargeBinary) 我读取上传的文件并将其存储在数据库中. files = request.files.getlist('file') if files: event.logo = files[0].file.read() 这是将图像作为二进制数据库中存储在数据库中的正确方法吗?如何再次将二进制数据转换为图像以显示它? 解决方案 如果您绝对需要将图像存储在数据库中,那么是的,这是正确的.通常,文件存储在文件系统中,该路径存储在数据库中.这是更好的解决方案,因为Web服务器通常具有从文件系统中提供文件的有效方法,而不是动态发送大量数据的应用程序. 要使用图像,写一个视图,以获取图像数据并将其作为响应发送. @app.route('/event//logo') def event_logo(id): event = E
18 2024-04-01
编程技术问答社区
用SQLAlchemy执行查询时避免参数绑定
我正在使用sqlalchemy在teradata上执行查询.我执行的查询之一是DDL语句替换存储过程: REPLACE PROCEDURE DEV_MIGRATION_TOOL.UNIT_TEST_NEW_STORED_PROCEDURE() UNIT_TEST_NEW_STORED_PROCEDURE: BEGIN DECLARE V_VAR VARCHAR(50); SELECT 'Hello World!' INTO :V_VAR; END; 此SQL语句分配给变量query,并通过SQLalchemy执行会话执行方法: def execute_sql_statement(self, query): """Generic method to execute a SQL statement on target environment.""" self.target_environment.db_session.execute(q
10 2024-03-31
编程技术问答社区