Problem Description
I am trying to use the Kafka 0.8 log4j appender, but I cannot get it to work. I want my application to send logs directly to Kafka via the log4j appender.
Here is my log4j.properties. I couldn't find any proper encoder, so I just configured it to use the default encoder (i.e., I commented out that line):
log4j.rootLogger=INFO, stdout, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent
#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder
And this is my sample application:
import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.PropertyConfigurator;

public class HelloWorld {

    static Logger logger = Logger.getLogger(HelloWorld.class.getName());

    public static void main(String[] args) {
        PropertyConfigurator.configure(args[0]);

        logger.info("Entering application.");
        logger.debug("Debugging!.");
        logger.info("Exiting application.");
    }
}
I used Maven for compiling, and I included kafka_2.8.2-0.8.0 and log4j_1.2.17 in my pom.xml.
And I am getting these errors:
INFO [main] (Logging.scala:67) - Verifying properties
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
INFO [main] (HelloWorld.java:14) - Entering application.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent)
ERROR [main] (Logging.scala:67) -
ERROR [main] (Logging.scala:67) -
ERROR [main] (Logging.scala:67) -
ERROR [main] (Logging.scala:67) -
ERROR [main] (Logging.scala:67) -
ERROR [main] (Logging.scala:67) -
java.lang.StackOverflowError
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
    at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
    at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
    at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.error(Category.java:322)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Utils$.swallow(Utils.scala:189)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.info(Category.java:666)
    at kafka.utils.Logging$class.info(Logging.scala:67)
    at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:187)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    . . .
I keep getting the errors above continuously unless I terminate the program.
If I have missed something, kindly let me know.
Accepted Answer
I think Jonas has identified the problem: the Kafka producer's own logging is also being routed to the Kafka appender, causing an infinite loop and an eventual stack overflow (no pun intended). You can configure all Kafka logs to go to a different appender. The following sends that output to stdout:
log4j.logger.kafka=INFO, stdout
So you should end up with the following in your log4j.properties:
log4j.rootLogger=INFO, stdout, KAFKA
log4j.logger.kafka=INFO, stdout
log4j.logger.HelloWorld=INFO, KAFKA
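One detail to keep in mind with this layout: log4j 1.x loggers are additive by default, so events logged under the kafka logger go to its own appenders and also propagate to the root logger's appenders (which include KAFKA). Disabling additivity makes the separation explicit; a sketch:

```properties
# Route Kafka's internal logs to stdout only, and stop them from
# propagating to the root logger (and thus to the KAFKA appender).
log4j.logger.kafka=INFO, stdout
log4j.additivity.kafka=false
```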
Another Answer
I have been able to produce events to Kafka 0.8.2.2 via log4j. Here is my log4j configuration:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

    <appender name="console" class="org.apache.log4j.ConsoleAppender">
        <param name="Target" value="System.out" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%-5p %c{1} - %m%n" />
        </layout>
    </appender>

    <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender">
        <param name="Threshold" value="INFO" />
        <param name="MaxBackupIndex" value="100" />
        <param name="File" value="/tmp/agna-LogFile.log" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d %-5p [%c{1}] %m %n" />
        </layout>
    </appender>

    <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender">
        <param name="Topic" value="kafkatopic" />
        <param name="BrokerList" value="localhost:9092" />
        <param name="syncSend" value="true" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
        </layout>
    </appender>

    <logger name="org.apache.kafka">
        <level value="error" />
        <appender-ref ref="console" />
    </logger>

    <logger name="com.example.kafkaLogger">
        <level value="debug" />
        <appender-ref ref="kafkaAppender" />
    </logger>

    <root>
        <priority value="debug" />
        <appender-ref ref="console" />
    </root>

</log4j:configuration>
Here is the source code:
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JsonProducer {

    static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class);
    static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger");

    public static void main(String args[]) {
        JsonProducer obj = new JsonProducer();
        String str = obj.getJsonObjAsString();

        // Use the logger
        kafkaLogger.info(str);

        try {
            // Construct and send message
            obj.constructAndSendMessage();
        } catch (InterruptedException e) {
            defaultLogger.error("Caught interrupted exception " + e);
        } catch (ExecutionException e) {
            defaultLogger.error("Caught execution exception " + e);
        }
    }

    private String getJsonObjAsString() {
        JSONObject obj = new JSONObject();
        obj.put("name", "John");
        obj.put("age", new Integer(55));
        obj.put("address", "123 MainSt, Palatine, IL");

        JSONArray list = new JSONArray();
        list.add("msg 1");
        list.add("msg 2");
        list.add("msg 3");
        obj.put("messages", list);

        return obj.toJSONString();
    }

    private void constructAndSendMessage() throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        boolean sync = false;
        String topic = "kafkatopic";
        String key = "mykey";
        String value = "myvalue1 mayvalue2 myvalue3";
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);

        if (sync) {
            producer.send(producerRecord).get();
        } else {
            producer.send(producerRecord);
        }

        producer.close();
    }
}
The whole project is available at the following link:
https://github.com/ypant/kafka-json-producer.git
Another Answer
Try setting the appender to async, like this:

log4j.appender.KAFKA.ProducerType=async
It seems reasonable that it goes into an infinite loop, because the Kafka producer does logging of its own.
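The feedback loop the answers describe can be sketched without Kafka or log4j at all. The names below are hypothetical and the "appender" is a stand-in: in the real stack, KafkaLog4jAppender.append() calls Producer.send(), the producer logs "Fetching metadata..." at INFO, and that event is routed straight back into the same appender. The only difference here is a depth guard so the demo terminates instead of throwing java.lang.StackOverflowError.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the appender feedback loop (hypothetical names).
public class AppenderLoopDemo {

    static List<String> delivered = new ArrayList<>();
    static int depth = 0;

    // Stands in for the appender's append(): "delivering" an event makes the
    // producer emit its own log event, which re-enters this same method.
    static void append(String msg) {
        delivered.add(msg);
        if (++depth > 5) {
            return; // guard so the demo terminates; the real code has no guard
        }           // and recurses until the stack overflows
        append("Fetching metadata triggered by: " + msg);
    }

    // Returns the number of re-entrant calls made before the guard tripped.
    static int run() {
        delivered.clear();
        depth = 0;
        append("Entering application.");
        return delivered.size() - 1;
    }

    public static void main(String[] args) {
        System.out.println("re-entries before guard tripped: " + run());
    }
}
```

Both fixes above attack the same loop from different angles: routing Kafka's own logs away from the Kafka appender removes the re-entrant call, while an async producer decouples append() from send() so the appender no longer recurses on the calling thread.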