企业级消息队列(Kafka)

  • Kafka是什么? 消息队列
  • 为什么要有消息队列? 解耦、异构、并行
  • Kafka数据生成方式
    • producer-->kafka---->保存到本地
    • consumer---主动拉取数据
  • Kafka核心概念
    • producer(生产者)
      • 消息不丢失
      • 数据分发策略
    • kafka brokers
      • topic(主题):一类消息,比如用户信息、商品信息、订单信息
      • partition
        • 为什么要有partition? 数据太大,单个节点无法存储。
        • partition的物理形态? 文件夹
        • partition的数量设计?
          • 如果集群很小(10),将partition设置为broker的数量
          • 如果集群大于10台,就要根据数据量来设计
      • replication
        • 为什么要有replication? 保证数据安全,数据容错的考虑。
        • 设置几个副本?
          • 数据的越重要,副本数可以设的越高。但是,数据冗余高,ack时间越长,效率越低。
          • 平常设置2个。
      • segment(段)
        • 为什么要分段? 如果文件巨大,删除麻烦、查找麻烦。
          • 分段大小,1G。这个可以设置。
          • 删除数据的过期时间,168小时,等于7天。注意,在规划Kafka集群的是,要考虑数据存储几天的。Kafka集群的数量,建议3-5台。 24T * 5 = 120T
          • segment物理形态:有个log文件和index索引文件。
            • log 存放的原始数据
            • index 存放的是offset和物理地址值
            • 数据查找中会有二分查找法(掌握)
      • pagecache
        • 现代操作系统提供缓存技术,Kafka将当前生产的数据保存在缓存中。
        • 由于生产和消费时间差很小,所以消费者消费数据的时候,基本上都是从内存中获取数据。
      • sendfile
        • 作为消费者,想消费历史的数据。senfile技术不经过应用,在操作系统层面,读取完数据之后,直接输出到网卡。
      • partition isr
        • 如果partition有多个副本,需要选择一个leader出来,负责数据的读写读写读写操作。
        • 这个leader有可能挂掉,因为压力大。
        • 如果使用投票机制,会有相对来说比较大时间消费,时刻准备好备胎。
        • 满足什么样条件才能isr成员?同步leader的数据,在某个时间阈值和数量阈值内。不满足条件就踢出ISR。
      • partition leader
        • leader是针对partition的描述。
        • 负责数据的读写。
    • consumer
      • consumerGroup,消费数据都是以消费组的形式 出现的。
      • 消费组中的成员,消费数据是互不干扰的。当有一个消费者挂了之后,会在确定消费者无法重新消费后,触发负载均衡。
      • 两个不同的消费组,消费同一个topic的数据,都是完整。注意:在实际开发过程,要将自己的消费组设计成唯一的。
      • consumer消费offset管理。
        • 0.8版本,offset是有zookeeper进行管理的。
        • 0.8+,可以选择使用kafka的consumer_offset的topic进行管理。
  • Kafka常见问题?
    • Kafka为什么那么快? pageche、sendfile
    • Kafka消费不丢失机制? producer、broker、consumer。
    • Kafka消费数据全局有序? 单个partition是有序,全局有序违背设计的初衷。

