RDD的创建
RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是Spark对数据的核心抽象。创建RDD是使用RDD的第一步,一般可以由内存中的集合、文件、外部数据源生成或者由其他RDD转换而成。
1.并行集合
并行集合可以对Driver中的集合调用parallelize方法得到,Driver会将集合切成分区,并将数据分区分发到整个集群中。
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(Seq(1,2,3))
2.从HDFS中读取
HDFS作为Spark 的底层存储,Spark集群中的节点就会去读取分散在 HDFS中的数据块。
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("/user/me/a.txt")
3.从HBase中读取
Spark从HBase读取文件有两种方式,一种是基于HBase的Scan操作,另一种是直接读取HBase的Region文件(切片,切片是有序二进制文件)。
val sc = new SparkContext(sparkConf)
val tableName = "your_hbaseTable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort","2181")
conf.set(TableInputFormat.INPUT_TABLE,tableName)
val rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
classOf[org.apache.hadoop.habse.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.habse.client.Result])
//利用 HBase API解析出行键与列值
rdd.foreach ({ case (_,result) =>
val rowkey = Bytes.toString(result.getRow)
val value = Bytes.toString(result.getValue("cf".getBytes,"cl".getBytes))
})
4.从Elasticsearch中读取
Elasticsearch是一个分布式的全文搜索引擎,其数据存储模式采取水平分片(sharding) + 副本(replication) 的形式,需要引用第三方库 elasticsearch-spark_2.10-2.1.0.jar
sparkConf.set("es.index.auto.create","true")
sparkConf.set("es.nodes","es1")
sparkConf.set("es.port","9200")
val sc = new SparkContext(sparkConf)
val rdd = EsSpark.esRDD(sc,"spark/docs").foreach(line => {
val key = line._1
val value = line._2
})
5.从MySQL读取
Spark从 MySQL读取数据是基于JDBC的,这点与 Sqoop相似。
val lowerBound = 1
val upperBound = 10000
val numPartition = 10
val rdd = new JdbcRDD(sc, () => {
Class.forname("com.mysql.jdbc.Driver").newInstance()
DriverManger.getConnection("jdbc:mysql://localhost:3306/db","root","123456")
},
"SELECt name FROM mysqlTable WHERe ID >= ? AND ID <= ? ",
lowerBound,
upperBound,
numPartition,
r => r.getString(1)
)
6.由其他RDD转换得到
RDD算子分为两类,一类为转换(transform)算子,另一类为行动(action)算子,转换算子返回的还是RDD。
val rdd = sc.textFile("/user/me/a.txt").map(x => (x._1,x._1+x._2))