游戏攻略网
当前位置: 首页 游戏攻略

spark大数据分析源码解析(StructredStreamingKafkaMysql)

时间:2023-08-22 作者: 小编 阅读量: 1 栏目名: 游戏攻略

前言每年天猫双十一购物节,都会有一块巨大的实时作战大屏,展现当前的销售情况。这种炫酷的页面背后,其实有着非常强大的技术支撑,而这种场景其实就是实时报表分析。

前言

每年天猫双十一购物节,都会有一块巨大的实时作战大屏,展现当前的销售情况。这种炫酷的页面背后,其实有着非常强大的技术支撑,而这种场景其实就是实时报表分析。

1、业务需求概述

模拟交易订单数据,发送至分布式消息队列Kafka,实时消费交易订单数据进行分析处理,业务流程图如下所示:

实时从Kafka消费交易订单数据,按照不同维度实时统计【销售订单额】,最终报表Report结果存储MySQL数据库;

二 项目代码1.模拟交易数据

编写程序,实时产生交易订单数据,使用Json4J类库转换数据为JSON字符,发送kafka Topic中,代码如下:

// =================================== 订单实体类 =================================package cn.itcast.Spark.mock/*** 订单实体类(Case Class)* @param orderId 订单ID* @param userId 用户ID* @param orderTime 订单日期时间* @param ip 下单IP地址* @param orderMoney 订单金额* @param orderStatus 订单状态*/case class OrderRecord(orderId: String,userId: String,orderTime: String,ip: String,orderMoney: Double,orderStatus: Int)// ================================== 模拟订单数据 ==================================package cn.itcast.spark.mockimport java.util.Propertiesimport org.apache.commons.lang3.time.FastDateFormatimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import org.apache.kafka.common.serialization.StringSerializerimport org.json4s.jackson.Jsonimport scala.util.Random/*** 模拟生产订单数据,发送到Kafka Topic中* Topic中每条数据Message类型为String,以JSON格式数据发送* 数据转换:* 将Order类实例对象转换为JSON格式字符串数据(可以使用json4s类库)*/object MockOrderProducer {def main(args: Array[String]): Unit = {var producer: KafkaProducer[String, String] = nulltry {// 1. Kafka Client Producer 配置信息val props = new Properties()props.put("bootstrap.servers", "node1.itcast.cn:9092")props.put("acks", "1")props.put("retries", "3")props.put("key.serializer", classOf[StringSerializer].getName)props.put("value.serializer", classOf[StringSerializer].getName)// 2. 创建KafkaProducer对象,传入配置信息producer = new KafkaProducer[String, String](props)// 随机数实例对象val random: Random = new Random()// 订单状态:订单打开 0,订单取消 1,订单关闭 2,订单完成 3val allStatus =Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)while(true){// 每次循环 模拟产生的订单数目val batchNumber: Int = random.nextInt(1)5(1 to batchNumber).foreach{number =>val currentTime: Long = System.currentTimeMillis()val orderId: String = s"${getDate(currentTime)}d".format(number)val userId: String = s"${1random.nextInt(5)}d".format(random.nextInt(1000))val orderTime: String = getDate(currentTime, format="yyyy-MM-dd HH:mm:ss.SSS")val orderMoney: String = s"${5random.nextInt(500)}.d".format(random.nextInt(100))val orderStatus: Int = allStatus(random.nextInt(allStatus.length))// 3. 订单记录数据val orderRecord: OrderRecord = OrderRecord(orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus)// 转换为JSON格式数据val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)println(orderJson)// 4. 构建ProducerRecord对象val record = new ProducerRecord[String, String]("orderTopic", orderJson)// 5. 发送数据:def send(messages: KeyedMessage[K,V]*), 将数据发送到Topicproducer.send(record)}Thread.sleep(random.nextInt(100)500)}}catch {case e: Exception => e.printStackTrace()}finally {if(null != producer) producer.close()}}/**=================获取当前时间=================*/def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)val formatDate: String = fastFormat.format(time) // 格式化日期formatDate}/**================= 获取随机IP地址 =================*/def getRandomIp: String = {// ip范围val range: Array[(Int, Int)] = Array((607649792,608174079), //36.56.0.0-36.63.255.255(1038614528,1039007743), //61.232.0.0-61.237.255.255(1783627776,1784676351), //106.80.0.0-106.95.255.255(2035023872,2035154943), //121.76.0.0-121.77.255.255(2078801920,2079064063), //123.232.0.0-123.235.255.255(-1950089216,-1948778497),//139.196.0.0-139.215.255.255(-1425539072,-1425014785),//171.8.0.0-171.15.255.255(-1236271104,-1235419137),//182.80.0.0-182.92.255.255(-770113536,-768606209),//210.25.0.0-210.47.255.255(-569376768,-564133889) //222.16.0.0-222.95.255.255)// 随机数:IP地址范围下标val random = new Random()val index = random.nextInt(10)val ipNumber: Int = range(index)._1random.nextInt(range(index)._2 - range(index)._1)// 转换Int类型IP地址为IPv4格式number2IpString(ipNumber)}/**=================将Int类型IPv4地址转换为字符串类型=================*/def number2IpString(ip: Int): String = {val buffer: Array[Int] = new Array[Int](4)buffer(0) = (ip >> 24) & 0xffbuffer(1) = (ip >> 16) & 0xffbuffer(2) = (ip >> 8) & 0xffbuffer(3) = ip & 0xff// 返回IPv4地址buffer.mkString(".")}}

