1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.hadoop.fs.FileAlreadyExistsException;
24  import org.apache.hadoop.fs.FileSystem;
25  import org.apache.hadoop.hbase.TableName;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.client.BufferedMutator;
29  import org.apache.hadoop.hbase.client.Connection;
30  import org.apache.hadoop.hbase.client.ConnectionFactory;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33  import org.apache.hadoop.mapred.FileOutputFormat;
34  import org.apache.hadoop.mapred.InvalidJobConfException;
35  import org.apache.hadoop.mapred.JobConf;
36  import org.apache.hadoop.mapred.RecordWriter;
37  import org.apache.hadoop.mapred.Reporter;
38  import org.apache.hadoop.util.Progressable;
39  
40  
41  
42  
43  @InterfaceAudience.Public
44  @InterfaceStability.Stable
45  public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
46  
47    
48    public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
49  
50    
51  
52  
53  
54    protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
55      private BufferedMutator m_mutator;
56      private Connection connection;
57      
58  
59  
60  
61      public TableRecordWriter(final BufferedMutator mutator) throws IOException {
62        this.m_mutator = mutator;
63      }
64  
65      public TableRecordWriter(JobConf job) throws IOException {
66        
67        TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
68        connection = ConnectionFactory.createConnection(job);
69        m_mutator = connection.getBufferedMutator(tableName);
70      }
71  
72      public void close(Reporter reporter) throws IOException {
73        this.m_mutator.close();
74        if (connection != null) {
75          connection.close();
76          connection = null;
77        }
78      }
79  
80      public void write(ImmutableBytesWritable key, Put value) throws IOException {
81        m_mutator.mutate(new Put(value));
82      }
83    }
84  
85    @Override
86    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
87        Progressable progress)
88    throws IOException {
89      return new TableRecordWriter(job);
90    }
91  
92    @Override
93    public void checkOutputSpecs(FileSystem ignored, JobConf job)
94    throws FileAlreadyExistsException, InvalidJobConfException, IOException {
95      String tableName = job.get(OUTPUT_TABLE);
96      if (tableName == null) {
97        throw new IOException("Must specify table name");
98      }
99    }
100 }