首页 运维干货由Flink与Kafka实践探究Kafka的两个问题

由Flink与Kafka实践探究Kafka的两个问题

运维派隶属马哥教育旗下专业运维社区,是国内成立最早的IT运维技术社区,欢迎关注公众号:yunweipai
领取学习更多免费Linux云计算、Python、Docker、K8s教程关注公众号:马哥linux运维

笔者在某次实践过程中,搭建了一个Flink监控程序,监控wikipedia编辑,对编辑者编辑的字节数进行实时计算,最终把数据sink到kafka的消费者中展示出来,监控程序本身比较简单,只要在程序中指定好WikipediaEditsSource源并配置好sink与kafka关联就可以,类似一个略微复杂版的wordcount,按照网络上的教程,在实践的最后,开启zookeeper服务和kafka服务,接着用

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

这条命令创建一个名为wiki-resulttopic,然后运行监控程序,最后用

kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result

启动消费者,就可以在终端窗口里观察到源源不断的wikipedia数据

由Flink与Kafka实践探究Kafka的两个问题插图

当笔者第二天再次跑这个监控程序时,发现上次执行的命令

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

是生产者命令,然而此例中的生产者实际上是Fink监控程序,那么原作者为何使用kafka-console-producer命令去创建topic而不是用kafka-topics命令呢?

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

命令是生产者指定topic,是否自动创建了topic呢?

笔者尝试把现有的topic:wiki-result删掉,然后重新创建topic,提示如下,并没有真正删除,为此笔者去查了下相关资料,将topic创建与删除的原理彻底弄懂了。

由Flink与Kafka实践探究Kafka的两个问题插图1

在 Kafka 中,Topic 是一个存储消息的逻辑概念,不同的topic在物理上来说是分开存储的,可以有多个producer向他push消息,也可以有多个consumer去pull消息,每个 Topic 可以划分多个分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。每个消息在被添加到分区时,都会被分配一个连续的序列号 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。

由Flink与Kafka实践探究Kafka的两个问题插图2

通过命令

kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic

创建了1个名为test-topic的topic,拥有1个分区,每个分区分配2个副本。创建逻辑如图,总的来说就是后台逻辑会监听zookeeper下对应的目录节点,一旦发起topic创建命令,该命令会创建新的数据节点从而触发后台的创建逻辑。

由Flink与Kafka实践探究Kafka的两个问题插图3

命令行部分比较直白,无非就是一些基本校验,分配副本(尽可能保证分区的副本平均分配到每个broker上),把分配方案持久化到zookeeper的/brokers/topics/节点下。

后台逻辑部分主要由controller负责,controller内部保存了很多信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。Controller在启动的时候会创建这些监听器。其中一个监听器(TopicChangeListener)就是用于监听zookeeper的/brokers/topics目录的子节点变化的。一旦该目录子节点数发生变化就会调用这个监听器的处理方法。TopicChangeListener监听器一方面会更新controller的缓存信息(比如更新集群当前所有的topic列表以及更新新增topic的分区副本分配方案缓存等),另一方面就是创建对应的分区及其副本对象并为每个分区确定leader副本及ISR。至此,整个topic的创建就完成了!

除了使用kafka-topics –create创建topic外,还可以使用kafka-console-producer发布消息时创建,kafka第一步先获取topic的leader信息,当发现不可用的时候,在去创建此topic。

Kafka 删除topic的命令:

kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic

然而此命令不能真正删除topic,只是在zookeeper的/admin/delete_topics下创建一个临时节点。

Kafka controller在启动的时候会注册对于Zookeeper节点/admin/delete_topics的子节点变更监听器,并创建一个单独的线程,执行topic删除的操作,监听器捕获到删除时创建的临时节点,立刻触发删除逻辑,查询test-topic是否正在被使用,根据其状态决定是否删除。

那么什么时候线程会真正删除此topic呢?只有当在server.properties配置了delete.topic.enable=true时并重新启动Kafka,此Topic才会被真正删除。

至此Topic的创建和删除原理已经清楚了,而对于在实践过程中遇到的问题也清晰了。

本文链接:https://www.yunweipai.com/37097.html

网友评论comments

发表回复

您的电子邮箱地址不会被公开。

暂无评论

Copyright © 2012-2022 YUNWEIPAI.COM - 运维派 京ICP备16064699号-6
扫二维码
扫二维码
返回顶部