流式计算框架(Storm)

  • 流式计算框架的组成:一般flume+kafka+storm+redis,每个组件都可以被替换掉。
  • Storm是什么? 流式计算框架,一旦启动,永不停止。
  • Storm架构是什么?
    • client:用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
    • nimbus:发布rpc的服务,接受client的任务提交,对任务进行校验。将任务放入任务队列中(阻塞队列)。后台线程读取队列,获得任务信息,进行任务分配。
      • 获取当前集群中,空闲的worker资源。
      • 获取当前任务需要多少个Task。Task数量就是所有component(组件)的并行数量加上每个worker会启动的一个ackerBolt之和。
      • 将任务信息保存到zookeeper。
    • zookeeper:zookeeper保存任务信息及节点各种其他信息。
    • supervisor:通过watch机制,得到任务信息,然后启动属于自己的worker。
    • worker:被supervisor启动,负责具体的任务执行。
      • Task:本质上是个线程,是个executor。分为三种类型 SpoutTask、BoltTask、AckerTask。
  • Storm的编程模型?
    • Spout extends baseRichSpout
      • open :初始化方法
      • nextTuple :有个while一直调用该方法,调用一次发送一次数据。
      • field:声明输出的字段名称和数量
    • bolt1 extends baseRichBolt (手动ack)
      • prepare:初始化方法
      • execute: 执行方法
      • field:声明输出的字段名称和数量
    • bolt2 extends baseBasicBolt(自动ack)
      • execute: 执行方法
      • field:声明输出的字段名称和数量
    • 驱动类:topologybuilder
    • 运行模式:本地模式、集群模式
  • Storm组件的并行度怎么设置?
    • Spout是根据上游kafka的topic的分区数量设置。
    • Bolt1是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)
    • Bolt2是根据Spout发送的数据/bolt1处理每条数据量(单位时间 1S)
  • Spout的worker的数量怎么设置?
    • 根据所有组件的并行度之和,进行设置。
    • 可以一个worker有两个Spout或者多个Spout。
    • 如果Spout下游不同层级的所有的bolt的数量很多情况下,运算压力,可以考虑worker和spout数量保持一致。
      • 压力更大,只能修改partition的数量。
      • 如果修改不了partition数量,只能曾加worker数,任由数据充斥在网络中。
  • Spout上下游衔接策略(StreamGrouping)
    • localorshuffle 分组策略是任何时候的第一选择。
    • fieldGroup 字段分组。

Storm的原理

  • 任务提交流程
    • 用来创建一个stormtopology,然后将stormtopology序列化之后,通过rpc的提交到nimbus。
    • 扩展:RPC框架,动态+反射技术+网络通信技术。
  • 集群启动流程
    • Java系统流程: Java -jar、Java -server、Java -client
    • 手动启动:nimbus、supervisor
    • 自动启动:supervisor根据任务信息启动worker。
  • 任务执行流程
    • 与nimbus、supervisor没有半毛钱关系,都在worker。
    • SpoutTask.open() 一般用来打开外部的数据源
    • while 方式调用 nextTuple方法。发送数据,需要考虑数据的分组策略。
      • 发送数据都是发送Tuple,会携带当前Tuple要发送给哪个taskid。
      • 然后根据task分配信息,得到taskid所在的worker。
      • 通过网络请求将tuple发送给远端的worker。
      • 远端的worker有个接受的线程,根据taskid找到对应的Bolt的输入队列(无锁队列,每秒处理600万订单),将tuple放到bolt的输入中。
      • 每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。
    • Bolt的execute方法接收到了Tuple,经过一顿处理。然后向下游发送数据Tuple。
      • 发送数据时,会根据下游的分组策略。比如:localorshuffle。
      • 如果是localorshuffle方式,直接找到当前worker中的对应Task,进行分发。将数据放入响应bolt的输入队列。
      • 每个Task都是一个线程,后台不停的消费输入队列的内容。消费到消息之后,会调用bolt的execute方法,将数据传入给bolt。
    • 如此循环。
  • 消息不丢失机制
    • 如何开启消息不丢失机制?
      • spout端发送数据的时候,要加上messageid.
      • spout要重写ack和fail方法。
      • 在topologybuiler的config文件中设置setNumAckers的数量大于1.默认是1
      • 在下游个每个层级bolt上,需要增加锚点。
    • 现象是什么?
      • 当消息处理成功,会调用ack方法。
      • 当消息处理失败
        • 超时处理,默认30S,会调用faile方法,并传入messageid。
        • 真的异常了,消息重发。 消息重发,需要手动设置。
          • 消息重发,最好是在spout发送tuple的时候,将tuple本身当做messageid传入,失败后,直接发送messageid
    • 实现机制?异或机制。
      • 相同为0,不同为1。
      • 需要上游发送时候的时候,发一个状态。并且需要下游处理完数据之后,发送一个状态。这两个状态值是一样的。
      • 每个层级都会产生新的锚点id。64位长整型。

标签: none

相关文章推荐

添加新评论,含*的栏目为必填