将HDFS上的数据解析出来,然后通过hfile方式批量写入Hbase(需要多列写入) 写⼊数据的关键api:
1 | saveAsNewAPIHadoopFile( |
最初写hfile警告
1
Does it contain files in subdirectories that correspond to column family names
这个原因大概三种
* 代码问题
* 数据源问题
* setMapOutputKeyClass 和 saveAsNewAPIHadoopFile中的Class不不⼀一致
这里是我的是数据源问题
正常写put操作的时候,服务端自动帮助排序,因此在使用put操作的时候没有涉及到这样的错误
1
Added a key not lexically larger than previous
但是在写hfile的时候如果出现报错:
1
Added a key not lexically larger than previous
这样的错误,一般会认为rowkey没有做好排序,然后傻fufu的去验证了一下,rowkey的确做了排序。
真正原因:
`spark写hfile时候是按照rowkey+列族+列名进⾏行排序的,因此在写⼊数据的时候,要做到整体有序 (事情还没完)`
因为需要多列写入,最好的⽅式:要么反射来动态获取列名称和列值 、 要么通过datafame去获取(df.columns)
反射方式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31val listData: RDD[(ImmutableBytesWritable, ListBuffer[KeyValue])] = rdd.map
{
line =>
val rowkey = line.vintime
val clazz = Class.forName(XXXXXXXXXXXXXXXX)
val fields = clazz.getDeclaredFields
var list = new ListBuffer[String]()
var kvlist = new ListBuffer[KeyValue]()//
if (fields != null && fields.size > 0) {
for (field <- fields) {
field.setAccessible(true)
val column = field.getName
list.append(column)
} }
val newList = list.sortWith(_ < _)
val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
for(column <- newList){
val declaredField: Field =
line.getClass.getDeclaredField(column)
}
declaredField.setAccessible(true)
val value = declaredField.get(line).toString
val kv: KeyValue = new KeyValue(
Bytes.toBytes(rowkey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
Bytes.toBytes(value))
kvlist.append(kv)
}
(ik, kvlist)
}
datafame的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val tmpData: RDD[(ImmutableBytesWritable, util.LinkedList[KeyValue])] =
df.rdd.map(
line =>{
val rowkey = line.getAs[String]("vintime")
val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
var linkedList = new util.LinkedList[KeyValue]()
for (column <- columns) {
val kv: KeyValue = new KeyValue(
Bytes.toBytes(rowkey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
Bytes.toBytes(line.getAs[String](column)))
linkedList.add(kv)
}
(ik, linkedList)
})
val result: RDD[(ImmutableBytesWritable, KeyValue)] =
tmpData.flatMapValues(
s => {
val values: Iterator[KeyValue] =
JavaConverters.asScalaIteratorConverter(s.iterator()).asScala
values
}
).sortBy(x =>x._1 , true)
仔细观察可以发现,其实两者都做了排序操作,但是即便经过(1)步骤后仍然报错:
1
Added a key not lexically larger than previous
那么再回想⼀下之前写hfile的要求:
rowkey+列族+列都要有序,那么如果出现数据的重复,也不算是有序的操作! 因为,做一下数据的去重:
1
2
val key: RDD[(String, TransferTime)] = data.reduceByKey((x, y) => y)
val unitData: RDD[TransferTime] = key.map(line => line._2)
果然,这样解决了:Added a key not lexically larger than previous这个异常 但是会报如下另⼀个异常:
1
Kryo serialization failed: Buffer overflow
这个是因为在对⼀些类做kryo序列化时候,数据量的缓存⼤小超过了默认值,做⼀下调整即可
1
2
sparkConf.set("spark.kryoserializer.buffer.max" , "256m")
sparkConf.set("spark.kryoserializer.buffer" , "64m")
完整代码1 | object WriteTransferTime extends WriteToHbase{ |