Spark从集合、HDFS、HBase、Elasticsearch和MySQL中读取数据,创建RDD

   日期:2020-07-09     浏览:262    评论:0    
核心提示:RDD的创建RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是Spark对数据的核心抽象。创建RDD是使用RDD的第一步,一般可以由内存中的集合、文件、外部数据源生成或者由其他RDD转换而成。1.并行集合并行集合可以对Driver中的集合调用parallelize方法得到,Driver会将集合切成分区,并将数据分区分发到整个集群中。val sc = new SparkContext(sparkConf)val rdd = sc.parallelize(S

RDD的创建

RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是Spark对数据的核心抽象。创建RDD是使用RDD的第一步,一般可以由内存中的集合、文件、外部数据源生成或者由其他RDD转换而成。

1.并行集合
并行集合可以对Driver中的集合调用parallelize方法得到,Driver会将集合切成分区,并将数据分区分发到整个集群中。

val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(Seq(1,2,3))

2.从HDFS中读取
HDFS作为Spark 的底层存储,Spark集群中的节点就会去读取分散在 HDFS中的数据块。

val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("/user/me/a.txt")

3.从HBase中读取
Spark从HBase读取文件有两种方式,一种是基于HBase的Scan操作,另一种是直接读取HBase的Region文件(切片,切片是有序二进制文件)。

val sc = new SparkContext(sparkConf)
val tableName = "your_hbaseTable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort","2181")
conf.set(TableInputFormat.INPUT_TABLE,tableName)
val rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
classOf[org.apache.hadoop.habse.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.habse.client.Result])
//利用 HBase API解析出行键与列值
rdd.foreach ({ case (_,result) => 
   val rowkey = Bytes.toString(result.getRow)
   val value = Bytes.toString(result.getValue("cf".getBytes,"cl".getBytes))
})

4.从Elasticsearch中读取
Elasticsearch是一个分布式的全文搜索引擎,其数据存储模式采取水平分片(sharding) + 副本(replication) 的形式,需要引用第三方库 elasticsearch-spark_2.10-2.1.0.jar

sparkConf.set("es.index.auto.create","true")
sparkConf.set("es.nodes","es1")
sparkConf.set("es.port","9200")
val sc = new SparkContext(sparkConf)
val rdd = EsSpark.esRDD(sc,"spark/docs").foreach(line => {
    val key = line._1
    val value = line._2
})

5.从MySQL读取
Spark从 MySQL读取数据是基于JDBC的,这点与 Sqoop相似。

val lowerBound = 1
val upperBound = 10000
val numPartition = 10 
val rdd = new JdbcRDD(sc, () => {
     Class.forname("com.mysql.jdbc.Driver").newInstance()
     DriverManger.getConnection("jdbc:mysql://localhost:3306/db","root","123456")
},
"SELECt name FROM mysqlTable WHERe ID >= ? AND ID <= ? ",
     lowerBound,
     upperBound,
     numPartition,
     r => r.getString(1)
)

6.由其他RDD转换得到
RDD算子分为两类,一类为转换(transform)算子,另一类为行动(action)算子,转换算子返回的还是RDD。

val rdd = sc.textFile("/user/me/a.txt").map(x => (x._1,x._1+x._2))
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服