
Code | Spark: Reading MongoDB Data and Writing It into Hive Ordinary and Partitioned Tables

Published 2018-11-20 | Edited 2019-06-01 | in Spark

Versions:

spark 2.2.0
hive 1.1.0
scala 2.11.8
hadoop-2.6.0-cdh5.7.0
jdk 1.8
MongoDB 3.6.4

1. Source Data and Hive Tables

MongoDB document format
{
    "_id" : ObjectId("5af65d86222b639e0c2212f3"),
    "id" : "1",
    "name" : "lisi",
    "age" : "18",
    "deptno" : "01"
}
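To reproduce this sample data, a minimal sketch using the mongo-java-driver (3.6.3, the same version pulled in by the dependencies below) could insert such a document into the test.mgtest collection that the Spark job reads later; the class name InsertSampleDoc and the local connection details are only illustrative assumptions:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class InsertSampleDoc {
    public static void main(String[] args) {
        // assumes a local MongoDB on the default port, database "test", collection "mgtest"
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        MongoCollection<Document> coll = client.getDatabase("test").getCollection("mgtest");
        coll.insertOne(new Document("id", "1")
                .append("name", "lisi")
                .append("age", "18")
                .append("deptno", "01"));
        client.close();
    }
}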
Hive ordinary (non-partitioned) table
create table mg_hive_test(
    id string,
    name string,
    age string,
    deptno string
) row format delimited fields terminated by '\t';
Hive partitioned table
create table mg_hive_external(
    id string,
    name string,
    age string
)
partitioned by (deptno string)
row format delimited fields terminated by '\t';

2. IDEA + Maven + Java

Dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.6.3</version>
</dependency>
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.2.2</version>
</dependency>
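The ${spark.version} property is referenced but not defined in the snippet above; assuming it points at the Spark 2.2.0 listed in the version section, the POM would carry something like:

<properties>
    <spark.version>2.2.0</spark.version>
</properties>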
Code
package com.huawei.mongo;
/*
 * @Author: Create by Achun
 * @Time: 2018/6/2 21:00
 */

import com.mongodb.spark.MongoSpark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class sparkreadmgtohive {
    public static void main(String[] args) {
        // Spark 2.x
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("SparkReadMgToHive")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest")
                .enableHiveSupport()
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // Spark 1.x equivalent:
        // SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkReadMgToHive");
        // conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest");
        // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // JavaSparkContext sc = new JavaSparkContext(conf);
        // sc.addJar("/Users/mac/zhangchun/jar/mongo-spark-connector_2.11-2.2.2.jar");
        // sc.addJar("/Users/mac/zhangchun/jar/mongo-java-driver-3.6.3.jar");
        // HiveContext sqlContext = new HiveContext(sc);
        // // create a DataFrame from MongoDB
        // Dataset<Row> df = MongoSpark.read(sqlContext).load().toDF();
        // df.select("id", "name", "age").show();

        String querysql = "select id,name,age,deptno from mgtable b";
        String opType = "P";   // "O" = ordinary table, "P" = partitioned table

        SQLUtils sqlUtils = new SQLUtils();
        List<String> column = sqlUtils.getColumns(querysql);

        // create an RDD from MongoDB
        JavaRDD<Document> rdd = MongoSpark.load(sc);
        // convert each Document into a list of column values
        JavaRDD<Object> Ordd = rdd.map(new Function<Document, Object>() {
            public Object call(Document document) {
                List list = new ArrayList();
                for (int i = 0; i < column.size(); i++) {
                    list.add(String.valueOf(document.get(column.get(i))));
                }
                return list;
            }
        });
        System.out.println(Ordd.first());

        // build the DataFrame schema programmatically from the column list
        List ls = new ArrayList();
        for (int i = 0; i < column.size(); i++) {
            ls.add(column.get(i));
        }
        String schemaString = ls.toString().replace("[", "").replace("]", "").replace(" ", "");
        System.out.println(schemaString);

        List<StructField> fields = new ArrayList<StructField>();
        for (String fieldName : schemaString.split(",")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rowRDD = Ordd.map((Function<Object, Row>) record -> {
            List fileds = (List) record;
            return RowFactory.create(fileds.toArray());
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);

        // write the DataFrame into Hive
        // select the Hive database
        spark.sql("use datalake");
        // register a temporary view over the MongoDB data
        df.createOrReplaceTempView("mgtable");

        if ("O".equals(opType.trim())) {
            System.out.println("Inserting data into the Hive ordinary table");
            Long t1 = System.currentTimeMillis();
            spark.sql("insert into mg_hive_test " + querysql + " " + "where b.id not in (select id from mg_hive_test)");
            Long t2 = System.currentTimeMillis();
            System.out.println("Elapsed time: " + (t2 - t1) / 60000 + " minutes");
        } else if ("P".equals(opType.trim())) {
            System.out.println("Inserting data into the Hive dynamic partition table");
            Long t3 = System.currentTimeMillis();
            // this setting is required, otherwise the dynamic partition insert fails
            spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
            // deptno is the partition column, so it must be the last column in the select list
            spark.sql("insert into mg_hive_external partition(deptno) select id,name,age,deptno from mgtable b where b.id not in (select id from mg_hive_external)");
            Long t4 = System.currentTimeMillis();
            System.out.println("Elapsed time: " + (t4 - t3) / 60000 + " minutes");
        }
        spark.stop();
    }
}
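For reference, the Spark 2.x connector can also hand back a DataFrame directly and infer the schema by sampling documents, which avoids building the StructType by hand. A minimal sketch, assuming the same SparkSession spark and JavaSparkContext sc as above and that the connector's no-argument toDF() helper is available on the returned RDD:

// load the collection and let the connector infer a schema by sampling
Dataset<Row> mongoDf = MongoSpark.load(sc).toDF();
mongoDf.createOrReplaceTempView("mgtable");
spark.sql("use datalake");
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
spark.sql("insert into mg_hive_external partition(deptno) "
        + "select id,name,age,deptno from mgtable b "
        + "where b.id not in (select id from mg_hive_external)");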
Utility class
package com.huawei.mongo;
/*
 * @Author: Create by Achun
 * @Time: 2018/6/3 23:20
 */

import java.util.ArrayList;
import java.util.List;

public class SQLUtils {

    // extract the column names between "select" and "from"
    public List<String> getColumns(String querysql) {
        List<String> column = new ArrayList<String>();
        String tmp = querysql.substring(querysql.indexOf("select") + 6,
                querysql.indexOf("from")).trim();
        if (tmp.indexOf("*") == -1) {
            String cols[] = tmp.split(",");
            for (String c : cols) {
                column.add(c);
            }
        }
        return column;
    }

    // extract the table name that follows "from"
    public String getTBname(String querysql) {
        String tmp = querysql.substring(querysql.indexOf("from") + 4).trim();
        int sx = tmp.indexOf(" ");
        if (sx == -1) {
            return tmp;
        } else {
            return tmp.substring(0, sx);
        }
    }
}
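A quick illustration of what the helper returns for the query used above (a hypothetical test snippet, not part of the original project):

SQLUtils sqlUtils = new SQLUtils();
String querysql = "select id,name,age,deptno from mgtable b";
// prints [id, name, age, deptno]
System.out.println(sqlUtils.getColumns(querysql));
// prints mgtable
System.out.println(sqlUtils.getTBname(querysql));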

3. Troubleshooting

1. IDEA cannot see the Hive databases and tables: put hive-site.xml into the resources directory and mark resources as a resources root (when set correctly the folder icon turns blue, otherwise it stays grey):
File -> Project Structure -> Modules -> Sources
2. If, after fixing the above, you still get JDO-related errors, check whether the MySQL driver is present under HIVE_HOME/lib. If it is there, the problem is simply that IDEA cannot pick it up. Fix it as follows:

Copy the MySQL driver into jdk1.8.0_171.jdk/Contents/Home/jre/lib/ext (i.e. jdk/jre/lib/ext).
In IDEA, add the MySQL driver under <1.8> in the project's External Libraries.

4. Notes

Because the MongoDB collection is registered as a temporary table and joined against the Hive table, the id field in MongoDB should be indexed; otherwise performance will be very slow.
Creating the index in MongoDB:

db.getCollection('mgtest').ensureIndex({"id": 1}, {"background": true})

Checking the indexes:

db.getCollection('mgtest').getIndexes()
MongoDB Spark connector Java API docs: https://docs.mongodb.com/spark-connector/current/java-api/
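The same index can also be created from Java with the mongo-java-driver already on the classpath; a minimal sketch (the class name CreateIdIndex and the local connection details are assumptions matching the URI used in the Spark job):

import com.mongodb.MongoClient;
import com.mongodb.client.model.IndexOptions;
import org.bson.Document;

public class CreateIdIndex {
    public static void main(String[] args) {
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        // equivalent to the shell command above: index on "id", built in the background
        client.getDatabase("test").getCollection("mgtest")
              .createIndex(new Document("id", 1), new IndexOptions().background(true));
        client.close();
    }
}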
