转自:
hadoop 远程调度(四)
简介
继续上一篇博客
远程调度(三)来介绍hadoop远程调度过程。上一篇主要写rpc 服务端启动,和接受到客户端请求,处理请求,及把请求封装成一个call对象。接下来介绍怎么执行客户端请求,和结果返回客户端。
Hander 中处理Call对象
hander对象数组初始化在Server.start()方法中,这个方法中启动了一系列hander线程,这些线程在run方法中从Server.callQueue队列中获取call对象,并且处理请求,获取返回结果,并且把结果返回个对应的客户端。
@Override
public void run() {
while (running) {
TraceScope traceScope = null;
try {
final Call call = callQueue.take();
try {
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
}
);
}
} catch (Throwable e) {
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);
call.sendResponse();
}
} catch (InterruptedException e) {
} finally {
IOUtils.cleanup(LOG, traceScope);
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}
}
#setupResponse() 方法
- 整理返回结果header
- 根据不同请求方式序列化返回结果数据到缓冲区中
Server.Call.sendResponse()方法
- 调用connection的sendResponse()方法
- connection.sendResponse()调用了responder.doRespond(call)方法,这时候请求被推送到responder线程中,这个线程在server.start()时被启动。用来等待处理返回结果
Server.Responder.doRespond() 方法
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}
Server.Responder.processResponse()
作用: 处理一个响应,如果channel没有数据需要处理则返回true
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false;
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true;
}
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
if (!call.rpcResponse.hasRemaining()) {
call.rpcResponse = null;
call.connection.decRpcCount();
if (numElements == 1) {
done = true;
} else {
done = false;
}
} else {
call.connection.responseQueue.addFirst(call);
if (inHandler) {
call.timestamp = Time.now();
incPending();
try {
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
done = true;
} finally {
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
}
error = false;
}
} finally {
if (error && call != null) {
LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true;
closeConnection(call.connection);
}
}
return done;
}
Server.Responder.run()方法
@Override
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
doRunLoop();
}
private void doRunLoop() {
long lastPurgeTime = 0;
while (running) {
try {
waitPending();
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
}
}
long now = Time.now();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
ArrayList<Call> calls;
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}
for(Call call : calls) {
doPurge(call, now);
}
} catch (OutOfMemoryError e) {
}
}
对结果返回的后续操作参考了
参考链接博客