絮叨两句:
博主是一名数据分析实习生,利用博客记录自己所学的知识,也希望能帮助到正在学习的同学们
人的一生中会遇到各种各样的困难和折磨,逃避是解决不了问题的,唯有以乐观的精神去迎接生活的挑战
少年易老学难成,一寸光阴不可轻。
最喜欢的一句话:今日事,今日毕
性别标签开发
终于到了标签开发的环节,九九八十一难,最后的终点也是起点,大家继续加油
开发准备工作
↓↓↓↓↓↓↓↓↓↓点击下方链接,就可以获取POM文件和前期所要准备的工作!必点↓↓↓↓↓↓↓↓↓↓
企业级360°全方位用户画像:标签开发[匹配标签](前期准备工作)①
样例类:HBaseMeta 与 TagRule 提前定义好样例类,为了后面方便使用数据
HBase数据源source,直接读取Hbase数据会很慢,使用提前准备好的工具类读取Hbase数据
object Gender_Tag {
def main(args: Array[String]): Unit = {
//1.创建sparksession 对象 用于读取Mysql和Hbase数据库
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Gender_Tag").getOrCreate()
val sparkContext: SparkContext = spark.sparkContext
sparkContext.setLogLevel("WARN")
//-------------------------------------------------------------------------
//2.连接Mysql 用于读取四级标签与五级标签
val url: String ="jdbc:mysql://bd001:3306/tags_new?characterEncoding=UTF-8"
val table: String ="tbl_basic_tag"//要读取那张表
val properties: Properties = new Properties()
properties.put("user","root")//mysql数据库用户名
properties.put("password","123456")//mysql数据库密码
//--------------------------------------------------------------------------
//2.1读取MySQL数据 测试读取myslq数据是否成功
val mysqlConnect: DataFrame = spark.read.jdbc(url, table, properties)
//mysqlConnect.show()
//--------------------------------------------------------------------------------
//引入隐式转换
import spark.implicits._
//引入SparkSql内置函数
import org.apache.spark.sql.functions._
//引入java和scala 相互转换
import scala.collection.JavaConverters._
//3.读取四级标签 开始读取四级标签
val four_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("name='性别'")
//four_Tag.show(false)
//3.1 获取四级标签Id 五级标签的Pid是四级标签的Id
val four_Id: Int = four_Tag.map(row => {
val id: Int = row.getAs("id").toString.toInt
id
}).collectAsList().get(0)
//println("四级标签Id:",four_Id)
//3.2 获取四级标签rule 用于进行处理之后读取Hbase数据
val four_Map_Tag = four_Tag.map(row => {
row.getAs("rule").toString
.split("##")
.map(kv => {
val kvalue: Array[String] = kv.split("=")
(kvalue(0), kvalue(1))
})
}).collectAsList().get(0).toMap
//println("四级标签rule:",four_Map_Tag)
//3.2.1 将map封装成样例类 目的是为了更方便的获取值
val hBaseMetaCaseClass = mapToHbaseCaseClass(four_Map_Tag)
// println("将Map转换成样例类之后",hBaseMetaCaseClass)
//4.读取五级标签
val five_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("pid=" + four_Id)
// five_Tag.show()
//5.读取Hbase数据
val tbl_users: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, hBaseMetaCaseClass.hbaseTable)
.option(HBaseMeta.SELECTFIELDS, hBaseMetaCaseClass.selectFields)
.load()
// tbl_users.show()
//-----------------------上面代码已经将四级标签与五级标签的数据拿了出来,通过四级标签的结果读取Hbase的数据----------------------
//6.将五级标签与tbl_users[用户表]进行匹配
val five_List_Tag: List[TagRule] = five_Tag.map(row => {
val id = row.getAs("id").toString.toInt
val rule = row.getAs("rule").toString
TagRule(rule = rule, id = id)
}).collectAsList().asScala.toList
// println("five_List_Tag:",five_List_Tag)
val userDefinedFunction = udf((parameter: String) => {
var id=0
for (elem <- five_List_Tag) {
if (elem.rule == parameter) {
id=elem.id
}
}
id
})
//得到最终的结果标签
val new_Tag: DataFrame = tbl_users.select('id.as("userId"), userDefinedFunction('gender).as("tagsId"))
new_Tag.show()
//7.读取Hbase的历史数据,将新数据与老数据合并
// 考虑,hbase中最终标签表里已经有数据了,直接将新的数据写入,会发生什么问题? 答:会覆盖
//考虑,现在已经通过追加的方式解决了覆盖的问题,如相同的程序多跑几次会发生什么问题? 答会重复
//重复问题的解决办法,在追加数据之后,进程一次去重操作就可以了
val old_Tag: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.load()
// old_Tag.show()
//7.1开始合并数据
if (old_Tag.count()==0){
//证明还没有数据直接将数据写入
new_Tag.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.save()
}else{
val append_Tag: UserDefinedFunction = udf((old_T: String, new_T: String) => {
println(old_T,new_T)
if (old_T=="") {
new_T
} else if (new_T=="") {
old_T
} else if (old_T=="" && new_T=="") {
""
}else{
val all_T = old_T + "," + new_T
//进行去重
val all_TAG = all_T.split(",").distinct.mkString(",")
all_TAG
}
})
val old_Append_new: DataFrame = old_Tag.join(new_Tag,old_Tag.col("userId")===new_Tag.col("userId"))
.select(
when(old_Tag.col("userId").isNotNull, old_Tag.col("userId"))
.when(new_Tag.col("userId").isNotNull, new_Tag.col("userId")).as("userId"),
append_Tag(old_Tag.col("tagsId"), new_Tag.col("tagsId")).as("tagsId"))
//8.将最终结果写入到Hbase中 old_Append_new.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.save()
}
}
def mapToHbaseCaseClass(four_Map_Tag: Map[String, String]) = {
HBaseMeta(
inType = four_Map_Tag.getOrElse(HBaseMeta.INTYPE,""),
zkHosts = four_Map_Tag.getOrElse(HBaseMeta.ZKHOSTS,""),
zkPort = four_Map_Tag.getOrElse(HBaseMeta.ZKPORT,""),
hbaseTable = four_Map_Tag.getOrElse(HBaseMeta.HBASETABLE,""),
family = four_Map_Tag.getOrElse(HBaseMeta.FAMILY,""),
selectFields = four_Map_Tag.getOrElse(HBaseMeta.SELECTFIELDS,""),
rowKey = four_Map_Tag.getOrElse(HBaseMeta.ROWKEY,""))
}
}
总结
开发流程:
- 创建sparksession 对象 用于读取Mysql和Hbase数据库
- 连接Mysql 用于读取四级标签与五级标签
- 读取四级标签
- 读取五级标签
- 读取Hbase数据 根据第3步处理好的数据用来读取Hbase数据
- 将五级标签与tbl_users[用户表]进行匹配
- 读取Hbase的历史数据,将新数据与老数据合并
- 将最终数据写入到Hbase
本篇博客主要为大家提供了匹配型性别标签如何进行开发的一个步骤流程。如有不懂得地方可以私信我,然后帮你讲解
如有什么不对的地方,还请帮忙纠正错误!