Pika BlockingConnection & RabbitMQ : 连接已关闭
我在集群负载中有2个兔子,由Azure内部负载平衡器平衡.客户端使用BlockingConnection连接到LB. 当客户交换消息时,一切正常.但是,当没有活动时,我的客户似乎已断开连接,并且无法再接收消息. 我想知道是否有解决这个问题的方法?我假设负载平衡器或兔子由于不活动而关闭了连接.我想让皮卡(Pika)触发一些心跳对兔子(Rabbitmq)(以便负载平衡器保持连接打开),但找不到任何好的解决方案.你能建议吗? 编辑1 Pika BlockingConnections似乎不支持心跳. heart beat beat beat beat block blocking blocking connection 谢谢. 解决方案 根据Pika doc URLParameters 指定的heart_interval(例如amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30
34 2024-03-23
编程技术问答社区
在python中使用pika的SparkStreaming、RabbitMQ和MQTT
只是为了使事情变得棘手,我想从兔子队列中消耗消息.现在我知道兔子上有一个MQTT的插件. 但是,我似乎无法做一个示例工作,因为Spark消耗了Pika产生的消息. 例如,我在此处使用简单的wordcount.py程序( https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html )查看是否可以通过以下方式看到消息 producter : import sys import pika import json import future import pprofile def sendJson(json): connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(que
14 2024-02-29
编程技术问答社区
如果连接处于空闲状态/长时间不消费,停止channel.basic_consume。
我有一个用例 用户酶: 如果最后一个处理的时间大于阈值时间(例如:1小时).我希望消费者退出或有回调方法来决定我需要做什么?喜欢向用户发送通知. 有什么办法做到这一点吗? 解决方案 皮卡(Pika超时回调. 您可以在每个消息收据的末尾添加此回调,保留对此的引用,并在每个消息收据开始时将其删除. def close_connec(): # close here timer_id = None def on_message(chan, method, props, body): if timer_id is not None: chan.connection.remove_timeout(timer_id) # process message timer_id = chan.connection.add_timeout(3600, close_connec)
14 2024-01-26
编程技术问答社区
Pika SelectConnection 适配器的 close() 方法没有关闭连接
我有一个简单的AMQP/RABBITMQ的简单异步消费者,使用Pika库编写,并基于异步消费者示例来自皮卡文档.主要区别是我想在线程中运行我的矿山,我希望它正确关闭连接,然后在一定时间间隔后退出(即终止线程).这是我打开连接并设置超时的方法.我还打开一个频道,创建一个交换并绑定一个队列...所有工作正常. def connect(self): LOGGER.info('OPEN connection...') return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False) def on_connection_open(self, unused_connection): LOGGER.info('Connection opened') self.add_on_connection_close_callback() self._c
14 2024-01-26
编程技术问答社区
RabbitMQ断管错误或丢失消息
使用Pika库的BlockingConnection连接到RabbitMQ,发布消息时偶尔会出现错误: 致命的插座错误:错误(32,'断管') 这是一个非常简单的子过程,该子过程将一些信息从内存队列中获取,并将小的JSON消息发送到AMQP中.仅当系统几分钟没有发送任何消息时,该错误似乎才会出现. 设置: connection = pika.BlockingConnection(parameters) channel = self.connection.channel() channel.exchange_declare( exchange='xyz', exchange_type='fanout', passive=False, durable=True, auto_delete=False ) 顾问代码捕获任何连接错误并重新检索: def _enqueue(self, message_id, data):
42 2024-01-26
编程技术问答社区
有谁能告诉我,python 中的 pika 和 kombu 消息库有什么区别?
我想在应用程序中使用消息传递库与RabbitMQ进行交互.谁能解释一下皮卡(Pika)和kombu库之间的差异? 解决方案 kombu和pika是两个不同的python库,它们从根本上提供相同的目的:向/从消息经纪发表消息. kombu的抽象水平高于皮卡.皮卡仅支持AMQP 0.9.1协议,而KOMBU可以支持其他运输(例如Redis).更一般而言,Kombu比Pika更丰富.它支持重新连接策略,连接汇总,故障转移策略等.其中一些功能是必不可少的(如果选择在一个严肃的项目中使用皮卡,则必须重新实现或解决),其他一些也很不错.缺点:越复杂的是一个库,您的行为使您的行为感到惊讶,而推理和跟踪错误的难度就越大.皮卡的代码库相对较小且易于进入.另一方面,kombu是为芹菜而量身定制的这是一个巨大的项目.芹菜的文档相当不错,但相比之下,康布的文档却很差.感觉就像芹菜是要暴露的项目,而不是kombu. 在引擎盖下,当使用AMQP作为运输时,kombu使用 py-amqp 库 li
38 2024-01-26
编程技术问答社区
在用pika发布消息时,如何添加一个头的键:值对
我正在编写自动测试以测试消费者.到目前为止,我不需要在发布消息时包括标头,但现在我确实可以.而且似乎缺乏文档. 这是我的发布者: class RMQProducer(object): def __init__(self, host, exchange, routing_key): self.host = host self.exchange = exchange self.routing_key = routing_key def publish_message(self, message): connection = pika.BlockingConnection(pika.ConnectionParameters(self.host)) channel = connection.channel() message = json.dumps(message)
20 2024-01-26
编程技术问答社区
rabbitmq在一个队列上有多个消费者,但只有一个能得到消息。
我实施了多个消费者,他们正在从单个队列中获取消息,我正在使用类似于任何想法我如何防止所有消费者之间的竞争,因为只有一个消费者会收到消息,而另一个消费者将继续进行民意调查,直到另一个消息传来? 我尝试实现一种逻辑,在该逻辑中,一旦收到消息,我就会删除消息,但似乎其他一些队列在第一个队列获得ACK并将其删除之前设法收到了消息. 因此每个消费者收到消息. 预先感谢 解决方案 有什么想法我如何防止所有消费者之间的比赛,因为只有一个消费者会收到消息,而另一个消费者将继续进行民意调查,直到另一个消息来临? 您无法设置东西的方式. RabbitMQ将向消费者旋转消息,但只有一个消费者会从队列中收到消息.这是在兔子中的设计,当您在一个队列上有多个消费者时. 如果您需要所有消费者接收所有消息,则需要更改配置,以便每个消费者都有自己的队列.然后,您需要通过交易所发布您的消息,该交换将消息传递给所有消费者的所有队列. 最简单的方法是使用粉丝交换类型.
18 2024-01-26
编程技术问答社区
没有找到记录仪 "pika.adapters.blocking_connection "的处理程序。
类似的问题似乎都是基于使用自定义记录仪的,我很高兴只使用默认值/根本不使用.我的Pika Python应用程序运行和接收消息,但是几秒钟后用No handlers could be found for logger "pika.adapters.blocking_connection"崩溃,有任何想法吗? import pika credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx') parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare('messages') def m
22 2024-01-26
编程技术问答社区
是否可以在 RabbitMQ 队列之间移动/合并消息?
我想知道可以将消息从一个队列移动到另一个队列. 例如: main-queue包含消息['cat-1','cat-2','cat-3','cat-4','dog-1','dog-2','cat-5'] dog-queue包含消息['dog-1, dog-2, dog-3, dog-4] 所以问题是(假设两个队列在同一集群,vhost)可以使用dog-queue使用rabbitmqctl? 所以最后我希望得到类似的东西: 理想情况下: main-queue:['cat-1','cat-2','cat-3','cat-4','dog-1','dog-2','cat-5', dog-3, dog-4] 但这也可以: main-queue:['cat-1','cat-2','cat-3','cat-4','dog-1','dog-2','cat-5', 'dog-1, dog-2, dog-3, dog-4] 解决方案 您/正在寻找的是"铲子"插
30 2024-01-26
编程技术问答社区
我怎样才能从其他通道恢复未被确认的AMQP消息,而不是我自己的连接?
看来,我将RabbitMQ服务器运行的时间越长,未被认可的消息遇到的麻烦就越多.我很想要求他们.实际上,似乎有一个AMQP命令可以执行此操作,但它仅适用于您的连接使用的频道.我构建了一些Pika脚本以至少尝试一下,但是我要么缺少某些东西,要么不能这样做(使用Rabbitmqctl?) 怎么样 import pika credentials = pika.PlainCredentials('***', '***') parameters = pika.ConnectionParameters(host='localhost',port=5672,\ credentials=credentials, virtual_host='***') def handle_delivery(body): """Called when we receive a message from RabbitMQ""" print body def on_connected(con
8 2024-01-26
编程技术问答社区
将 url 传递给 scrapy 中从 RabbitMQ 消耗的解析方法
我正在使用砂纸从兔子中消耗消息(URL),但是当我使用产量调用解析方法将我的URL作为参数作为参数时.程序不在回调方法内.我的蜘蛛的代码 # -*- coding: utf-8 -*- import scrapy import pika from scrapy import cmdline import json class MydeletespiderSpider(scrapy.Spider): name = 'Mydeletespider' allowed_domains = [] start_urls = [] def callback(self,ch, method, properties, body): print(" [x] Received %r" % body) body=json.loads(body) url=body.get('url') yield scrapy.Request(url=url,ca
20 2023-12-25
编程技术问答社区
AttributeError: 'module'对象没有属性 'AsyncoreConnection';
我正在使用scrapy-rabbitmq将rabbitmq的URL获取到我的零食中.我在我的settings.py file 中使用以下内容 RABBITMQ_CONNECTION_PARAMETERS = {"credentials": pika.PlainCredentials('test', 'test'),'host': '10.0.12.103', 'port': 5672} 但我无法连接到兔子. AttributeError: 'module' object has no attribute 'AsyncoreConnection' 我问了另一个问题,即如何使用scrapy 解决方案 我必须在其源代码(Connection.py)上评论此行. connection = { 'blocking': pika.BlockingConnection, #'asyncore': pika.AsyncoreConnection, #
30 2023-12-02
编程技术问答社区
pika rabbitmq docker with tls-gen connection reset, no logs
我已经配置了一个docker w rabbitmq,从 tls-gen复制了证书在那里,停止/启动,我只能在发送TLS客户端Hello后立即建立连接和服务器重置. SSL没有有用的消息.记录全部空.故障排除TLS指南无济于事.感谢您的任何帮助. docker run -d -it --hostname=[REDACTED] -e RABBITMQ_LOGS=/var/log/rabbitmq/ -e RABBITMQ_SASL_LOGS=/var/log/rabbitmq/ -e RABBITMQ_DEFAULT_USER=[REDACTED] -e RABBITMQ_DEFAULT_PASS=[REDACTED] --name rabbitmq1 -p [REDACTED]:5672 -p [REDACTED]:15672 -v /scratch/databases/rabbitmq/var/lib/rabbitmq:/var/lib/rabbitmq rabbitmq:3-ma
28 2023-10-23
编程技术问答社区
使用 ExternalCredentials 验证 rabbitmq
我有一个兔子服务器,并使用带Python的皮卡库来生产/消费消息.出于开发目的,我只是使用 credentials = pika.PlainCredentials(, ) 我想将其更改为使用pika.externalcredentials或tls. 我已经设置了兔子服务器,以在端口5671上收听TLS,并正确配置了它.我能够与Localhost的RabbitMQ进行交流,但是当我尝试从Localhost以外的地方与之交流时,我并非如此.我感觉我的"凭据"是基于兔子中的"访客"用户. rabbitmq.config %% -*- mode: erlang -*- [ {rabbit, [ {ssl_listeners, [5671]}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN', 'EXTERNAL']}, {ssl_options, [{cacertfil
24 2023-10-23
编程技术问答社区
Rabbitmq consumer_timeout 行为未按预期运行?
我很难证明consumer_timeout设置正在按预期工作. 我可能做错了事或误解了消费者_timeout行为. 我所有的测试代码都提供: 基本上,我有一个consumer_timeout设置为10000ms(10sec),然后我尝试用回电消耗消息 在尝试确认消息之前,睡眠比超时值(20秒)更长. 由于超时,我应该有一个预言_FAIL的例外,但事实并非如此. 如果我在receive_timeout.py中设置SLEEP_DURATION的方式比consumer_timeout> 60秒的值都多. 引用来自超时 如果消费者的交付不超过超时值(默认情况下为30分钟),则其频道将使用Precondition_failed Channel exception关闭. 如果有人可以帮助我理解我做错了什么,那太好了,谢谢! 解决方案 一些有用的提示: 动态配置 您可以通过在RabbitMQ服务器上运行以下命令来动态设置consumer_time
26 2023-10-21
编程技术问答社区
Pika blocking_connection.py 连接到 RabbitMQ 的随机超时
我有一个兔子MQ在机器上运行 客户端和兔子都在同一网络上运行 RabbitMQ有许多客户 我可以从兔子和返回 ping客户端 计算机之间测量的最长延迟为12.1 ms 网络详细信息:标准交换机网络(在单个物理机上运行的虚拟机的网络 - 使用VMware VC) 我在初始化RPC连接时会随机超时 /usr/lib/python2.6/site-packages/pika-0.9.5-py2.6.egg/pika/pika/adapters/blocking_connection.py 问题是超时不一致,并且不时发生. 手动测试此问题并运行blocking_connection.py从同一台计算机上失败的计算机1000次. 这是我失败时遇到的错误: 2013-04-23 08:24:23,396 runtest-trigger.24397 24397 DEBUG producer_rabbit initiate_rpc_connec
24 2023-08-12
编程技术问答社区
如何使用 Python 中的 RabbitMQ pika.basic_consume 更改超时
使用RabbitMQ Python客户端运行subscriber.py: import pika, time credentials = pika.PlainCredentials('user', 'pass') parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.basic_qos(prefetch_count=1) channel.queue_declare(queue='my_queue') def callback(ch, method, properties, body): ch.b
18 2023-07-28
编程技术问答社区
RabbitMQ 中的密钥感知消费者
让我们考虑一个系统,其中成千上万的客户数据已发布到RabbitMQ Exchange(client_id在此阶段已知).交换将它们路由到一个队列.最后,消息被单个应用程序消耗.效果很好. 但是,随着时间的流逝,消费应用程序成为瓶颈,需要水平缩放.问题是系统要求考虑特定客户端的消息被应用程序的同一实例所消耗. 我可以创建很多队列:每个客户端一个或使用主题交换并根据某些client_id前缀进行路由.不过,我看不到如何设计消费者应用程序的优雅方法,以便可以水平缩放它(因为它需要说明它明确消耗的排队). 我正在寻找解决此问题的兔子方法. 解决方案 rabbitMQ具有例如,系统可以具有以下拓扑:每个应用程序都可以定义一个独占阵容仅由一个连接和quee nefence and nection 我认为,在这种特殊情况下拥有分布式的缓存层是一个好主意,但是RabbitMQ提供了解决此类问题的插件.
40 2023-07-28
编程技术问答社区
如何使用 python 中的 pika (RabbitMQ) 为消费者添加多处理功能
我有非常基本的生产者 - 消费者代码,并在Python中编写了Pika Framework.问题是 - 消费者端在队列中的消息太慢.我进行了一些测试,发现我可以通过多处理将工作流程加快27次.问题是 - 我不知道在我的代码中添加多处理功能的正确方法是什么. import pika import json from datetime import datetime from functions import download_xmls def callback(ch, method, properties, body): print('Got something') body = json.loads(body) type = body[-1]['Type'] print('Object type in work currently ' + type) cnums = [x['cadnum'] for x in body[:-1]]
20 2023-07-28
编程技术问答社区