79
79
import org .apache .hadoop .hbase .util .CommonFSUtils ;
80
80
import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
81
81
import org .apache .hadoop .hbase .util .MapReduceExtendedCell ;
82
+ import org .apache .hadoop .hdfs .DFSConfigKeys ;
83
+ import org .apache .hadoop .hdfs .HdfsConfiguration ;
82
84
import org .apache .hadoop .io .NullWritable ;
83
85
import org .apache .hadoop .io .SequenceFile ;
84
86
import org .apache .hadoop .io .Text ;
90
92
import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
91
93
import org .apache .hadoop .mapreduce .lib .output .PathOutputCommitter ;
92
94
import org .apache .hadoop .mapreduce .lib .partition .TotalOrderPartitioner ;
95
+ import org .apache .hadoop .net .NetUtils ;
93
96
import org .apache .yetus .audience .InterfaceAudience ;
94
97
import org .slf4j .Logger ;
95
98
import org .slf4j .LoggerFactory ;
@@ -306,8 +309,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
306
309
LOG .trace ("Failed get of location, use default writer {}" , Bytes .toString (rowKey ));
307
310
} else {
308
311
LOG .debug ("First rowkey: [{}]" , Bytes .toString (rowKey ));
309
- InetSocketAddress initialIsa =
310
- new InetSocketAddress (loc .getHostname (), loc .getPort ());
312
+ InetSocketAddress initialIsa = getDNFavoredNode (conf , loc );
311
313
if (initialIsa .isUnresolved ()) {
312
314
LOG .trace ("Failed resolve address {}, use default writer" , loc .getHostnamePort ());
313
315
} else {
@@ -329,6 +331,14 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
329
331
this .previousRows .put (family , rowKey );
330
332
}
331
333
334
+ private InetSocketAddress getDNFavoredNode (Configuration conf , HRegionLocation loc ) {
335
+ HdfsConfiguration .init ();
336
+ Configuration dnConf = new HdfsConfiguration (conf );
337
+ int dnPort = NetUtils .createSocketAddr (dnConf .get (DFSConfigKeys .DFS_DATANODE_ADDRESS_KEY ,
338
+ DFSConfigKeys .DFS_DATANODE_ADDRESS_DEFAULT )).getPort ();
339
+ return new InetSocketAddress (loc .getHostname (), dnPort );
340
+ }
341
+
332
342
private Path getTableRelativePath (byte [] tableNameBytes ) {
333
343
String tableName = Bytes .toString (tableNameBytes );
334
344
String [] tableNameParts = tableName .split (":" );
0 commit comments