一,回顾
上篇文章讲了tomcat的Connector启动过程,启动之后,就可以接收请求了。这篇文章主要讲解tomcat如何接收请求。
二,NioEndpoint
2.1 NioEndpoint#Acceptor创建链接
在NioEndpoint#Acceptor的run方法中,接受socket链接,代码如下:
NioEndpoint#Acceptor
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
...//略 是否要暂停 Loop if endpoint is paused
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection();
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// setSocketOptions() will add channel to the poller
// if successful
...//略,状态处理
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
核心代码在socket = serverSock.accept()
中,当创建链路后。Selector会对此链路进行监控,然后进行数据的读写。
2.2 NioEndpoint#Poller接受数据
在poller的run方法中,对SelectionKey进行遍历。
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
...略// Time to terminate?
try {
if ( !close ) {
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
//遍历SelectionKey
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
//处理SelectionKey
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
stopLatch.countDown();
}
在run方法中,有一句processKey(sk, attachment)
,对SelectionKey进行处理,我们看其具体实现逻辑:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
//处理文件发送
processSendfile(sk,attachment, false);
} else {
if ( isWorkerAvailable() ) {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
//读操作
if (!processSocket(attachment, SocketStatus.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
//写操作
if (!processSocket(attachment, SocketStatus.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
processKey对文件进行了隔离操作,然后使用processSocket方法进行读写操作。
protected boolean processSocket(NioSocketWrapper attachment, SocketStatus status, boolean dispatch) {
try {
if (attachment == null) {
return false;
}
SocketProcessor sc = processorCache.pop();
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
return false;
} catch (Throwable t) {
return false;
}
return true;
}
processSocket创建了一个SocketProcessor,然后提交到线程池中进行处理。我们看SocketProcessor的run方法。
public void run() {
NioChannel socket = ka.getSocket();
if (socket == null) {
return;
}
SelectionKey key = socket.getIOChannel().keyFor(
socket.getPoller().getSelector());
synchronized (socket) {
try {
int handshake = -1;
try {
...//略,握手处理
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else {
ka.getPoller().add(socket,handshake);
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error("", t);
socket.getPoller().cancelledKey(key);
} finally {
ka = null;
status = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
}
run方法对握手进行了处理,然后开始处理数据。在handler的procee实现了具体的处理方式:
public SocketState process(SocketWrapperBase<S> wrapper,
SocketStatus status) {
...//略,对状态的一些判断
Processor processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
ContainerThreadMarker.set();
try {
...//对process的判断
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (status == SocketStatus.CLOSE_NOW) {
...//略,很多状态
} else {
state = processor.process(wrapper);
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
...//略,协议更新处理
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = wrapper.getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
dispatches != null && state != SocketState.CLOSED);
...//略,状态判断
return state;
} catch(java.net.SocketException e) {
...//略,异常
}
finally {
ContainerThreadMarker.clear();
}
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
// Don't try to add upgrade processors back into the pool
if (processor !=null && !processor.isUpgrade()) {
release(wrapper, processor, false);
}
return SocketState.CLOSED;
}
在这个方法中,我们只看最重要的 processor.process(wrapper);
对于http协议,processor是Http11Processor,下面将详细讲解。
三,Http11Processor
下面我们看看Process是如何对NioSocketWrapper进行处理的。我们看process方法,方法太长,省略了许多:
public SocketState process(SocketWrapperBase<?> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
...//略
// Flags
keepAlive = true;
openSocket = false;
sendfileInProgress = false;
readComplete = true;
keptAlive = false;
while (!getErrorState().isError() && keepAlive && !isAsync() &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// Parsing the request header
try {
if (!inputBuffer.parseRequestLine(keptAlive)) {
if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
socketWrapper.setReadTimeout(connectionUploadTimeout);
}
}
} catch (Throwable t) {
...//略,其他异常
// 400 - Bad Request ,返回400错误
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
// Has an upgrade been requested?
...//略,更新请求
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && (
response.getErrorException() != null ||
(!isAsync() &&
statusDropsConnection(response.getStatus())))) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
} catch (HeadersTooLargeException e) {
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
request.updateCounters();
...//略
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileInProgress) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
方法太长,略去了一部分。代码主要是作用是生成了org.apache.coyote.Requset对象和org.apache.coyote.Response对象,对升级和错误进行了额外的处理等。我们重点看getAdapter().service(request, response);
这段代码会转发到Servlet请求上。getAdapter()返回一个CoyoteAdapter对象,我们接着看。
四,CoyoteAdapter
service方法对req和res进行转型,转成ServletRequest和ServletResponse
public void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringEncoding
(connector.getURIEncoding());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
try {
// Parse and set Catalina and configuration specific
// request parameters
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
boolean postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}
AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
if (asyncConImpl != null) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}
} else {
request.finishRequest();
response.finishResponse();
if (postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
request.getMappingData().context.logAccess(
request, response,
System.currentTimeMillis() - req.getStartTime(),
false);
}
}
} catch (IOException e) {
// Ignore
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);
// Recycle the wrapper request and response
if (!async || error.get()) {
request.recycle();
response.recycle();
} else {
// Clear converters so that the minimum amount of memory
// is used by this processor
request.clearEncoders();
response.clearEncoders();
}
}
}
太长了,我们只关注最重要的:connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData());
connector.getService()返回了StandardService,然后映射到了具体的Servlet上,这里就是详细解释了,要去跑步了,88.
五,结束
请求处理完后就会发送数据,就不一一详解了。