Kafka Consumer API

High-Level API

Start a producer from the console

$ bin/kafka-console-producer.sh \
--broker-list hadoop000:9092 --topic first

>hello world

Create a consumer (deprecated API)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

  @SuppressWarnings("deprecation")
  public static void main(String[] args) {
     Properties properties = new Properties();
     properties.put("zookeeper.connect", "hadoop000:2181");
     properties.put("group.id", "g1");
     properties.put("zookeeper.session.timeout.ms", "500");
     properties.put("zookeeper.sync.time.ms", "250");
     properties.put("auto.commit.interval.ms", "1000");

     // Create the consumer connector
     ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

     HashMap<String, Integer> topicCount = new HashMap<>();
     topicCount.put("first", 1);

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

     KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

     ConsumerIterator<byte[], byte[]> it = stream.iterator();

     while (it.hasNext()) {
       System.out.println(new String(it.next().message()));
     }
  }
}
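
The example above consumes a single stream. With this (deprecated) high-level API, a common pattern is to request several streams for a topic and hand each one to its own thread. Below is a minimal sketch of that pattern under the same connection settings; the class name MultiThreadConsumer and the stream count of 3 are illustrative choices, not part of the original example.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadConsumer {

  @SuppressWarnings("deprecation")
  public static void main(String[] args) {
     Properties properties = new Properties();
     properties.put("zookeeper.connect", "hadoop000:2181");
     properties.put("group.id", "g1");
     properties.put("auto.commit.interval.ms", "1000");

     ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

     // Request three streams for topic "first" instead of one
     int numStreams = 3;
     HashMap<String, Integer> topicCount = new HashMap<>();
     topicCount.put("first", numStreams);

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

     // Consume each stream in its own thread
     ExecutorService executor = Executors.newFixedThreadPool(numStreams);
     for (final KafkaStream<byte[], byte[]> stream : consumerMap.get("first")) {
       executor.submit(new Runnable() {
         public void run() {
           ConsumerIterator<byte[], byte[]> it = stream.iterator();
           while (it.hasNext()) {
             System.out.println(Thread.currentThread().getName() + ": " + new String(it.next().message()));
           }
         }
       });
     }
  }
}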

Official example (offsets maintained automatically, new API)

The high-level consumer differs from the simple consumer in the following ways:

1. Automatic/hidden offset management

2. Automatic (simple) partition assignment

3. Broker failover => automatic rebalancing

4. Consumer failover => automatic rebalancing

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

  public static void main(String[] args) {

     Properties props = new Properties();
     // Kafka broker address list; it does not need to include every broker
     props.put("bootstrap.servers", "hadoop000:9092");
     // Specify the consumer group
     props.put("group.id", "test");
     // Whether to auto-commit offsets
     props.put("enable.auto.commit", "true");
     // Interval between automatic offset commits
     props.put("auto.commit.interval.ms", "1000");
     // Deserializer class for keys
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     // Deserializer class for values
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     // Create the consumer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     // Topics to subscribe to; several topics can be subscribed at once
     consumer.subscribe(Arrays.asList("first", "second", "third"));

     while (true) {
       // Poll for records, with a 100 ms timeout
       ConsumerRecords<String, String> records = consumer.poll(100);

       for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
  }
}
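
The example above relies on enable.auto.commit. When offsets should advance only after records have actually been processed, auto-commit can be disabled and KafkaConsumer.commitSync() called explicitly. A minimal sketch under the same broker/topic assumptions follows; the class name ManualCommitConsumer is illustrative.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

  public static void main(String[] args) {
     Properties props = new Properties();
     props.put("bootstrap.servers", "hadoop000:9092");
     props.put("group.id", "test");
     // Disable auto-commit so offsets advance only when we say so
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("first"));

     while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
       // Commit the offsets returned by the last poll() only after every
       // record in the batch has been processed; after a crash the group
       // re-reads at most one uncommitted batch
       consumer.commitSync();
     }
  }
}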

Low-Level API

Also known as the Simple Consumer, although in practice it is anything but simple to use.

The example below uses the low-level API to read data from a specified topic, partition, and offset.

1) Main steps for a consumer using the low-level API:

Step  Task
1     Find the leader replica for the specified partition from the topic metadata
2     Get the latest offset for the partition
3     Fetch the partition's messages from the leader replica
4     Detect changes to the leader replica and retry

2) Method descriptions:

findLeader()     Sends a topic metadata request to the seed brokers and saves the partition's replica set as backup brokers
getLastOffset()  Sends an offset request to get the partition's most recent offset
run()            The main method through which the low-level consumer API fetches messages
findNewLeader()  Finds the new leader replica when the partition's current leader broker fails

3) Code:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
  private List<String> m_replicaBrokers = new ArrayList<>();

  public SimpleExample() {
    m_replicaBrokers = new ArrayList<>();
  }

  public static void main(String args[]) {
    SimpleExample example = new SimpleExample();
    // Maximum number of messages to read
    long maxReads = Long.parseLong("3");
    // Topic to read from
    String topic = "test1";
    // Partition to read from
    int partition = Integer.parseInt("0");
    // Seed broker addresses
    List<String> seeds = new ArrayList<>();
    seeds.add("192.168.9.102");
    seeds.add("192.168.9.103");
    seeds.add("192.168.9.104");
    // Broker port
    int port = Integer.parseInt("9092");
    try {
      example.run(maxReads, topic, partition, seeds, port);
    } catch (Exception e) {
      System.out.println("Oops:" + e);
      e.printStackTrace();
    }
  }

  public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {

    // Get the metadata for the given topic and partition
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);

    if (metadata == null) {
      System.out.println("Can't find metadata for Topic and Partition. Exiting");
      return;
    }

    if (metadata.leader() == null) {
      System.out.println("Can't find Leader for Topic and Partition. Exiting");
      return;
    }

    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    int numErrors = 0;

    while (a_maxReads > 0) {
      if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
      }

      FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();

      FetchResponse fetchResponse = consumer.fetch(req);

      if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);

        if (numErrors > 5)
          break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
          // We asked for an invalid offset. For simple case ask for
          // the last element to reset
          readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
          continue;
        }

        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
        continue;
      }

      numErrors = 0;

      long numRead = 0;
      for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
          System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
          continue;
        }

        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
      }

      if (numRead == 0) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }

    if (consumer != null)
      consumer.close();
  }

  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {

    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();

    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
      System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
      return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
  }

  private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {

    for (int i = 0; i < 3; i++) {
      boolean goToSleep = false;
      PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
      if (metadata == null) {
        goToSleep = true;
      } else if (metadata.leader() == null) {
        goToSleep = true;
      } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
        // first time through if the leader hasn't changed give
        // ZooKeeper a second to recover
        // second time, assume the broker did recover before failover,
        // or it was a non-Broker issue

        goToSleep = true;
      } else {
        return metadata.leader().host();
      }

      if (goToSleep) {
        Thread.sleep(1000);
      }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
  }

  private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;

    loop:
    for (String seed : a_seedBrokers) {
      SimpleConsumer consumer = null;

      try {
        consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(a_topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
        List<TopicMetadata> metaData = resp.topicsMetadata();

        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == a_partition) {
              returnMetaData = part;
              break loop;
            }
          }
        }
      } catch (Exception e) {
        System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }

    if (returnMetaData != null) {
      m_replicaBrokers.clear();
      for (BrokerEndPoint replica : returnMetaData.replicas()) {
        m_replicaBrokers.add(replica.host());
      }
    }
    return returnMetaData;
  }
}
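
For comparison, the new consumer API can achieve what this SimpleConsumer example does, reading a fixed topic and partition from a chosen offset with no group management, using assign() and seek(). Below is a minimal sketch; the broker address, topic, partition, and read limit mirror the example above, and the class name AssignSeekConsumer is illustrative.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekConsumer {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.9.102:9092");
    // No group.id is required here: the partition is assigned manually
    // and offsets are tracked by the caller rather than committed to Kafka
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("test1", 0);
    consumer.assign(Collections.singletonList(tp)); // manual assignment, no rebalancing
    consumer.seek(tp, 0L);                          // start at the chosen offset

    long maxReads = 3; // same read limit as the example above
    while (maxReads > 0) {
      for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        if (--maxReads == 0)
          break;
      }
    }
    consumer.close();
  }
}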
