如何从队列中并行地读取消息?
情况 我们有一个消息队列.我们想并行处理消息并限制同时处理的消息的数量. 我们下面的试验代码会并行处理消息,但是只有在上一个过程完成后才启动一批新的流程.我们想重新启动任务完成. 换句话说:只要消息队列不是空的,任务的最大数量应始终处于活动状态. 试用代码 static string queue = @".\Private$\concurrenttest"; private static void Process(CancellationToken token) { Task.Factory.StartNew(async () => { while (true) { IEnumerable consumerTasks = ConsumerTasks(); await Task.WhenAll(consumerTasks);
0 2024-04-16
编程技术问答社区
所有的Process*方法和所有的message's过滤器的目的是什么?
我注意到Winforms具有许多方法,可以处理命令或键(Process*())和(pre)过滤系统的消息,但它们各自的目的对我来说仍然不清楚. 正式文档有些晦涩,我没有发现任何清晰而完整的回应. 我谈论以下方法: PreFilterMessage(ref Message m) ProcessCmdKey(ref Message msg, Keys keyData) WndProc(ref Message m) ProcessDialogKey(Keys keyData) PreProcessMessage(ref Message msg) ProcessKeyMessage(ref Message m) ProcessKeyPreview(ref Message m) 有些是用于拦截密钥(例如ProcessCmdKey或ProcessDialogKey),有些是用于拦截消息(彼此).但是为什么是多种方法?他们的目的和用例是什么? 我想每种方法的执
0 2024-04-15
编程技术问答社区
分批消费消息-RabbitMQ
我能够使用以上代码使用多个生产商发送的多个消息,并使用不同的路由键发送到同一交换,并能够将每个消息插入数据库. 这将消耗太多资源,因为消息将彼此插入DB.所以我决定去批处理插入物,发现我可以设置BasicQos 将消息限制设置为基本QoS中的10个,我的期望是Console.WriteLine必须写10条消息,但并不如预期. 我的期望是从队列中消耗n个数字消息,并进行批量插入,并成功发送ACK否则没有ACK 这是我使用的代码. using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A"); c
0 2024-04-15
编程技术问答社区
使用EasyNetQ for RabbitMQ时,自定义错误队列名称?
而不是让我的例外情况进入 easynetq_default_error_queue 我想知道我是否有一种方法可以明确说明应该用于给定应用程序的错误队列的名称,因此错误t最终都进入了这个 easynetq_default_error_queue ? 我可以看到如何指定常规消息队列名称,但尚未设法找到有关错误队列名称的任何内容. 解决方案 是的,您可以通过覆盖Iconventions对象上的代表来自定义命名约定(或者只需创建自己的IT实现并将其注册为依赖项): 这应该有效: var bus = RabbitHutch.CreateBus("host=localhost"); bus.Advanced.Container.Resolve().ErrorExchangeNamingConvention = info => "MyExchangeNaming"; bus.Advanced.Container.Resolve
0 2024-04-15
编程技术问答社区
在ui线程中执行一个委托(使用消息泵)。
我有一个背景线程,可以与外部服务处理通信.每当背景线程收到一条消息时,我都想将其传递到UI线程以进行进一步处理(向用户显示). 当前,我已经制作了一个线程安全的消息队列,该消息是定期在Timer.tick中汇总并填充在背景线程中的.但是该解决方案是最佳的. 您知道如何使用消息泵将事件从背景线程传递给UI线程? 解决方案 有一些技术. 我发现这种Winforms技术始终易于使用,但是请注意,您需要一些微妙的规则才能正确.我试图捕获一个通用的,有效的实现,该实现正确处理代码段中的规则,我在Stackoverflow上发布的其他位置. 我不需要太多使用此技术,所以我对此并不是说什么有意义.但是,您应该知道它存在.我认为,这是确保在特定线程上下文中被调用的有效方法,即使该线程是不是UI线程. 如果您正在使用WPF,则WPF控件通常会源自DispatcherObject来提供调度器对象.这是一种比Control.Invoke()更丰富的特征同步技术,但也更复杂
0 2024-04-13
编程技术问答社区
使用C#和Apache NMS的ActiveMQ-计算队列中的消息数
我正在使用ActiveMQ使用C#应用程序发送和接收消息.但是,我遇到了一些困难,只是在队列中获取消息.这是我的代码: public int GetMessageCount() { int messageCount = 0; Uri connecturi = new Uri(this.ActiveMQUri); IConnectionFactory factory = new NMSConnectionFactory(connecturi); using (IConnection connection = factory.CreateConnection()) using (ISession session = connection.CreateSession()) { IDestination requestDestination = Ses
0 2024-04-13
编程技术问答社区
信息队列错误:找不到能够读取信息的格式化器
我正在写消息C#中的消息队列: queue.Send(new Message("message")); 我试图读取消息如下: Messages messages = queue.GetAllMessages(); foreach(Message m in messages) { String message = m.Body; //do something with string } 但是,我收到的错误消息说:"找不到能够阅读此消息的格式化者." 我在做什么错? 解决方案 我通过在每个消息中添加一个格式器来解决问题.在队列中添加格式器不起作用. Messages messages = queue.GetAllMessages(); foreach(Message m in messages) { m.Formatter = new XmlMessageFormatter(new String[] { "System.String,msc
0 2024-04-12
编程技术问答社区
如何向运行信息泵的STA线程发布信息?
因此,跟随 this ,我决定在专用的STA线程上明确实例化com对象.实验表明,com对象需要一个消息泵,我通过调用Application.Run()来创建它: private MyComObj _myComObj; // Called from Main(): Thread myStaThread = new Thread(() => { _myComObj = new MyComObj(); _myComObj.SomethingHappenedEvent += OnSomthingHappened; Application.Run(); }); myStaThread.SetApartmentState(ApartmentState.STA); myStaThread.Start(); 如何发布消息来自其他线程的STA线程的消息泵? 注意: 为了简洁起见,我大量编辑了这个问题. @servy答案的某些部分似乎无关,但它们是最初的问题.
0 2024-04-10
编程技术问答社区
检查IPC消息队列是否已经存在,而不创建它
我如何只检查消息队列是否不存在? 是否存在? 使用msgget在O_CREAT | O_EXCL flag的情况下,如果存在,则呼叫将在返回值-1中失败,但是如果不存在,则将创建一个新的消息队列.有什么方法可以检查? 解决方案 IPCS(1)提供有关IPC设施的信息,IPCRM(1)可用于从系统中删除IPC对象. 列表共享内存段: IPCS -M 列表消息队列: IPCS -Q 删除用shmkey创建的共享内存段: ipcrm -m键 删除由SHMID确定的共享存储段: IPCRM -M ID 删除用msgkey创建的消息队列: ipcrm -q键 删除MSGID标识的消息队列: IPCRM -Q ID
4 2024-04-10
编程技术问答社区
通过IPC传递非PODs[普通数据类型]。
我正在编写用于执行IPC的实现.用户打电话,我接管所有这些参数并将其传递到其他过程中. 我已经为基于逻辑的此类功能编写了一个自动代码生成器,该函数的工作原理: 将所有参数拿到结构中. 添加IPC所需的其他信息.将此结构的大小和指针传递给POSIX消息队列. 该地址的数据,直到指定的大小为止,已读取并发送到其他过程. de构造结构以获取参数. 使用这些参数调用实际功能. 当我只有普通的旧数据类型时,这很好.但是,当函数参数是非pods时,我的逻辑就会失败,因为: 如果非POD类型[消息队列要求] ,我无法真正说出总数据的大小 某些类可能包含动态增加的实体,例如向量. 有人可以知道我如何在这种情况下如何处理? 解决方案 您需要决定如何进行序列化. 例如.您可以定义一个代表当事方之间交换的消息的类型,然后实现将对象序列序列到消息的通用函数.当您具有自定义逻辑时,您会专注于序列化函数. 这是一些伪代码: class Message
0 2024-04-09
编程技术问答社区
Linux POSIX消息队列:消息太长
我要做的是,虽然孩子流程在文件上找到素数,但父母应将其写入另一个文件.儿童流程将发送素数作为M数字的消息.但是,我无法接收消息,我会收到Message too long错误. int main() { mqd_t mq; mq = mq_open(MQ_NAME, O_RDWR | O_CREAT, 0666, NULL); if (mq == (mqd_t) - 1) { perror("Cannot create msg queue"); exit(EXIT_FAILURE); } // Start child processes for (int i = 0; i
2 2024-04-09
编程技术问答社区
例子 mq_timedreceive
我找不到如何与MQ_TIMEDRECEIVE正常工作,任何人都可以给我一个例子吗? ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout); 我希望时间不会花费超过20秒的时间. 非常感谢. 解决方案 struct timespec tm; clock_gettime(CLOCK_REALTIME, &tm); tm.tv_sec += 20; // Set for 20 seconds if( 0 > mq_timedreceive( fd, buf, 4096, NULL, &tm ) ) { ... } 看完整的描述
0 2024-04-09
编程技术问答社区
XPending循环使CPU达到100%。
美好的一天! 我在制作XLIB项目时遇到了一些麻烦.这是我项目的结构: [ Init ] [ Making some stuff ] [ Creating a timer thread (see code below) ] [ Main cycle (see code below) ] 当用户按任何按钮时,我将线程中的标志设置为真实的值,并开始每次n时将CustMonsage发送到窗口. while (warehouse.destroyflag != SML_DEAD) { if (XPending(warehouse.display)) { XNextEvent(warehouse.display, &event); 但是这里有一些问题. 随着主周期的当前实现,我有大约100%CPU负载.但是,当我从代码中删除Xpending线时,负载将为0%.但是在那种情况下,我没有从另一个线程到达的正确的Custommessage.
0 2024-04-09
编程技术问答社区
C语言中的消息队列:实现双向通信
我是C.我需要两个队列或只有一个来完成此操作吗? 我也想知道我可以将数据(在代码中显示)发送到另一个过程,或者我需要将其声明为字符数组. typedef struct msg1 { int mlen; char *data; }M1; typedef struct msgbuf { long mtype; M1 *m; } message_buf; 预先感谢:) 解决方案 我也想知道我可以将数据(在代码中显示)发送到另一个过程,或者我需要将其声明为字符数组 是的,您可以将数据发送到另一个过程 喜欢 #include #include #include #include #include #include #define MAXSIZE 128 v
0 2024-04-08
编程技术问答社区
msgget()和mq_open的区别
我阅读了有关消息队列操作的信息,例如msgget(),msgsnd()和msgrcv().但是,当我在堆栈溢出上搜索消息队列相关的问题时,我知道还有另一组消息队列操作,例如mq_open(),mq_send(),mq_receive().可以让我知道这两种类型的消息队列与哪种类型的消息队列之间有什么区别? 解决方案 基本上,msgget,msgsnd,msgrcv是系统v ipc,而mq_open,mq_send,mq_send,mq_receive是posix ipc. 一个很好的解释:系统v ipc vs posix vs posix ipc 总而言之,POSIX IPC是在系统V IPC之后设计的.因此,许多旧系统仅支持系统V IPC,而新系统也开始支持POSIX IPC.而且,由于POSIX IPC可以从系统V IPC中学习优势和缺点,因此可以更好地设计和实施POSIX IPC.一个值得注意的区别是所有POSIX IPC接口都是线程安全的. 其他解决方案
2 2024-04-08
编程技术问答社区
奇怪的posix消息队列链接问题--有时它不能正确链接
当我构建以下代码时,它会构建正常.如果我更改代码以评论" while",则使用同一命令行,它不会构建(请参见下文) #include #include int main(int argc, char *argv[]) { while (1) { } mq_open("/YouSUCK", O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, NULL); return 0; } dada@thud:~/RaspberryPI$ gcc -g -Wall -lrt -o mqtest mqtest.c dada@thud:~/RaspberryPI$ #include #include int main(int argc, char *argv[]) { // while (1) { } mq_open("/YouSU
0 2024-04-08
编程技术问答社区
如何偷看Linux(POSIX)的消息队列而不删除一个项目?
我需要在不删除消息的情况下窥视一个消息队列.我将仅在符合某些条件时才删除消息队列项目.这个怎么做?以下是我知道的API-但似乎没有人支持窥视. mq_close() - 关闭消息队列 mq_getattr() - 获取消息队列的当前属性 mq_notify() - 当队列变为非eppripty 时,通知通话过程 mq_open() - 打开或创建消息队列 mq_receive() - 从队列接收一条消息 mq_send() - 在消息队列中输入消息 mq_setattr() - 设置消息队列的标志 mq_unlink() - UNLINK(即删除)消息队列 有没有办法窥视消息而不删除消息? 解决方案 窥视可能是消息队列的坏主意,因为像Sehe一样,类似种族条件的危险.只需假设您已经窥视了一条消息;由于您无法锁定队列,因此您将无法可靠地检索与您窥视的相同消息.如果您有两个从同一队列接收互斥消息的过程,则应考虑将这些消息分为两个
2 2024-04-06
编程技术问答社区
确定一个窗口信息的优先级
有什么方法可以通过编程方式检查窗口消息的优先级? 例如:已知一些窗口消息,WM_PAINT和WM_TIMER的优先级最低,并以最高优先级的消息放置在消息之后. 我正在寻找可以确认两条消息中的哪一条将具有最低或最高优先级,否则将首先发送或最后一条消息? 解决方案 这不是它的工作方式,Windows消息没有优先级.它主要取决于消息的生成方式.消息循环按以下顺序派遣消息: 首先,以sendmessage()生成的任何消息按呼叫的顺序派发 接下来,以postmessage()生成并存储在消息队列中的任何消息,按队列顺序 接下来,从窗口状态合成的任何消息. wm_timer,wm_paint和wm_mousemove适合此类别. "从窗口状态"子句中综合的是使WM_PAINT和WM_TIMER似乎具有较低优先级的原因.以及为什么迅速移动鼠标不会用鼠标消息淹没消息队列.但是,这不是独家的,例如,您可以致电UpdateWindow()强制发送WM_PAINT消息,从
2 2024-04-06
编程技术问答社区
如何模拟线程间的广播信息传递
我使用Python 3.6编写了一个小的并发程序.我有个问题: 我的程序有一个小线程类(模拟线程); 此类中有3种用作子线程的方法: class myThread(Thread): def __init__(self, identifier): super(myThread, self).__init__() def fun1(self): # broadcasts messages def fun2(self): # event that occurs when a message arrives # do something def fun3(self): # event that occurs when a message arrives # do something def run(self): t1 = Thread(target = self.fun1) t2 = Thread
2 2024-04-03
编程技术问答社区
boost::interprocess消息队列timed_receive()内部程序
im当前使用 timed_receive()方法从 boost :: interconcess 库接收数据.由于接收到的消息的时机会有所不同,因此我在 recept()方法上使用了此方法. msgque->timed_receive((void*) &message,sizeof(int),recvd_size,priority, boost::posix_time::ptime(microsec_clock::universal_time()) + boost::posix_time::milliseconds(300)) 问题: 该方法如何知道缓冲区中存在消息?它是一种投票机制还是实施了更复杂的机制? 我阅读了文档,找不到任何详细信息,源代码也没有信息. 已经谢谢. 解决方案 库不需要记录其工作原理,因为这是一个实现细节.理想情况下,您不需要知道,这就是为什么您首先使用库的原因. 您可以期望图书馆以更原始的库构建块来实施:
0 2024-04-02
编程技术问答社区