
KafkaConsumer Fetches No Data

Conclusions

  1. If a consumer does not specify group.id, Kafka throws the following error:
    log
        org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
  2. If multiple consumers share the same group.id and consume the same topic, a consumer may receive no data; here a custom group id, different from the one used by the business consumers, was chosen.
  3. On re-verification the problem no longer occurred. The consumer needs to specify client.id; otherwise the consumer group is not created. This does not affect the producer, but the consumer fetches no data. It can be verified with kafka-consumer-groups --list.
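Conclusion 1 comes down to the consumer configuration: subscribing via subscribe() (group management) or committing offsets requires group.id. A minimal sketch of such a configuration, using only plain java.util.Properties (the broker address and group id below are placeholders, not values from the original setup):

```java
import java.util.Properties;

public class ConsumerProps {
    // Build a minimal consumer configuration. Without "group.id",
    // subscribe()/commitSync() fail with InvalidGroupIdException.
    static Properties minimalConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId); // required for group management / offset commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = minimalConsumerProps("localhost:9092", "my-monitor-group");
        System.out.println(props.getProperty("group.id")); // my-monitor-group
    }
}
```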

Code

Only the key parts of the code are shown.

Following the official demo, create the Properties:

Java
Properties props = new Properties();
// There may be multiple Kafka servers; join their addresses with commas
props.put("bootstrap.servers", Strings.join(kafkaServers, ","));
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// statically imported from org.apache.kafka.clients.admin.AdminClientConfig#REQUEST_TIMEOUT_MS_CONFIG
props.put(REQUEST_TIMEOUT_MS_CONFIG, 1000);

Producer code

Java
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// TEST_TOPIC is a custom topic name; here TEST_TOPIC = "pacm-topic"
Future<RecordMetadata> sendFuture = producer.send(new ProducerRecord<>(TEST_TOPIC, "ping"));
long now = System.currentTimeMillis();
// send() is asynchronous; no blocking send method was found on the producer,
// so check isDone() in a loop with a 5-second timeout.
while (!sendFuture.isDone()) {
    if (System.currentTimeMillis() > now + 1000 * 5) {
        response.setProduce(0);
        break;
    }
}
// Record whether the send completed in time
response.setProduce(sendFuture.isDone() ? 1 : 0);
producer.close();
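The busy-wait above can be avoided: the Future returned by send() can be awaited directly with Future.get(timeout), which blocks until completion or timeout. A small helper sketch (awaitDone is a hypothetical name; it works for any Future, demonstrated here with CompletableFuture stand-ins rather than a real producer):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendAwait {
    // Block until the future completes or the timeout elapses.
    // Returns true only if it completed successfully within the timeout.
    static boolean awaitDone(Future<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // not done within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false; // the send itself failed
        }
    }

    public static void main(String[] args) {
        // Stand-in futures; with Kafka this would be producer.send(...)
        Future<String> done = CompletableFuture.completedFuture("ok");
        Future<String> never = new CompletableFuture<>();
        System.out.println(awaitDone(done, 100));  // true
        System.out.println(awaitDone(never, 100)); // false
    }
}
```

Compared with spinning on isDone(), this yields the thread while waiting instead of burning CPU.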

Consumer code

Java
// Without group.id this throws: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
props.put("group.id", Const.Queue.PRODUCE_AND_CONSUME_MONITOR_GROUP_ID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
Iterator<ConsumerRecord<String, String>> consumerRecordIterator = records.records(TEST_TOPIC).iterator();
while (consumerRecordIterator.hasNext()) {
    ConsumerRecord<String, String> record = consumerRecordIterator.next();
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    // commit record.offset() + 1: the committed offset is the next record to read
    OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1);
    Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
    map.put(partition, oam);
    consumer.commitSync(map);
    response.setConsume(1);
}
consumer.close();
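One more pitfall in this check: a single poll() can return nothing even when data exists, because joining the consumer group and receiving partition assignments may take longer than the poll timeout. Polling in a loop until a deadline is more robust. A sketch of that retry logic, using a Supplier as a stand-in for consumer.poll so it runs without a broker (pollUntil is a hypothetical name):

```java
import java.util.function.Supplier;

public class PollLoop {
    // Repeatedly invoke poll (which returns the number of records fetched)
    // until at least one record arrives or the deadline passes.
    static int pollUntil(Supplier<Integer> poll, long deadlineMillis) {
        int total = 0;
        while (total == 0 && System.currentTimeMillis() < deadlineMillis) {
            // with Kafka: consumer.poll(Duration.ofMillis(500)).count()
            total += poll.get();
        }
        return total;
    }

    public static void main(String[] args) {
        // Stand-in poll: returns nothing twice (as during a rebalance), then 3 records.
        int[] calls = {0};
        Supplier<Integer> fakePoll = () -> (++calls[0] < 3) ? 0 : 3;
        System.out.println(pollUntil(fakePoll, System.currentTimeMillis() + 5000)); // 3
    }
}
```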

References

https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions