Here is the code for a simple Kafka producer:
package com.bonc.rdpe.spark.kafka08
import java.io.{BufferedReader, FileReader}
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
/**
 * Author: YangYunhe
 * Description: Reads a local text file line by line and publishes every line
 *              as the value of a record to the Kafka topic "topic001".
 * Create: 2018/7/24 19:33
 */
object Kafka08Producer {

  /**
   * Program entry point.
   *
   * Builds a String/String producer, sends each line of the input file as one
   * record, and pauses one second between sends. The callback handed to `send`
   * prints the topic/partition/offset of the record, and the stack trace of
   * any exception reported for it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "jed:9095,jed:9096,jed:9097")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      val reader = new BufferedReader(new FileReader("D:\\data\\news_profile_data.txt"))
      try {
        // In Scala an assignment evaluates to Unit, so the Java idiom
        // `while ((line = reader.readLine()) != null)` never terminates.
        // Pull lines through an iterator and stop at the first null (end of file).
        Iterator
          .continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach { line =>
            val record = new ProducerRecord[String, String]("topic001", line)
            producer.send(record, new Callback {
              override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                if (recordMetadata != null) {
                  println(s"topic: ${recordMetadata.topic()}, partition: ${recordMetadata.partition()}, offset: ${recordMetadata.offset()}")
                }
                if (e != null) {
                  e.printStackTrace()
                }
              }
            })
            // Throttle to one record per second.
            Thread.sleep(1000)
          }
      } finally {
        // Release the file handle even when reading or sending fails.
        reader.close()
      }
    } finally {
      // Always close the producer, including on failure.
      producer.close()
    }
  }
}
Running the program fails with the following error:
java.net.ConnectException: Connection timed out: no further information
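Solution:
The firewall on the Kafka broker host (jed) is blocking connections to the broker ports. Turn off the Linux firewall on that host (or, alternatively, open the broker ports in it):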
CentOS 7
[root@jed bin]# systemctl stop firewalld.service
[root@jed bin]# systemctl disable firewalld.service
CentOS 6
[root@jed bin]# service iptables stop
[root@jed bin]# chkconfig iptables off