一、spark1.6读取csv
spark2.0才开始源码支持CSV,所以1.6版本需要借助第三方包来实现读取CSV文件,有好几种方法,
1.如果有maven的,到https://spark-packages.org/package/databricks/spark-csv下载对应scala版本的第三方jar包然后再maven的pom里面添加denpency,然后根据官网的用法用--packages传入。这样它就会自动去maven里面寻找了。
2.如果是Python开发的,用Python自带的库,比如pandas、csv等,可以参考这个博客。
3.如果没有maven可以通过textfile读入,然后通过opencsv来转化。到这里下载https://sourceforge.net/projects/opencsv/files/latest/download第三方jar包,放到spark安装目录的lib目录和Hadoop的share目录里面/usr/local/src/hadoop-2.6.1/share/hadoop/common/lib
import java.io.StringReader import au.com.bytecode.opencsv.CSVReader val pmix=sc.textFile("file:///mnt/hgfs/vm/20170101.csv") val pmixrdd=pmix.map{line =>val reader=new CSVReader(new StringReader(line)); reader.readNext()} pmixrdd.count
我放进去一个一千万行的数据,用了大概两分钟
二、转化为DataFrame
RDD有两种方式转化df:根据反射推断方式和编程方式。
根据反射推断的方式,scala2.10只能支持22列,但是我的表有30多列,所以我选择编程的方式,虽然编程的方式比较麻烦点。
import sqlContext.implicits._ import org.apache.spark.sql.types.{StructType,StructField,StringType}; import org.apache.spark.sql.Row; val schemaString = "region,store_type,sitename,storeid,check_no,employee,dob,dob_full,daypart,hour,minute,qcid,qc_name,qc,category,category_name,item,bohname,longname,tender_name,check_type,tot_amt,tot_amt_ala,price_tot,tot_amt_disc,disc_name,quantity,food_cost,paper_cost,burger_count,dimension_product_mix,dimension_channel_mix" val schema = StructType( schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true))) val rowRDD= pmixrdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31))) val pmixdf= sqlContext.createDataFrame(rowRDD, schema)
三、使用SQL
接下来注册一个临时表
pmixdf.registerTempTable("pmix") val results = sqlContext.sql("SELECT * FROM pmix limit 10").show() pmixdf.write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/pmix.parquet")
二、spark2.4读取csv
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} val conf = new SparkConf().setAppName("pmix").setMaster("local[2]") //val sc = new SparkContext(conf) //如果是集群启动,就不用这句了,自动加载sc sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console val spark = new SQLContext(sc) import spark.implicits._ val df = spark.read.format("com.databricks.spark.csv") .option("header", "false") .option("inferSchema", "false") //也可以.option("inferSchema", true.toString) //自动推断属性列的数据类型 .option("delimiter",",") //分隔符,默认为 , .load("file:///mnt/hgfs/vm/20170101.csv") //.save(outpath) df.show()