Reading the Tomcat 8 Source (6): Receiving Requests

1. Review

The previous article covered how Tomcat's Connector starts up. Once it has started, it can accept requests. This article explains how Tomcat receives a request.

2. NioEndpoint

2.1 NioEndpoint#Acceptor: accepting connections

Socket connections are accepted in the run method of NioEndpoint#Acceptor. The code is as follows:

NioEndpoint#Acceptor    
public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            ... // omitted: loop if endpoint is paused

            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    //we didn't get a socket
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // setSocketOptions() will add channel to the poller
                // if successful
                ... // omitted: state handling
            } catch (SocketTimeoutException sx) {
                // Ignore: Normal condition
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }

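The key line is socket = serverSock.accept(). Once the connection has been accepted, setSocketOptions() (in the omitted part) adds the channel to the Poller, as the comment in the code notes. The Poller's Selector then monitors the connection so that data can be read from and written to it.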
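
To make the accept-then-monitor hand-off concrete, here is a minimal sketch using plain java.nio. It is not Tomcat's code: the class AcceptSketch and the method acceptAndRegister are hypothetical names, and the channel is registered with the Selector directly, whereas Tomcat hands it to the Poller.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical sketch, not Tomcat code: accept one connection, then let a Selector watch it.
class AcceptSketch {

    /** Accepts one connection in blocking mode, then registers it for read events. */
    static SelectionKey acceptAndRegister(ServerSocketChannel server, Selector selector)
            throws IOException {
        // Blocking accept: returns once a client has connected
        SocketChannel socket = server.accept();
        // A channel must be non-blocking before it can be registered with a Selector
        socket.configureBlocking(false);
        // From here on the Selector reports when the channel has data to read
        return socket.register(selector, SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Port 0 asks the OS for any free port (an arbitrary choice for this demo)
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                SelectionKey key = acceptAndRegister(server, selector);
                System.out.println("valid key: " + key.isValid());
                key.channel().close();
            }
        }
    }
}
```

The accepting thread does no reading itself; it only produces registered channels, which is the division of labour between Acceptor and Poller described here.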

2.2 NioEndpoint#Poller: receiving data

The Poller's run method iterates over the selected SelectionKeys.

public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        ... // omitted: time to terminate?

        try {
            if ( !close ) {
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString(
                            "endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        // iterate over the selected keys
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // handle the SelectionKey
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    stopLatch.countDown();
}
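
The select-and-dispatch loop above can be reduced to a small, self-contained sketch. This is an illustration under assumptions, not Tomcat's Poller: PollSketch and pollOnce are hypothetical names, a java.nio Pipe stands in for a client socket, and a String attachment stands in for the NioSocketWrapper.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not Tomcat code: one pass of a selector loop.
class PollSketch {

    /** Runs one select pass and returns the attachments of all ready keys. */
    static List<Object> pollOnce(Selector selector, long timeoutMillis) throws IOException {
        List<Object> ready = new ArrayList<>();
        int keyCount = selector.select(timeoutMillis);
        Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (iterator != null && iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // The Selector never clears its selected-key set by itself,
            // so every key has to be removed once it has been picked up
            iterator.remove();
            Object attachment = key.attachment();
            if (attachment != null) {
                ready.add(attachment); // stand-in for processKey(sk, attachment)
            }
        }
        return ready;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            // The attachment travels with the key, like the socket wrapper in the Poller
            pipe.source().register(selector, SelectionKey.OP_READ, "wrapper-1");
            pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
            System.out.println(pollOnce(selector, 1000));
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

Carrying per-connection state as the key's attachment is what lets the loop go from a ready key to the connection's wrapper without a separate lookup.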

Inside run, the call processKey(sk, attachment) handles each SelectionKey. Its implementation is:

 protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    // handle sendfile
                    processSendfile(sk,attachment, false);
                } else {
                    if ( isWorkerAvailable() ) {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                            // read
                            if (!processSocket(attachment, SocketStatus.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            // write
                            if (!processSocket(attachment, SocketStatus.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                        if (closeSocket) {
                            cancelledKey(sk);
                        }
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

processKey handles sendfile as a separate case, and for everything else calls processSocket to perform the read or write.

protected boolean processSocket(NioSocketWrapper attachment, SocketStatus status, boolean dispatch) {
    try {
        if (attachment == null) {
            return false;
        }
        SocketProcessor sc = processorCache.pop();
        if ( sc == null ) sc = new SocketProcessor(attachment, status);
        else sc.reset(attachment, status);
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        return false;
    } catch (Throwable t) {
        return false;
    }
    return true;
}
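
The pattern in processSocket above, reusing a cached task object and handing it to an executor, can be sketched in isolation. Everything here is an assumption made for illustration: DispatchSketch and Task are hypothetical names, a String payload stands in for the socket wrapper, and a ConcurrentLinkedDeque stands in for Tomcat's processorCache.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Tomcat code: pooled task objects dispatched to an executor.
class DispatchSketch {
    final ConcurrentLinkedDeque<Task> cache = new ConcurrentLinkedDeque<>();
    final AtomicInteger created = new AtomicInteger();
    final AtomicInteger handled = new AtomicInteger();

    final class Task implements Runnable {
        private String payload;

        Task(String payload) {
            this.payload = payload;
            created.incrementAndGet();
        }

        void reset(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            try {
                handled.incrementAndGet(); // stand-in for the real socket processing
            } finally {
                payload = null;
                cache.push(this); // return the task to the cache for reuse
            }
        }
    }

    /** Returns false if the executor rejected the task, true otherwise. */
    boolean process(String payload, Executor executor, boolean dispatch) {
        try {
            Task task = cache.poll(); // null when the cache is empty
            if (task == null) {
                task = new Task(payload);
            } else {
                task.reset(payload);
            }
            if (dispatch && executor != null) {
                executor.execute(task); // run on a worker thread
            } else {
                task.run(); // run inline on the caller's thread
            }
        } catch (RejectedExecutionException ree) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        d.process("first", null, true);  // no executor, so this runs inline
        d.process("second", null, true); // reuses the task cached by the first call
        System.out.println("created=" + d.created + " handled=" + d.handled);
    }
}
```

Recycling the task objects keeps allocation off the hot path; the false return on rejection is what lets the caller close the socket when the pool is saturated.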

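processSocket takes a SocketProcessor from the cache (or creates one if the cache is empty) and submits it to the thread pool; if there is no executor or dispatch is false, it runs it on the current thread instead. Next, the run method of SocketProcessor: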

public void run() {
    NioChannel socket = ka.getSocket();
    if (socket == null) {
        return;
    }
    SelectionKey key = socket.getIOChannel().keyFor(
            socket.getPoller().getSelector());

    synchronized (socket) {
        try {
            int handshake = -1;

            try {
                ... // omitted: handshake handling, including the rest of this try block
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                if (status == null) {
                    state = handler.process(ka, SocketStatus.OPEN_READ);
                } else {
                    state = handler.process(ka, status);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else {
                ka.getPoller().add(socket,handshake);
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            ka = null;
            status = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}

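The run method first deals with the handshake and then starts processing the data. The actual processing is implemented in the handler's process method: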

public SocketState process(SocketWrapperBase<S> wrapper,
        SocketStatus status) {
    ... // omitted: some state checks

    Processor processor = connections.get(socket);
    if (status == SocketStatus.DISCONNECT && processor == null) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    wrapper.setAsync(false);
    ContainerThreadMarker.set();

    try {
        ... // omitted: checks on the processor
        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (status == SocketStatus.CLOSE_NOW) {
                ... // omitted: handling for many other statuses
            } else {
                state = processor.process(wrapper);
            }

            if (state != SocketState.CLOSED && processor.isAsync()) {
                state = processor.asyncPostProcess();
            }

            ... // omitted: protocol upgrade handling
            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = wrapper.getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                state == SocketState.UPGRADING ||
                dispatches != null && state != SocketState.CLOSED);

        ... // omitted: state checks
        return state;
    } catch(java.net.SocketException e) {
       ... // omitted: exception handling
    }
    finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    // Don't try to add upgrade processors back into the pool
    if (processor !=null && !processor.isUpgrade()) {
        release(wrapper, processor, false);
    }
    return SocketState.CLOSED;
}

In this method, the only call we care about is processor.process(wrapper). For the HTTP protocol, the processor is an Http11Processor, which is covered in detail next.

3. Http11Processor

Let's see how the Processor handles the NioSocketWrapper. Here is the process method; it is very long, so much of it has been omitted:

public SocketState process(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    ... // omitted
    // Flags
    keepAlive = true;
    openSocket = false;
    sendfileInProgress = false;
    readComplete = true;
    keptAlive = false;

    while (!getErrorState().isError() && keepAlive && !isAsync() &&
            httpUpgradeHandler == null && !endpoint.isPaused()) {

        // Parsing the request header
        try {
            if (!inputBuffer.parseRequestLine(keptAlive)) {
                if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (endpoint.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                if (!inputBuffer.parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!disableUploadTimeout) {
                    socketWrapper.setReadTimeout(connectionUploadTimeout);
                }
            }
        } catch (Throwable t) {
            ... // omitted: other exceptions
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
            getAdapter().log(request, response, 0);
        }

        // Has an upgrade been requested?
        ... // omitted: upgrade request handling

        if (!getErrorState().isError()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (!getErrorState().isError()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                // Handle when the response was committed before a serious
                // error occurred.  Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if(keepAlive && !getErrorState().isError() && (
                        response.getErrorException() != null ||
                                (!isAsync() &&
                                statusDropsConnection(response.getStatus())))) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_NOW, e);
            } catch (HeadersTooLargeException e) {
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
        if (!isAsync()) {
            // If this is an async request then the request ends when it has
            // been completed. The AsyncContext is responsible for calling
            // endRequest() in that case.
            endRequest();
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }
        request.updateCounters();

        ... // omitted

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        if (breakKeepAliveLoop(socketWrapper)) {
            break;
        }
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}
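
The keep-alive bookkeeping in the loop above (the maxKeepAliveRequests check followed by decrementKeepAlive()) is easy to misread, so here is a small sketch of just that decision. KeepAliveSketch is a hypothetical class, and it assumes the per-connection counter starts at maxKeepAliveRequests, which the listing does not show.

```java
// Hypothetical sketch, not Tomcat code: how many requests one connection may serve.
class KeepAliveSketch {
    private final int maxKeepAliveRequests;
    private int keepAliveLeft; // assumption: starts at maxKeepAliveRequests

    KeepAliveSketch(int maxKeepAliveRequests) {
        this.maxKeepAliveRequests = maxKeepAliveRequests;
        this.keepAliveLeft = maxKeepAliveRequests;
    }

    /** Called once per request; returns whether the connection may serve another one. */
    boolean keepAliveAfterRequest() {
        if (maxKeepAliveRequests == 1) {
            return false; // keep-alive is effectively disabled
        }
        if (maxKeepAliveRequests > 0 && --keepAliveLeft <= 0) {
            return false; // the budget for this connection is used up
        }
        return true; // zero or negative limit: no cap is applied
    }

    /** Counts the requests served before keep-alive ends, stopping at cap. */
    static int requestsServed(int max, int cap) {
        KeepAliveSketch connection = new KeepAliveSketch(max);
        int served = 0;
        do {
            served++;
        } while (connection.keepAliveAfterRequest() && served < cap);
        return served;
    }

    public static void main(String[] args) {
        System.out.println(requestsServed(1, 10));  // 1
        System.out.println(requestsServed(3, 10));  // 3
        System.out.println(requestsServed(-1, 10)); // 10, limited only by cap
    }
}
```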

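The method is too long, so part of it has been left out. Its main job is to populate the org.apache.coyote.Request and org.apache.coyote.Response objects, with extra handling for upgrades and errors. The line to focus on is getAdapter().service(request, response), which passes the request on toward the Servlet. getAdapter() returns a CoyoteAdapter, so that is where we look next.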
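
As a rough picture of the parsing half of process(), that is, parseRequestLine followed by parseHeaders, the sketch below parses a complete request head held in a String. It is a simplification under assumptions: RequestHeadSketch is a hypothetical class, and Tomcat's real parser works incrementally on byte buffers and can stop part-way when data has not arrived yet.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not Tomcat code: parse a request line and headers from a String.
class RequestHeadSketch {
    final String method;
    final String target;
    final String protocol;
    final Map<String, String> headers = new LinkedHashMap<>();

    private RequestHeadSketch(String method, String target, String protocol) {
        this.method = method;
        this.target = target;
        this.protocol = protocol;
    }

    static RequestHeadSketch parse(String head) {
        String[] lines = head.split("\r\n");
        // Request line: METHOD SP request-target SP HTTP-version
        String[] parts = lines[0].split(" ");
        if (parts.length != 3) {
            // The real processor would answer 400 Bad Request at this point
            throw new IllegalArgumentException("malformed request line: " + lines[0]);
        }
        RequestHeadSketch request = new RequestHeadSketch(parts[0], parts[1], parts[2]);
        // Header lines follow until the first empty line
        for (int i = 1; i < lines.length && !lines[i].isEmpty(); i++) {
            int colon = lines[i].indexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("malformed header: " + lines[i]);
            }
            // Header names are case-insensitive, so store them lower-cased
            String name = lines[i].substring(0, colon).toLowerCase(Locale.ROOT);
            request.headers.put(name, lines[i].substring(colon + 1).trim());
        }
        return request;
    }

    public static void main(String[] args) {
        RequestHeadSketch r = parse(
                "GET /index.html HTTP/1.1\r\nHost: localhost:8080\r\nConnection: keep-alive\r\n\r\n");
        System.out.println(r.method + " " + r.target + " " + r.protocol + " " + r.headers);
    }
}
```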

4. CoyoteAdapter

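The service method wraps the coyote-level req and res in connector-level Request and Response objects, created with connector.createRequest() and connector.createResponse(). These are what the container then works with as the ServletRequest and ServletResponse.

public void service(org.apache.coyote.Request req,
        org.apache.coyote.Response res)
        throws Exception {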

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {

        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringEncoding
            (connector.getURIEncoding());

    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;

    try {

        // Parse and set Catalina and configuration specific
        // request parameters
        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
        boolean postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
        }
        AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
        if (asyncConImpl != null) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
        } else {
            request.finishRequest();
            response.finishResponse();
            if (postParseSuccess) {
                // Log only if processing was invoked.
                // If postParseRequest() failed, it has already logged it.
                request.getMappingData().context.logAccess(
                        request, response,
                        System.currentTimeMillis() - req.getStartTime(),
                        false);
            }
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        req.getRequestProcessor().setWorkerThreadName(null);
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);
        // Recycle the wrapper request and response
        if (!async || error.get()) {
            request.recycle();
            response.recycle();
        } else {
            // Clear converters so that the minimum amount of memory
            // is used by this processor
            request.clearEncoders();
            response.clearEncoders();
        }
    }

}
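
The first part of service() uses a "note" slot on the low-level request to cache its wrapper, so the wrapper is created once and reused for later requests handled by the same object. A minimal sketch of that idea follows; RawRequest, FacadeRequest, and adapt are hypothetical stand-ins, and the slot index is an illustrative value.

```java
// Hypothetical sketch, not Tomcat code: cache a wrapper object in a note slot.
class NotesSketch {
    static final int ADAPTER_NOTES = 1; // illustrative slot index

    /** Stand-in for the protocol-level request: raw data plus a small array of notes. */
    static final class RawRequest {
        private final Object[] notes = new Object[8];
        final String uri;

        RawRequest(String uri) {
            this.uri = uri;
        }

        Object getNote(int pos) {
            return notes[pos];
        }

        void setNote(int pos, Object value) {
            notes[pos] = value;
        }
    }

    /** Stand-in for the container-facing request that wraps the raw one. */
    static final class FacadeRequest {
        private RawRequest raw;

        void setRawRequest(RawRequest raw) {
            this.raw = raw;
        }

        String getRequestURI() {
            return raw.uri;
        }
    }

    /** Returns the wrapper cached on the raw request, creating and linking it on first use. */
    static FacadeRequest adapt(RawRequest req) {
        FacadeRequest request = (FacadeRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new FacadeRequest();
            request.setRawRequest(req);
            req.setNote(ADAPTER_NOTES, request); // remember it for the next call
        }
        return request;
    }

    public static void main(String[] args) {
        RawRequest raw = new RawRequest("/index.html");
        FacadeRequest first = adapt(raw);
        FacadeRequest second = adapt(raw);
        System.out.println(first.getRequestURI() + " reused=" + (first == second));
    }
}
```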

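This one is long too, so we focus on the most important call: connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()); It does not appear in the listing above because it is made inside postParseRequest().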

connector.getService() returns the StandardService, and its Mapper maps the request to a concrete Servlet. I won't explain this in detail here; I'm off for a run. Bye.
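
For readers who want a feel for what "mapping to a Servlet" involves, here is a deliberately simplified sketch of the usual precedence of servlet URL patterns: exact match, then longest path prefix, then extension, then the default servlet. MappingSketch is a hypothetical class and not Tomcat's Mapper, which also resolves the host and the context before it gets to this step.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Tomcat code: simplified servlet URL-pattern matching.
class MappingSketch {
    private final Map<String, String> patterns = new LinkedHashMap<>();

    void add(String pattern, String servletName) {
        patterns.put(pattern, servletName);
    }

    /** Returns the servlet name for a context-relative path, or null if nothing matches. */
    String map(String path) {
        String bestPrefix = null;
        String prefixServlet = null;
        String extensionServlet = null;
        for (Map.Entry<String, String> entry : patterns.entrySet()) {
            String pattern = entry.getKey();
            if (pattern.endsWith("/*")) {
                // Path-prefix pattern such as /api/*: remember the longest one that matches
                String prefix = pattern.substring(0, pattern.length() - 2);
                boolean matches = path.equals(prefix) || path.startsWith(prefix + "/");
                if (matches && (bestPrefix == null || prefix.length() > bestPrefix.length())) {
                    bestPrefix = prefix;
                    prefixServlet = entry.getValue();
                }
            } else if (pattern.startsWith("*.")) {
                // Extension pattern such as *.jsp: compare with the last segment's extension
                int slash = path.lastIndexOf('/');
                int dot = path.lastIndexOf('.');
                if (dot > slash && path.substring(dot).equals(pattern.substring(1))) {
                    extensionServlet = entry.getValue();
                }
            } else if (!pattern.equals("/") && pattern.equals(path)) {
                return entry.getValue(); // an exact match always wins
            }
        }
        if (prefixServlet != null) {
            return prefixServlet;
        }
        if (extensionServlet != null) {
            return extensionServlet;
        }
        return patterns.get("/"); // the default servlet, if one is registered
    }

    public static void main(String[] args) {
        MappingSketch mapper = new MappingSketch();
        mapper.add("/", "default");
        mapper.add("*.jsp", "jsp");
        mapper.add("/api/*", "api");
        mapper.add("/login", "login");
        System.out.println(mapper.map("/login"));
        System.out.println(mapper.map("/api/users"));
        System.out.println(mapper.map("/index.jsp"));
        System.out.println(mapper.map("/static/logo.png"));
    }
}
```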

5. Conclusion

Once the request has been processed, the response data is sent back. I won't go through that step by step.