KafkaConsumer Cannot Fetch Data
Conclusions
- If the consumer does not set group.id, Kafka throws the following error:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
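- If several consumers share the same group.id and subscribe to the same topic, some of them may receive no data, because each partition is delivered to only one consumer within a group. The code here therefore uses a custom group.id that differs from the one used by the business consumers.
- On re-verification, the problem did not recur.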
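At first it appeared that the consumer had to set client.id, otherwise the consumer group was not created: the producer could still produce data, but the consumer received none. Note, however, that client.id is optional in the Kafka consumer configuration and a group is identified by group.id alone, so this is unlikely to be the real cause. Whether the group exists can be checked with the --list option of kafka-consumer-groups (kafka-consumer-groups.sh in the Apache distribution).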
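The shared group.id point above can be illustrated with a small model. This is only a sketch under simplifying assumptions: it is not Kafka's real partition assignor, the member names are hypothetical, and the round-robin rule is chosen purely for illustration. It shows why a second consumer in the same group can end up with no partitions, and therefore no data, when the topic has a single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how partitions are shared inside one consumer group.
// NOT Kafka's real assignor; it only illustrates that each partition is
// owned by exactly one member of a group at a time.
class GroupAssignmentSketch {

    // Distribute partitions 0..partitionCount-1 round-robin over the members.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical members: a business consumer and the monitoring
        // consumer sharing one group.id on a topic with a single partition.
        List<String> members = Arrays.asList("business-consumer", "monitor-consumer");
        // prints {business-consumer=[0], monitor-consumer=[]}
        System.out.println(assign(members, 1));
    }
}
```

Giving the monitoring consumer its own group.id corresponds to running `assign` with a single member, which then owns every partition.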
Code
Only the key parts of the code are shown.
Following the official demo, create the Properties:
Java
Properties props = new Properties();
// There may be several Kafka servers; join their addresses with commas
props.put("bootstrap.servers", String.join(",", kafkaServers));
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// org.apache.kafka.clients.admin.AdminClientConfig#REQUEST_TIMEOUT_MS_CONFIG
props.put(REQUEST_TIMEOUT_MS_CONFIG, 1000);
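The Properties above mix producer and consumer settings in one object, which works but makes the clients log warnings about configuration keys they do not use. A minimal sketch of keeping the two apart is given below. The class and method names are hypothetical. The choice of `auto.offset.reset=earliest` and `enable.auto.commit=false` is an assumption for a monitoring consumer that commits manually: the default for `auto.offset.reset` is `latest`, in which case a group with no committed offset starts at the end of the log and does not see records produced before it joined.

```java
import java.util.Properties;

// Hypothetical helper that builds separate configs for producer and consumer.
class KafkaPropsSketch {

    // Settings shared by both clients.
    static Properties common(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    static Properties producerProps(String bootstrapServers) {
        Properties props = common(bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = common(bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: read from the beginning when the group has no committed offset.
        props.put("auto.offset.reset", "earliest");
        // Assumption: offsets are committed manually, so auto commit is disabled.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker addresses and group id, for illustration only.
        Properties consumer = consumerProps("broker1:9092,broker2:9092", "monitor-group");
        System.out.println(consumer.getProperty("group.id")); // prints monitor-group
    }
}
```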
Producer code
Java
// Needs java.util.concurrent.ExecutionException, TimeUnit and TimeoutException
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// TEST_TOPIC is a custom topic name; here TEST_TOPIC = "pacm-topic"
Future<RecordMetadata> sendFuture = producer.send(new ProducerRecord<>(TEST_TOPIC, "ping"));
try {
    // send() is asynchronous; Future.get with a timeout blocks for at most
    // 5 seconds until the broker acknowledges the record.
    sendFuture.get(5, TimeUnit.SECONDS);
    response.setProduce(1);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    // Timed out or failed: mark the produce check as failed
    response.setProduce(0);
} finally {
    producer.close();
}
Consumer code
Java
// Without group.id this fails with org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
props.put("group.id", Const.Queue.PRODUCE_AND_CONSUME_MONITOR_GROUP_ID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
// Note: a single 2-second poll may return nothing while the consumer is still joining the group
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records.records(TEST_TOPIC)) {
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    // The committed offset is the next offset to read, hence offset + 1
    OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1);
    consumer.commitSync(Collections.singletonMap(partition, oam));
    response.setConsume(1);
}
consumer.close();
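The consumer code commits once per record. A possible alternative, sketched here under assumptions, is to compute one offset per partition for the whole batch and commit once. The `Rec` class below is a hypothetical stand-in for the topic, partition, and offset of a `ConsumerRecord`, so that the sketch runs without a broker; the string key plays the role of a `TopicPartition`. The rule it illustrates is that the offset to commit is the highest consumed offset plus one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CommitOffsetSketch {

    // Hypothetical stand-in for the fields of a ConsumerRecord used here.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each "topic-partition" key, return the next offset to read,
    // which is the highest consumed offset in the batch plus one.
    static Map<String, Long> offsetsToCommit(List<Rec> batch) {
        Map<String, Long> next = new TreeMap<>();
        for (Rec r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("pacm-topic", 0, 5L),
                new Rec("pacm-topic", 0, 6L),
                new Rec("pacm-topic", 1, 9L));
        // prints {pacm-topic-0=7, pacm-topic-1=10}
        System.out.println(offsetsToCommit(batch));
    }
}
```

With the real client, the resulting map would be turned into a `Map<TopicPartition, OffsetAndMetadata>` and passed to a single `commitSync` call after the loop.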
References
https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions