https://spark-packages.org/里有很多third-party数据源的package,spark把包加载进来就可以使用了
csv格式在spark2.0版本之后是内置的,2.0之前属于第三方数据源
读取本地外部数据源
直接读取一个json文件
1 | [hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar |
运行报错:
1 | Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10] |
查看load方法的源码:
1 | /** |
从源码中可以看出,如果不指定format,load默认读取的是parquet文件
1 | scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet") |
读取其他格式的文件,必须通过format指定文件格式,如下:
1 | //windows idea环境下 |
1 | Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX |
读取CSV格式文件
1 | //源文件内容如下: |
读取csv格式文件还可以自定义schema
1 | val peopleschema = StructType(Array( |
将读取的文件以其他格式写出
1 | //将上文读取的users.parquet以json格式写出 |
此操作的目的在于学会类型转换,生产上最开始进来的数据大多都是text,json等行式存储的文件,一般都要转成ORC,parquet列式存储的文件,加上压缩,能把文件大小减小到10%左右,大幅度减小IO和数据处理量,提高性能
此时如果再执行一次save,路径不变,则会报错:
1 | scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/") |
可以通过设置savemode来解决这个问题
默认是errorifexists
1 | scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/parquet2json/") |