celery 'Worker-n'pid:xxxx在我导入hmmlearn时以'exitcode 1'退出了。
在我的tasks.py文件中,当我导入hmmlearn时, from hmmlearn import hmm 开始我的芹菜工人,我会收到以下错误 [2017-06-14 09:18:27,638: INFO/MainProcess] Received task: sm.tasks.mytask[4e46806e-6f0f-420f-baac-c727c2a382d4] [2017-06-14 09:18:27,716: ERROR/MainProcess] Process 'Worker-4' pid:5264 exited with 'exitcode 1' [2017-06-14 09:18:29,857: ERROR/MainProcess] Process 'Worker-7' pid:3172 exited with 'exitcode 1' [2017-06-14 09:18:29,857: ERROR/MainProcess] Process 'Wor
0 2024-01-26
编程技术问答社区
celery consume send_task response
在django应用程序中,我需要调用一个外部兔子,在Windows服务器上运行并使用其中的某些应用程序,Django应用程序在Linux Server上运行. 我目前可以使用芹菜send_task: 将任务添加到队列中 app.send_task('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False)) 我的设置看起来像: CELERY_BROKER_URL = CELERY_CONFIG['broker_url'] BROKER_TRANSPORT_OPTIONS = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.5} CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER =
0 2024-01-26
编程技术问答社区
对方重置了AMQP连接,但是celery连接了
我有一个瓶应用程序,将芹菜与兔子作为经纪人. 我已经按照此答案答案开始. 我有两台机器. 机器A(RabbitMQ运行)发送了芹菜在机器上消耗的任务b. 我的经纪URL和后端结果URL相同:amqp://remote:***@12.345.678.999:5672/remote_host. 这两台机器上都有烧瓶应用程序的副本.兔子已配置,以便用户遥控器具有授予的所有权限".*.*.*".当兔子和芹菜之间的所有通信都可以正常运行,它们都在机器上的Localhost上运行. 我用celery worker -l info -A app.celery在机器B上启动芹菜,一切看起来都很好: -------------- celery@ip-XXX-XX-XX-XXX v4.0.0 (latentcall) ---- **** ----- --- * *** * -- Linux-4.4.0-45-generic-x86_64-with-Ubuntu
0 2024-01-26
编程技术问答社区
芹菜 为什么任务会停留在队列中
所以我正在将芹菜与兔子使用.我有一个静止的API,可以注册用户.我正在使用远程芹菜工人异步发送注册电子邮件,以便我的API可以返回快速响应. from .tasks import send_registration_email def register_user(user_data): # save user to the database etc send_registration_email.delay(user.id) return {'status': 'success'} 这很好.电子邮件是以一种非阻止异步方式发送的(如果失败很酷,可以重新发送).问题是当我查看RabbitMQ管理控制台时.我可以看到send_registration_email创建了一个随机队列.类似: 我可以看到该任务已成功执行.那么,为什么随机队列永远留在兔子中呢?这是任务有效载荷: {"status": "SUCCESS", "traceback":
8 2024-01-26
编程技术问答社区
Python Celery-lookup task by pid
也许是一个非常简单的问题 - 我经常看到在系统上运行的芹菜任务过程,当我使用celery.task.control.inspect()'s active()方法时找不到.通常,这个过程会运行几个小时,我担心这是某种僵尸.通常它也用很多内存. 有没有办法通过Linux PID查找任务?芹菜或AMPQ结果后端是否保存? 如果没有,还有其他方法可以弄清楚哪个任务是坐在饮食中的任务? ----更新: 当active()告诉我特定框上没有任务时,我该怎么办,但是盒子的内存已完全使用,HTOP表明这些工作池线程是使用它的,但是在同时使用0%CPU?如果事实证明这与我当前的Rackspace设置的某些怪癖有关,没有人可以回答,我仍然接受Loren的. 谢谢〜 解决方案 我将假设通过'任务'您的意思是"工人".否则这个问题就毫无意义. 在某些情况下,重要的是要了解芹菜工人池的过程层次结构.一个工人池是共享相同配置的一组工艺过程(或线程)(同一队列等的过程消息等
2 2024-01-26
编程技术问答社区
我如何在运行时创建celery队列,使发送到该队列的任务被工人接走?
我正在使用Django 1.4,芹菜3.0,兔子 要描述问题,我在系统中有许多内容网络,我想要一个以处理与这些网络相关的任务的队列. 但是,当系统实时时,即时创建内容,因此我需要即时创建队列并让现有工人开始拾取它们. 我已经尝试以以下方式调度任务(其中内容为django模型实例): queue_name = 'content.{}'.format(content.pk) # E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba add_content.apply_async(args=[content], queue=queue_name) 这创建一个带有名称content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba的队列,创建一个带有名称content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba的新交易所和路由键content.
0 2024-01-26
编程技术问答社区
有没有人成功地使用芹菜与铁塔
我有一个基于塔的WebApp,我很想在一段时间内使用芹菜 +兔子.我看了看芹菜塔项目,但我还没有成功使用它. 我对芹菜的主要问题是:我在哪里放置芹菜Config.py文件,或者还有其他方法来指定芹菜选项. BROKER_HOST之类 基本上,我研究了2个选项:使用芹菜作为独立项目并使用芹菜塔,都没有成功. 事先感谢您的帮助. 解决方案 我目前正在这样做,尽管我一段时间以来没有更新芹菜.我认为我仍然处于2.0.0. 我所做的是在我的塔式应用程序中创建一个Celery_App目录. (因此,与数据,控制器等相同的目录) 在该目录中是我的CeleryConfig.py,Tasks.py和Pylons_tasks.py. pylons_tasks.py只是一个初始化塔塔应用程序的文件,因此我可以将塔型模型等加载到芹菜任务中.因此,它可以完成塔INIT然后导入任务.Py. 然后将芹菜夹设置为使用myApp.celery_app.pylons_tasks作为
0 2024-01-26
编程技术问答社区
Celery任务状态取决于CELERY_TASK_RESULT_EXPIRES
从我所看到的,任务状态完全取决于celery_task_result_expires的值设置 - 如果我在任务完成执行后在此间隔内检查任务状态,则状态返回: AsyncResult(task_id).state 是正确的.如果没有,该州将不会更新,并且将永远待定. 谁能解释我为什么会发生这种情况?这是功能还是错误? 为什么任务状态取决于结果的到期时间,即使我忽略了结果? (芹菜版本:3.0.23,结果后端:AMQP) 解决方案 状态和结果相同.结果后端最初用于存储返回值,然后将其扩展到存储任意状态.该术语结果不够,因为它意味着任务已经完成. ignore_result应该是ignore_state,但我们还没有机会重命名.我有一个计划清理此处使用的术语,但要往后兼容. .
14 2024-01-26
编程技术问答社区
Celery设计帮助:如何防止并发执行的任务
我是芹菜/AMQP的新手 我有多种类型的"每个用户"任务:例如Taska,Taskb,Taskc.这些"每个用户"任务中的每一个都为系统中一个特定用户读取/写数据.因此,在任何给定时间,我可能需要创建任务user1_taska,user1_taskb,user1_taskc,user2_taska,user2_taskb等.我需要确保,对于每个任务,每个任务 键入同时执行.我想要一个系统,没有工人可以在任何其他工人执行user1_taskb或user1_taskc的同时执行User1_taska,但是尽管User1_taska正在执行User1_taska,但不应从同时执行User2_taska,User3_taska等. > 我意识到可以使用某种外部锁定机制(例如,在DB中)来实现这一点,但我希望有一个更优雅的任务/队列/工人设计可以使用. 我想一个可能的解决方案是将队列作为用户存储桶实现,以便在启动工人时,有配置指定要创建多少存储桶,每个"桶工人"恰好符合一个桶.然后,"中
2 2024-01-26
编程技术问答社区
芹菜(Django)率限制
我正在使用芹菜处理多个数据挖掘任务.这些任务之一连接到远程服务,该服务最多允许每个用户(或换句话说,它 can )超过10个连接,但它在全球范围内,但它 每个个人工作都不能超过10个连接). i 思考 token bucket(速率限制)是我想要的,但我似乎找不到任何实现. 解决方案 经过大量研究,我发现芹菜没有明确提供限制此类并发实例的数量,此外,这样做通常被认为是不良实践. 更好的解决方案是在单个任务中同时下载,并使用redis或memcached存储和分发以进行其他任务进行处理. 其他解决方案 芹菜具有限制速率,并包含通用令牌桶实现. 设定任务的费率限制: /Tasks.html#task.rate_limit 或运行时: 最新/userguide/worker.html#rate-limits 令牌存储桶实现位于Kombu 其他解决方案 尽管可能是不良的做法,但您可以使用专用队列并限制工人,例如: # ./
2 2024-01-26
编程技术问答社区
RabbitMQ/Celery/Django内存泄漏?
我最近接管了我公司正在从事的项目的另一部分,并发现了我们的RabbitMQ/Celery设置中似乎是记忆泄漏的项目. 我们的系统具有2GB的内存,在任何给定时间都有大约1.8GB.我们有多个处理大量数据并将其添加到我们的数据库中的任务. 当这些任务运行时,它们会消耗大量内存,从而迅速将我们的可用内存坠落到16MB和300MB之间的任何地方.问题是,在完成这些任务后,内存不会返回. 我们正在使用: 兔子v2.7.1 AMQP 0-9-1/0-9/0-8(从 RABBITMQ startup_log) 芹菜2.4.6 django 1.3.1 Amqplib 1.0.2 django-celery 2.4.2 kombu 2.1.0 Python 2.6.6 Erlang 5.8 我们的服务器正在运行Debian 6.0.4. 我是此设置的新手,因此,如果您需要的其他信息可以帮助我确定此问题的来源,请让我知道. 所有任务都有返回值,所
4 2024-01-26
编程技术问答社区
Celery 和 RabbitMQ 作为 docker 容器运行。收到未注册的任务类型'...'。
我对Docker,Celery和Rabbitmq相对较新. 在我们的项目中,我们目前有以下设置: 1个物理主机,运行多个Docker容器: 1x兔子:3管理容器 # pull image from docker hub and install docker pull rabbitmq:3-management # run docker image docker run -d -e RABBITMQ_NODENAME=my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management 1x芹菜容器 # pull docker image from docker hub docker pull celery # run celery container docker run --link some-rabbit:rabbit --name some-celery -d cele
0 2024-01-26
编程技术问答社区
芹菜可以用,但用花就不行了
我已经安装了芹菜,rabitmq和花.我能够浏览到花港.我有以下简单的工人可以附加到芹菜上,并打电话给Python程序: # -*- coding: utf-8 -*- """ Created on Sat Dec 12 16:37:33 2015 @author: idf """ from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//') @app.task def add(x, y): return x + y 此程序称其为 # -*- coding: utf-8 -*- """ Created on Sat Dec 12 16:40:16 2015 @author: idf """ from tasks import add add.delay(36, 5) 我这样开始芹菜: idf@DellInsp:~/Document
0 2024-01-26
编程技术问答社区
为什么CELERY_ROUTES同时有一个 "队列 "和一个 "routing_key"?
我对AMQP的理解是消息只有以下组件: 消息正文 路由键 交换 队列附在交换上.消息不知道队列.他们只是发布到交换,然后根据交换类型和路由键,将消息路由到一个或多个队列. 在芹菜中,推荐的路由任务的方法是通过CELERY_ROUTES设置.从文档中,CELERY_ROUTES是... 路由器列表或用于将任务路由排队的单个路由器. #消息路由 ,其中包括一个示例... 要将任务路由到feed_tasks队列,您可以在 CELERY_ROUTES设置: CELERY_ROUTES = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, } 但是请等待一分钟 - 根据AMQP,消息仅带有路由键! "队列"在那里做什么? 此外,还有一个默认队列的概念.如果您调用未被CELE
0 2024-01-26
编程技术问答社区
我如何从 rabbitMQ 中永久地删除一个 celery 任务?
我在当前的芹菜设置上有大约10,000个计划的任务.我没有意识到计划的任务是什么,因此决定使用它们提前几个月发送后续电子邮件. 回头看,将来将任务安排超过1小时可能永远不是一个好主意内存. 我的问题是,如果我必须撤销任务,它不仅会删除它.该任务保留在内存中,但撤销队列现在包含任务的ID.当它要执行时,芹菜检查是否被撤销,如果是,它将在此时撤销. 但是,该任务仍将一直保持在记忆中,直到那时我会随时重新启动我的工人,撤销队列将被清除,因为我没有持续. 如何从芹菜工人中永久删除任务?我本质上只需要将认可的返回兔子送回兔子,所以兔子一劳永逸地将其清除,如果我重新开始芹菜,它就不会回来. 我已经查看了文档和源代码,并试图自己在外壳中进行操作,但我无法找出适当的位置,可以将任务纳入RabbitMQ,然后将其永远弹出. 解决方案 如果您仅使用一个队列或一个任务,这很容易: 来自文档: 答案:您可以使用芹菜清除命令清除所有配置的任务队列: $ ce
2 2024-01-26
编程技术问答社区
芹菜不释放内存
看来芹菜完成任务后不会释放内存.每次任务完成时,都会有5m-10m内存泄漏.因此,有成千上万的任务,很快它将消耗所有内存. BROKER_URL = 'amqp://user@localhost:5672/vhost' # CELERY_RESULT_BACKEND = 'amqp://user@localhost:5672/vhost' CELERY_IMPORTS = ( 'tasks.tasks', ) CELERY_IGNORE_RESULT = True CELERY_DISABLE_RATE_LIMITS = True # CELERY_ACKS_LATE = True CELERY_TASK_RESULT_EXPIRES = 3600 # maximum time for a task to execute CELERYD_TASK_TIME_LIMIT = 600 CELERY_DEFAULT_ROUTING_KEY = "default" CELERY
2 2024-01-26
编程技术问答社区
我如何设置Celery在运行我的任务之前调用一个自定义的初始化函数?
我有一个django项目,我正在尝试使用芹菜提交背景处理任务(> http://ask.github.com/celery/introduction.html ).芹菜与Django融为一体,我能够提交自定义任务并取得结果. 唯一的问题是我找不到在守护程序过程中执行自定义初始化的理智方法.在开始处理任务之前,我需要调用一个昂贵的功能,该功能加载了很多内存,并且每次都无法调用该功能. 以前有人遇到过这个问题吗?有什么想法在不修改芹菜源代码的情况下如何工作? 谢谢 解决方案 您可以编写自定义加载程序,也可以使用信号. 加载程序具有on_task_init方法,当即将执行任务时,该方法被称为 和on_worker_init由芹菜+芹菜主过程调用. 使用信号可能是最简单的,可用的信号为: 0.8.x: task_prerun(task_id, task, args, kwargs) 当工人即将执行任务时(或本地执行任务) 如果使用apply
8 2024-01-26
编程技术问答社区
我怎样才能从其他通道恢复未被确认的AMQP消息,而不是我自己的连接?
看来,我将RabbitMQ服务器运行的时间越长,未被认可的消息遇到的麻烦就越多.我很想要求他们.实际上,似乎有一个AMQP命令可以执行此操作,但它仅适用于您的连接使用的频道.我构建了一些Pika脚本以至少尝试一下,但是我要么缺少某些东西,要么不能这样做(使用Rabbitmqctl?) 怎么样 import pika credentials = pika.PlainCredentials('***', '***') parameters = pika.ConnectionParameters(host='localhost',port=5672,\ credentials=credentials, virtual_host='***') def handle_delivery(body): """Called when we receive a message from RabbitMQ""" print body def on_connected(con
0 2024-01-26
编程技术问答社区
在Elastic Beanstalk上启动SQS celery工作者
我试图在EB上启动芹菜工人,但会出现一个错误的解释. .ebextensions dir中的配置文件中的命令: 03_celery_worker: command: "celery worker --app=config --loglevel=info -E --workdir=/opt/python/current/app/my_project/" 列出的命令在我的本地计算机上正常工作(只需更改WorkDir参数). EB的错误: 活动执行失败,因为:/ept/python/run/venv/local/lib/lib/python3.6/site-packages/celery/platforms.pys.pys.pys.pys.pys.pys.pys.py.py.py.py.py.py:796:runtimewarning:您正在使用超级useprileileges:这是 绝对不建议! 和 启动新的HTTPS连接(1):eu-west-1
2 2024-01-26
编程技术问答社区
在ECS Fargate中操作芹菜工人
我正在使用AWS ECS从事一个项目.我想将芹菜用作分布式任务队列.芹菜工人可以成为EC2类型,但是由于实例处于空闲状态的大量时间,我认为AWS Fargate运行工作并立即辞职将是具有成本效益的. 您是否有有关如何在AWS云中有效使用芹菜工人的建议? 解决方案 Fargate发射类型要比EC2启动类型要花费的时间更长,因为AWS在您启动任务时为您做所有"主机",包括臭名昭著的ENI的慢速附件和可能是从Docker Repo下载图像.目前没有比赛,EC2发射类型每次都更快. 因此,这实际上取决于您希望工人做的工作类型.您可以期望出于上述原因,将需要一项新的Fargate任务几分钟才能进入运行状态.另一方面,EC2启动,因为ENI已经在您的主机上已经存在,并且图像已经下载(最多)或大多下载(可能最糟糕的是),将从待定转移到非常快的运行. 编辑:正如 @rocket04在下面的评论中指出的那样,AWS似乎改进了Fargate启动时间来扩展应用程序.哇! 使用E
4 2024-01-26
编程技术问答社区