1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufInputStream;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  
25  import java.io.IOException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
32  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
33  import org.apache.hadoop.ipc.RemoteException;
34  
35  import com.google.protobuf.Message;
36  
37  
38  
39  
40  @InterfaceAudience.Private
41  public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
42    public static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
43  
44    private final AsyncRpcChannel channel;
45  
46    
47  
48  
49  
50  
51    public AsyncServerResponseHandler(AsyncRpcChannel channel) {
52      this.channel = channel;
53    }
54  
55    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
56      ByteBuf inBuffer = (ByteBuf) msg;
57      ByteBufInputStream in = new ByteBufInputStream(inBuffer);
58      int totalSize = inBuffer.readableBytes();
59      try {
60        
61        RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
62        int id = responseHeader.getCallId();
63        AsyncCall call = channel.removePendingCall(id);
64        if (call == null) {
65          
66          
67          
68          
69          
70          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
71          int whatIsLeftToRead = totalSize - readSoFar;
72  
73          
74          
75          
76          inBuffer.skipBytes(whatIsLeftToRead);
77          return;
78        }
79  
80        if (responseHeader.hasException()) {
81          RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
82          RemoteException re = createRemoteException(exceptionResponse);
83          if (exceptionResponse.getExceptionClassName().
84              equals(FatalConnectionException.class.getName())) {
85            channel.close(re);
86          } else {
87            call.setFailed(re);
88          }
89        } else {
90          Message value = null;
91          
92          if (call.responseDefaultType != null) {
93            Message.Builder builder = call.responseDefaultType.newBuilderForType();
94            ProtobufUtil.mergeDelimitedFrom(builder, in);
95            value = builder.build();
96          }
97          CellScanner cellBlockScanner = null;
98          if (responseHeader.hasCellBlockMeta()) {
99            int size = responseHeader.getCellBlockMeta().getLength();
100           byte[] cellBlock = new byte[size];
101           inBuffer.readBytes(cellBlock, 0, cellBlock.length);
102           cellBlockScanner = channel.client.createCellScanner(cellBlock);
103         }
104         call.setSuccess(value, cellBlockScanner);
105       }
106     } catch (IOException e) {
107       
108       channel.close(e);
109     } finally {
110       inBuffer.release();
111     }
112   }
113 
114   
115 
116 
117 
118   private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
119     String innerExceptionClassName = e.getExceptionClassName();
120     boolean doNotRetry = e.getDoNotRetry();
121     return e.hasHostname() ?
122         
123         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
124             e.getPort(), doNotRetry) :
125         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
126   }
127 }