2.创建Maven模块

创建Maven模块,加入相关依赖,具体内如如下:

<repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>jboss</id><url>http://repository.jboss.com/nexus/content/groups/public</url></repository></repositories><properties><scala.version>2.11.12</scala.version><scala.binary.version>2.11</scala.binary.version><spark.version>2.4.5</spark.version><Hadoop.version>2.6.0-cdh5.16.2</hadoop.version><kafka.version>2.0.0</kafka.version><mysql.version>8.0.19</mysql.version></properties><dependencies><!-- 依赖Scala语言 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- Spark Core 依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!-- Spark SQL 依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!-- Structured StreamingKafka 依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId><version>${spark.version}</version></dependency><!-- Hadoop Client 依赖 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><!-- Kafka Client 依赖 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency><!-- 根据ip转换为省市区 --><dependency><groupId>org.lionsoul</groupId><artifactId>ip2region</artifactId><version>1.7.2</version></dependency><!-- MySQL Client 依赖 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- JSON解析库:fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency></dependencies><build><outputDirectory>target/classes</outputDirectory><testOutputDirectory>target/test-classes</testOutputDirectory><resources><resource><directory>${project.basedir}/src/main/resources</directory></resource></resources><!-- Maven 编译的插件 --><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build>

项目结构如下:3.核心代码

