实时计算-智慧大屏分析
  xP7vvuQeeYuv 2023年11月02日 93 0


需求分析

  • 按省份|用户性别|用户年龄段,统计当日新增付费用户首单平均消费及人数占比

技术栈

  • 数据库监控与采集 maxwell
  • 实时数仓存储 kafka
  • 实时计算 spark stream
  • 中间层OLAP hbase
  • 可视化层OLAP elastic search
  • topic offset手动提交 redis
  • BI可视化 echart、Superset、Kibana

业务流程

实时计算-智慧大屏分析_sql

Maxwell 监控抓取 MySQL 数据

  • bootstrap采集维度数据
bin/maxwell-bootstrap --user maxwell --password *** --host chandao --database gmall --table base_province --client_id maxwell_1
  • 实时监控
${MAXWELL_HOME}/bin/maxwell --config ${MAXWELL_HOME}/config.properties >/dev/null 2>&1 &
  • maxwell原理
把自己伪装成 slave,假装从 master 复制数据,监控mysql binlog
mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW,保证数据一致性!
  • mysql binlog statement
语句级,binlog 会记录每次一执行写操作的语句。
相对 row 模式节省空间,但是可能产生不一致性,比如
update tt set create_date=now()
如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致
  • maxwell并行度
默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序。如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性可选值 producer_partition_by=database|table|primary_key|random| column

ODS层:spark stream数据分流

  • maxwell数据格式
{"database":"hzmall","table":"z_user_info","type":"insert","ts":1589385314,"xid":82982,"xoffset":0,"data":{"id":30,"user_name":"zhang3","tel":"138*********"}}
  • spark stream分流
//分流
jsonObjDStream.foreachRDD {
  rdd => {
    rdd.foreach {
      jsonObj => {
        //获取操作类型
        val opType: String = jsonObj.getString("type")
        //获取操作的数据
        val dataJsonObj: JSONObject = jsonObj.getJSONObject("data")
        //获取表名
        val tableName: String = jsonObj.getString("table")

        if (dataJsonObj != null && !dataJsonObj.isEmpty)
          if (("order_info".equals(tableName) && "insert".equals(opType)) || (tableName.equals("order_detail") && "insert".equals(opType))
              || tableName.equals("base_province") || tableName.equals("user_info") || tableName.equals("sku_info")
              || tableName.equals("base_trademark") || tableName.equals("base_category3") || tableName.equals("spu_info")) {
            //拼接要发送到的主题
            var sendTopic = "ods_" + tableName
            MyKafkaSink.send(sendTopic, dataJsonObj.toString)
          }
      }
    }
    //手动提交偏移量
    OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)

DWD首单维护关联分析

  • phoenix 在 hbase 中建表
  • 将用户是否消费的状态保存到 Hbase 中
create table user_status( user_id varchar primary key ,state.if_consumed varchar ) SALT_BUCKETS = 3
  • 订单迭代计算关联K->V user_status
// 以分区为单位,将整个分区的数据拼接一条SQL进行一次查询
val orderInfoWithFirstFlagDStream: DStream[OrderInfo] = orderInfoDStream.mapPartitions {
  orderInfoItr => {
    //当前一个分区中所有的订单的集合
    val orderInfoList: List[OrderInfo] = orderInfoItr.toList

    //获取当前分区中获取下订单的用户
    val userIdList: List[Long] = orderInfoList.map(_.user_id)
    //根据用户集合到Phoenix中查询,看看哪些用户下过单   坑1  字符串拼接
    var sql: String = s"select user_id,if_consumed from user_status where user_id in('${userIdList.mkString("','")}')"
    //执行sql从Phoenix获取数据
    val userStatusList: List[JSONObject] = PhoenixUtil.queryList(sql)
    //获取消费过的用户id  
    val consumedUserIdList: List[String] = userStatusList.map(_.getString("USER_ID"))

    for (orderInfo <- orderInfoList) {
      // 类型转换
      if (consumedUserIdList.contains(orderInfo.user_id.toString)) {
        orderInfo.if_first_order = "0"
      } else {
        orderInfo.if_first_order = "1"
      }
    }
    orderInfoList.toIterator
  }
}
  • 订单关联地域维度
  • 维度数据和状态数据
  • 相同点:长期保存维护、可修改、使用 k-v 方式查询
  • 不同点:数据变更的时机不同、状态数据往往因为事实数据的新增变化而变更、维度数据只会受到业务数据库中的变化而变更
  • 根据共同点,维度数据也是非常适合使用 hbase 存储的,稍有不同的是维度数据必须启动单独的实时计算来监控维度表变化来更新实时数据。
create table province_info (id varchar primary key,info.name varchar,info.area_code varchar,info.iso_code varchar)SALT_BUCKETS = 3
  • 订单与用户维度关联
create table user_info (id varchar primary key ,user_level varchar, birthday varchar,gender varchar, age_group varchar , gender_name varchar)SALT_BUCKETS = 3
/以分区为单位对数据进行处理,每个分区拼接一个sql 到phoenix上查询用户数据
val orderInfoWithUserInfoDStream: DStream[OrderInfo] = orderInfoWithProvinceDStream.mapPartitions {
  orderInfoItr => {
    //转换为list集合
    val orderInfoList: List[OrderInfo] = orderInfoItr.toList
    //获取所有的用户id
    val userIdList: List[Long] = orderInfoList.map(_.user_id)

    //根据id拼接sql语句,到phoenix查询用户
    var sql: String = s"select id,user_level,birthday,gender,age_group,gender_name from user_info where id in ('${userIdList.mkString("','")}')"
    //当前分区中所有的下单用户
    val userList: List[JSONObject] = PhoenixUtil.queryList(sql)
    val userMap: Map[String, UserInfo] = userList.map( userJsonObj => {
        val userInfo: UserInfo = JSON.toJavaObject(userJsonObj, classOf[UserInfo])
        (userInfo.id, userInfo)
      }).toMap

    for (orderInfo <- orderInfoList) {
      val userInfoObj: UserInfo = userMap.getOrElse(orderInfo.user_id.toString, null)
      if (userInfoObj != null) {
        orderInfo.user_age_group = userInfoObj.age_group
        orderInfo.user_gender = userInfoObj.gender_name
      }
    }
    orderInfoList.toIterator
  }
}

订单首单OLAP

  • 订单宽表存储es
// 保存订单数据到ES中
rdd.foreachPartition(
  orderInfoItr => {
    val orderInfoList: List[(String, OrderInfo)] = orderInfoItr.toList.map(orderInfo => (orderInfo.id.toString, orderInfo))
    val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
    MyESUtil.bulkInsert(orderInfoList, "order_info_" + dateStr)

    //3.4写回到Kafka
    for ((orderInfoId, orderInfo) <- orderInfoList) {
      MyKafkaSink.send("dwd_order_info", JSON.toJSONString(orderInfo, new SerializeConfig(true)))
    }
  })
  • 大盘可视化



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
xP7vvuQeeYuv
最新推荐 更多

2024-05-31