查看: 576|回复: 4

kafka如何自定义去消费已经消费过的数据

[复制链接]
新浪微博达人勋 poiu72  未实名认证
论坛徽章:
23
SAS研习者初级
日期:2012-08-20 22:27:43HBase徽章
日期:2017-03-10 12:01:24Java徽章
日期:2017-06-09 15:42:21storm徽章
日期:2017-07-20 17:23:53python徽章
日期:2017-09-07 16:20:32Java徽章
日期:2018-02-08 16:11:57知识图谱徽章
日期:2018-06-15 13:47:35pyspark徽章
日期:2018-09-06 14:49:50计算徽章
日期:2019-05-09 14:32:23spark徽章
日期:2019-08-01 16:31:52Hive徽章
日期:2017-03-10 11:59:23python徽章
日期:2017-03-02 16:33:17
发表于 2019-6-16 22:03 | 显示全部楼层 |阅读模式

Kafka

kafka如何自定义去消费已经消费过的数据?

Conosumer.properties配置文件中有两个重要参数
auto.commit.enable:如果为true,则consumer的消费偏移offset会被记录到zookeeper。下次consumer启动时会从此位置继续消费。
auto.offset.reset  该参数只接受两个常量largestSmallest,分别表示将当前offset指到日志文件的最开始位置和最近的位置。
如果进一步想控制时间,则需要调用SimpleConsumer,自己去设置相关参数。比较重要的参数是 kafka.api.OffsetRequest.EarliestTime()kafka.api.OffsetRequest.LatestTime()分别表示从日志(数据)的开始位置读取和只读取日志。
如何使用SimpleConsumer
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partitionbroker leader,从而找到存有该partition副本的那个broker
再者,自己去写requestfetch数据
最终,还要注意需要识别和处理brokerleader的改变



回复

使用道具 举报

新浪微博达人勋 poiu72  未实名认证
论坛徽章:
23
SAS研习者初级
日期:2012-08-20 22:27:43HBase徽章
日期:2017-03-10 12:01:24Java徽章
日期:2017-06-09 15:42:21storm徽章
日期:2017-07-20 17:23:53python徽章
日期:2017-09-07 16:20:32Java徽章
日期:2018-02-08 16:11:57知识图谱徽章
日期:2018-06-15 13:47:35pyspark徽章
日期:2018-09-06 14:49:50计算徽章
日期:2019-05-09 14:32:23spark徽章
日期:2019-08-01 16:31:52Hive徽章
日期:2017-03-10 11:59:23python徽章
日期:2017-03-02 16:33:17
 楼主| 发表于 2019-6-16 22:21 | 显示全部楼层
consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。 原因:消费者消费了数据并不从队列中移除,只是记录了offset偏移量。同一个consumergroup的所有consumer合起来消费一个topic,并且他们每次消费的时候都会保存一个offset参数在zookeeper的root上。如果此时某个consumer挂了或者新增一个consumer进程,将会触发kafka的负载均衡,暂时性的重启所有consumer,重新分配哪个consumer去消费哪个partition,然后再继续通过保存在zookeeper上的offset参数继续读取数据。注意ffset保存的是consumer 组消费的消息偏移。  要消费同一组数据,你可以 1)        采用不同的group。 2)        通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据。
回复 支持 反对

使用道具 举报

新浪微博达人勋 poiu72  未实名认证
论坛徽章:
23
SAS研习者初级
日期:2012-08-20 22:27:43HBase徽章
日期:2017-03-10 12:01:24Java徽章
日期:2017-06-09 15:42:21storm徽章
日期:2017-07-20 17:23:53python徽章
日期:2017-09-07 16:20:32Java徽章
日期:2018-02-08 16:11:57知识图谱徽章
日期:2018-06-15 13:47:35pyspark徽章
日期:2018-09-06 14:49:50计算徽章
日期:2019-05-09 14:32:23spark徽章
日期:2019-08-01 16:31:52Hive徽章
日期:2017-03-10 11:59:23python徽章
日期:2017-03-02 16:33:17
 楼主| 发表于 2019-6-29 00:01 | 显示全部楼层
kafka如何设置生存周期与清理数据:日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).清理参数在server.properties文件中。
回复 支持 反对

使用道具 举报

新浪微博达人勋 poiu72  未实名认证
论坛徽章:
23
SAS研习者初级
日期:2012-08-20 22:27:43HBase徽章
日期:2017-03-10 12:01:24Java徽章
日期:2017-06-09 15:42:21storm徽章
日期:2017-07-20 17:23:53python徽章
日期:2017-09-07 16:20:32Java徽章
日期:2018-02-08 16:11:57知识图谱徽章
日期:2018-06-15 13:47:35pyspark徽章
日期:2018-09-06 14:49:50计算徽章
日期:2019-05-09 14:32:23spark徽章
日期:2019-08-01 16:31:52Hive徽章
日期:2017-03-10 11:59:23python徽章
日期:2017-03-02 16:33:17
 楼主| 发表于 2019-6-29 00:02 | 显示全部楼层
zookeeper如何管理kafka:Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息. Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性. Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.  
回复 支持 反对

使用道具 举报

论坛徽章:
1
spark徽章
日期:2019-08-01 16:31:52
发表于 2019-7-6 15:02 | 显示全部楼层
在程序里面写方法,按照指定的offset去消费数据
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

 

GMT+8, 2019-10-20 18:12 , Processed in 0.097556 second(s), 37 queries .

关闭

扫一扫加入
本版微信群