RealTimeOrderReport.javapackage cn.itcast.spark.reportimport java.util.concurrent.TimeUnitimport org.apache.spark.sql._import org.apache.spark.sql.functions._import org.apache.spark.sql.streaming.{OutputMode, Trigger}import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}import org.apache.spark.sql.types.{DataType, DataTypes}import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}def printToConsole(dataFrame: DataFrame) = {dataFrame.writeStream.format("console").outputMode(OutputMode.Update()).option("numRows","50").option("truncate","false").start()}def main(args: Array[String]): Unit = {//1.获取spark实例对象val spark: SparkSession = SparkSession.builder().appName("isDemo").master("local[3]").config("spark.sql.shuffle.partitions", "3").getOrCreate()import spark.implicits._val dataFrame: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1.itcast.cn:9092").option("subscribe", "orderTopic").load().selectExpr("CAST (value AS STRING)")// printToConsole(dataFrame)val ip_to_region: UserDefinedFunction = udf((ip: String) => {// 1. 创建DbSearch对象,指定数据字典文件位置val dbSearcher = new DbSearcher(new DbConfig(), "src/main/dataset/ip2region.db")// 2. 传递IP地址,解析获取数据val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)// 3. 获取解析省份和城市val region: String = dataBlock.getRegion//println(region) // 中国|0|海南省|海口市|教育网val Array(_, _, pronvice, city, _) = region.split("\\|")(pronvice, city)})val frame: DataFrame = dataFrame.select(get_json_object($"value", "$.ip").as("ip"),get_json_object($"value", "$.orderMoney").cast(DataTypes.createDecimalType(10, 2)).as("money"),get_json_object($"value", "$.orderStatus").as("status")).filter($"status" === 0).withColumn("region", ip_to_region($"ip")).select($"region._1".as("province"),$"region._2".as("city"),$"money")// printToConsole(frame)// /**// * 订单实体类(Case Class)// * @param orderId 订单ID// * @param userId 用户ID// * @param orderTime 订单日期时间// * @param ip 下单IP地址// * @param orderMoney 订单金额// * @param orderStatus 订单状态// *///// printToConsole(dframe)//SELECT "国家" as type, SUM(money) as totalMoney FROM tmp_view//SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province//SELECT city as type, SUM(money) as totalMoney FROM (SELECT * FROM tmp_view WHERE city in ("北京市", "上海市", "深圳市", "广州市", "杭州市", "成都市", "南京市", "武汉市", "西安市"))t GROUP BY t.cityframe.createOrReplaceTempView("tmp_view")val f: DataFrame = spark.sql("""|SELECT "国家" as type, SUM(money) as totalMoney FROM tmp_view""".stripMargin)val f2: DataFrame = spark.sql("""|SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province""".stripMargin)val f3: DataFrame = spark.sql("""|SELECT city as type, SUM(money) as totalMoney FROM (SELECT * FROM tmp_view WHERE city in ("北京市", "上海市", "深圳市", "广州市", "杭州市", "成都市", "南京市", "武汉市", "西安市"))t GROUP BY t.city""".stripMargin)// printToConsole(f3)saveToMySQL(f,"total")saveToMySQL(f2,"totalprovince")saveToMySQL(f3,"totalcity")spark.streams.awaitAnyTermination()}def saveToMySQL(streamDF:DataFrame,reportType:String)={streamDF.writeStream.outputMode(OutputMode.Complete()).queryName(s"${reportType}").foreachBatch((batchDF:DataFrame,batchId:Long)=>{batchDF.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","123456").option("dbtable",s"db_spark.tb_order${reportType}").save()}).option("checkpointLocation", s"datas/spark/structured-ckpt-${System.currentTimeMillis()}").start()}}OrderRecord.scalapackage cn.itcast.spark.mock/*** 订单实体类(Case Class)* @param orderId 订单ID* @param userId 用户ID* @param orderTime 订单日期时间* @param ip 下单IP地址* @param orderMoney 订单金额* @param orderStatus 订单状态*/case class OrderRecord(orderId: String,userId: String,orderTime: String,ip: String,orderMoney: Double,orderStatus: Int)

总结

总结: 实时报表分析是近年来很多公司采用的报表统计方案之一,其中最主要的应用就是实时大屏展示。利用流式计算实时得出结果直接被推送到前端应用,实时显示出重要指标的变换情况。

最典型的案例便是淘宝双十一活动,每年双十一购物节,除疯狂购物外,最引人注目的就是双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购买到数据采集,数据计算,数据校验,最终落到双十一大屏上展示的全链路时间压缩在5秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。

这次的双十一实时报表分析实战主要用SQL编写,尚未用DSL编写,这是有待完善的地方.此次的天猫双十一实时报表分享就到这里,喜欢的小伙伴欢迎一键三连!!

    推荐阅读
  • steam棋牌游戏推荐(幸运之夜新版本亮相TGC)

    steam棋牌游戏推荐12月1日,2017腾讯游戏嘉年华正式在成都开幕,腾讯的VR社交游戏《幸运之夜》在现场发布了最新版本。VR《幸运之夜》在TGC2017上惊艳亮相《幸运之夜》新版本发布邀请好友一起游戏今年7月底,《幸运之夜》正式在Steam发布,并推出了首款游戏作品“德州扑克”。今年的TGC2017现场,《幸运之夜》全新版本便带来了对互动性方面的提升。

  • 儿童睡前故事卖火柴的小女孩大全(卖火柴的小女孩)

    在长发公主的故事里,兔子小姐变成了手持宝剑的骑士,穿过了山川和河流,战胜了地狱恶犬,最终救出了长着一头金黄色长发的熊猫先生。随着一阵空间的波动,熊猫先生和兔子小姐来到了冰天雪地的圣诞节。小女孩被这突如其来的变化惊呆了。熊猫先生没有回答,轻轻摸了摸小女孩的头。小女孩点燃了第二根火柴。熊猫先生和兔子小姐则来到壁炉前,商量起小女孩最后一个愿望。熊猫先生蹲下来,握起小女孩的手。

  • 怎样做ppr管快一点(PPR管安装方法及技巧)

    怎样做ppr管快一点PPR管安装方法及技巧首先准备好需要的材料:热熔机,小剪刀,ppr管,管件,手巾。一定要根据自己热熔ppr管的口径,准备相应的热熔头。清洁:清洁管材与管件的焊接段部位,建议用95%浓度酒精擦净。在熔接时间内迅速的将管材无旋转的垂直插入管件中,并维持5秒以上,然后按相应冷却时间冷却。热熔后做到横平竖直,美观大方。

  • 大托特包搭配技巧(大托特包搭配技巧简述)

    西装外套+托特包复古时尚的格子,由黑白交错的条纹形成,文艺而又端庄搭配撞色托特包,优雅而不失俏皮,让气场变得灵动起来内搭白色连帽卫衣,减龄又可爱,接下来我们就来聊聊关于大托特包搭配技巧?大托特包搭配技巧西装外套+托特包复古时尚的格子,由黑白交错的条纹形成,文艺而又端庄。衬衫+托特包白色的衬衫休闲慵懒,给人一种空灵的感觉以及干净纯粹的气质。

  • 刘涛电视剧口碑(电视剧景气指数第一)

    还记得3月份刘涛在和周渝民主演的《大宋宫词》中扮嫩出演少妇被很多观众吐槽。万万没想到时隔数月,刘涛带着她的新剧《星辰大海》杀回来了。目前主要的剧情线在刘涛饰演的女主简爱身上。因为小时候意外发现母亲出轨的事,得知真相的父亲激愤之下杀死母亲并自杀,简爱因此成为了一个孤女。逃出傻子家的简爱在与姑姑的争执中误伤姑姑,从此开启逃命生涯。简爱从面馆辞职走投无路,误打误撞进入大公司之后面临着同事的故意刁难。

  • 外墙装修材料有哪些 外墙装修材料有哪些类型

    外墙涂料具有装饰性良好、耐污染耐老化以及施工维修容易和价格合理的特点。一般来说釉面外墙砖有亚光面与无光面两大类。它的装饰的效果也不错,有柚木色、深灰色等等可供选择。由于它的表面的肌理很清晰,所以色泽漂亮且装饰性极强。本站,中国知名大型装修平台,装修领导品牌。

  • 世界上有哪些花(世界上有哪些花 名字)

    瓜叶菊、香豌豆、夏兰、石竹、石蒜、荷花、翠菊、睡莲、福禄考、晚香玉、万寿菊、千日红、建兰、铃兰、报岁兰、香堇、大岩桐、水仙、小草兰、瓜叶菊、蒲包花、免子花、入腊红、三色堇、百日草、鸡冠花、一串红。孔雀草、大波斯菊、金盏菊、非洲凤仙花、菊花、非洲菊、观赏凤梨类、射干、非洲紫罗兰、天堂鸟、炮竹红、菊花、康乃馨、红掌、满天星、星辰花、三角梅、虞美人。

  • 长歌行李长歌母亲是谁杀的(长歌行李长歌的简介)

    下面更多详细答案一起来看看吧!长歌行李长歌母亲是谁杀的《长歌行》李长歌母亲是自杀的。李长歌,太子李建成之女,生母则是回纥王族。父母手足均死于玄武门之变,满怀愤恨的长歌凭高超武艺逃出皇宫,并在追捕过程中制造“坠崖假死”而逃生,其后女扮男装隐瞒身份流落民间,一心只想为父母复仇,在家和国的利害冲突中,最后放弃复仇,和阿诗勒隼一起成为了民族和解的使者。

  • 什么时候喝蛋白粉增肌效果最好(什么时候喝蛋白粉增肌效果最好)

    从长远来看,这种方法被证明可以促使肌肉明显增长。如果摄入量超过一定的阈值,蛋白质的合成就会受限。如果是以乳清饮料的形式摄入乳清蛋白,运动者可以在运动结束后立即饮用。按每公斤体重1克的标准,在健身前后立即摄入以及在运动后1小时内摄入可快速吸收的碳水化合物,可以明显抑制肌肉分解,并大大促进肌肉快速和明显的增长。在这种情况下,大量分泌的胰岛素促进了氨基酸向工作中的肌肉运输,为蛋白质合成奠定基础。

  • 窦骁周冬雨山楂树之恋结局(周冬雨18岁第一次出演)

    周冬雨18岁第一次出演要说最近最火的电影,非《少年的你》莫属,上映14天,已经收获了12.45亿的票房成绩,成为现阶段最强的票房黑马而作为该片主演的周冬雨和易烊千玺,也凭借在该片中的精彩演出,演技得到大众的进一步认可作为“。