Today, while writing a Kafka producer to send messages, I ran into an amusing problem caused by my own mistake. The same code had run fine against the same Kafka cluster not long ago. The code is as follows:
package streaming.utils

import java.util
import java.util.{Date, Properties, UUID}

import com.alibaba.fastjson.JSONObject
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

/**
 * Author: Michael PK  QQ: 1990218038
 *
 * Kafka data producer
 */
object ProducerApp {

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("bootstrap.servers", ParamsConf.brokers)
    // "request.required.acks" is the old producer's property name; the new Java producer uses "acks"
    props.put("acks", "1")

    val topic = ParamsConf.topic
    val producer = new KafkaProducer[String, String](props)
    val random = new Random()
    val dateFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")

    for (i <- 1 to 100) {
      val time = dateFormat.format(new Date())
      val userid = random.nextInt(1000) + ""
      val courseid = random.nextInt(500) + ""
      val fee = random.nextInt(400) + ""
      val result = Array("0", "1") // 0: payment failed, 1: payment succeeded
      val flag = result(random.nextInt(2))
      val orderid = UUID.randomUUID().toString

      val map = new util.HashMap[String, Object]()
      map.put("time", time)
      map.put("userid", userid)
      map.put("courseid", courseid)
      map.put("fee", fee)
      map.put("flag", flag)
      map.put("orderid", orderid)

      val json = new JSONObject(map)
      producer.send(new ProducerRecord[String, String](topic(0), json.toJSONString))
    }

    // flush buffered records and release resources before exiting
    producer.close()

    println("PK Kafka Producer finished producing data ...")
  }
}
The code is simple; it just simulates production data. I had been using Kafka client version 2.0 all along:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
But when I executed the code above today, no messages were sent and no errors were reported; the program did not exit either. Debugging showed that it was stuck in the doSend method:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
Although waitOnMetadata throws an exception, execution never enters the if (metadata.isClosed()) branch, and the outer layer does not catch the exception either. Through debugging, the exception e turned out to be: Failed to update metadata after 60000 ms.
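Incidentally, that 60000 ms comes from the producer's max.block.ms setting (default 60000), which bounds how long send() may block waiting for topic metadata. As a sketch, lowering it makes a misconfiguration like this fail fast instead of hanging for a full minute; the broker address below is just a placeholder, and the helper class name is my own:

```java
import java.util.Properties;

public class FailFastProducerConfig {

    // Build producer properties that surface metadata problems quickly
    public static Properties build(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        // Bound how long send() may block waiting for metadata (default is 60000 ms)
        props.put("max.block.ms", "5000");
        return props;
    }
}
```

With this setting, a bad topic name or unreachable broker produces a TimeoutException after 5 seconds instead of an apparent hang.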
Suspecting that the client version did not match the server version, I tried downgrading it:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
After executing it, the program kept printing the following in a loop:

[Producer clientId=producer-1] Error while fetching metadata with correlation id
I also tested connectivity: telnet 192.168.0.205 9092 worked fine, and producing and consuming messages with the command-line tools directly on the server worked as well.
Strange. So I switched to another topic to test, and there was no problem. At the same time, I noticed that the topic name I had written appeared to be followed by a space. A low-level mistake! But was that trailing character really a space? I typed a trailing space by hand and tested again: no problem. In other words, a genuine trailing space after the topic name was fine, so the name must have carried some other invisible content. I remember that I had copy-pasted this topic name, so it probably brought some hidden characters along with it.
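A quick way to see what a copied topic name actually contains is to dump each character's Unicode code point; invisible characters such as U+200B (zero-width space) or U+00A0 (non-breaking space) then show up immediately. A minimal sketch (class and method names are my own; the sanitize rule assumes Kafka's documented set of legal topic-name characters, ASCII letters, digits, '.', '_' and '-'):

```java
public class RevealChars {

    // Dump each character as a code point, e.g. "U+0070 U+006B",
    // so invisible characters like U+200B become visible
    public static String dump(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("U+%04X", (int) s.charAt(i)));
        }
        return sb.toString();
    }

    // Keep only characters Kafka allows in topic names:
    // ASCII alphanumerics, '.', '_' and '-'
    public static String sanitize(String s) {
        return s.replaceAll("[^a-zA-Z0-9._-]", "");
    }
}
```

Running dump on the pasted topic name would have exposed the stray character at once, and sanitizing copied names before use avoids the problem entirely.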