java.lang.ClassCastException: org.apache.hadoop.hbase.client.Result cannot be cast to org.apache.hadoop.hbase.client.Mutation

I get this error while transferring values from one HBase table to another:

INFO mapreduce.Job: Task Id : attempt_1410946588060_0019_r_000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Result cannot be cast to org.apache.hadoop.hbase.client.Mutation
 at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:87)
 at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:576)
 at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
 at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
 at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
 at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
 at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
 at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:396)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
 at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Driver class:


Configuration conf = HBaseConfiguration.create();

// define the scan and the column family to read
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));

// Job job = new Job(conf, "ExampleSummary");
Job job = Job.getInstance(conf);

job.setJarByClass(HBaseDriver.class);

// define the input hbase table
TableMapReduceUtil.initTableMapperJob(
        "test1",
        scan,
        HBaseMapper.class,
        ImmutableBytesWritable.class,
        Result.class,
        job);
// define the output table
TableMapReduceUtil.initTableReducerJob(
        "test2",
        HBaseReducer.class,
        job);

job.waitForCompletion(true);

Mapper:


public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
        throws IOException, InterruptedException {
    try {
        // get the row key and convert it to a string
        String inKey = new String(rowKey.get());
        // the new key keeps only the date part of the row key
        String oKey = inKey.split("#")[0];
        // read the sales column as bytes, then convert to a string
        // (it was stored as a string from the hbase shell)
        byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
        String sSales = new String(bSales);
        Integer sales = new Integer(sSales);
        // emit the date as the key and the sales figure as the value
        context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));
    } catch (RuntimeException e) {
        e.printStackTrace();
    }
}
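An aside on the byte/string handling above: new String(byte[]) and String.getBytes() use the platform default charset, while the usual HBase idiom is the Bytes helper, which always uses UTF-8. The equivalent lines would be (a suggestion, not what the post actually ran):

String inKey = Bytes.toString(rowKey.get());
// ...
context.write(new ImmutableBytesWritable(Bytes.toBytes(oKey)), new IntWritable(sales));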

Reducer:


public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    try {
        int sum = 0;
        // loop over the sales values and accumulate the sum
        for (IntWritable sales : values) {
            sum += sales.get();
        }

        // create an hbase Put with the date as the row key
        Put insHBase = new Put(key.get());
        // store the sum in the cf1:sum column
        insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
        // write the Put to the output table
        context.write(null, insHBase);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
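The post omits the two class declarations. For these map/reduce methods to actually override the HBase base classes (instead of falling back to the framework's identity implementations), the generic parameters must line up with the types used above; the declarations presumably look roughly like this (a sketch, with the method bodies as posted):

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;

// TableMapper<KEYOUT, VALUEOUT>: the input types are fixed to
// (ImmutableBytesWritable, Result); this job emits IntWritable values
public class HBaseMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
    // map(...) as posted above
}

// TableReducer<KEYIN, VALUEIN, KEYOUT>: the output value type is fixed to
// Mutation (a Put or Delete), which is what TableOutputFormat accepts
public class HBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
    // reduce(...) as posted above
}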

Answer:

I found the solution. All I had to do was change this:


TableMapReduceUtil.initTableMapperJob(
        "test1",
        scan,
        HBaseMapper.class,
        ImmutableBytesWritable.class,
        Result.class,
        job);

to this:


TableMapReduceUtil.initTableMapperJob(
        "test1",
        scan,
        HBaseMapper.class,
        ImmutableBytesWritable.class,
        IntWritable.class,
        job);
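This fits the API: the fifth argument of initTableMapperJob declares the mapper's output value class, and the map method emits IntWritable values; Result is only the mapper's input type. Declaring Result.class there mismatches the shuffle types, which is consistent with the stack trace: the object that reached TableOutputFormat's record writer was a Result, and that writer can only write Mutation subclasses such as Put.

Once the job succeeds, each sum is stored via Bytes.toBytes(int) as a raw 4-byte value, so it has to be read back with Bytes.toInt rather than as a string. A quick client-side check could look like this (a sketch against the 0.9x-era client API; the date row key "20140101" is a made-up example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckSum {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "test2");      // the job's output table
        Get get = new Get(Bytes.toBytes("20140101"));  // hypothetical date row key
        Result result = table.get(get);
        // the reducer wrote the sum with Bytes.toBytes(int), so decode with Bytes.toInt
        int sum = Bytes.toInt(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sum")));
        System.out.println("sum = " + sum);
        table.close();
    }
}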

...