Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UpgradeHandler may not notify the application to read the initial data that sent along with the upgrade request. #30328

Open
wants to merge 3 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.ibm.ws.http.internal.VirtualHostMap.RequestHelper;
import com.ibm.ws.transport.access.TransportConnectionAccess;
import com.ibm.ws.transport.access.TransportConstants;
import com.ibm.wsspi.bytebuffer.WsByteBuffer;
import com.ibm.wsspi.channelfw.ConnectionLink;
import com.ibm.wsspi.channelfw.VirtualConnection;
import com.ibm.wsspi.channelfw.base.InboundApplicationLink;
Expand Down Expand Up @@ -167,20 +168,20 @@ public void init(VirtualConnection inVC, HttpDispatcherChannel channel) {
public void close(VirtualConnection conn, Exception e) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Close called , vc ->" + this.vc + " hc: " + this.hashCode());
Tr.debug(tc, "close ENTER, vc ->" + this.vc + " hc: " + this.hashCode());
}

if (this.vc == null) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection must be already closed since vc is null");
Tr.debug(tc, "close, Connection must be already closed since vc is null");
}

// closeCompleted check is for the close, destroy, close order scenario.
// Without this check, this second close (after the destroy) would decrement the connection again and produce a quiesce error.
if (this.decrementNeeded.compareAndSet(true, false) & !closeCompleted.get()) {
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrementNeeded is true: decrement active connection");
Tr.debug(tc, "close, decrementNeeded is true: decrement active connection");
}
this.myChannel.decrementActiveConns();
}
Expand All @@ -193,24 +194,43 @@ public void close(VirtualConnection conn, Exception e) {
// so we will have to use close API from SRTConnectionContext31 and call closeStreams.
String closeNonUpgraded = (String) (this.vc.getStateMap().get(TransportConstants.CLOSE_NON_UPGRADED_STREAMS));
if (closeNonUpgraded != null && closeNonUpgraded.equalsIgnoreCase("true")) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close streams from HttpDispatcherLink.close");
Tr.debug(tc, "close, CLOSE_NON_UPGRADED_STREAMS");
}

// Save the remain upgrading and unread data into a VC's stateMap which will be consumed in the UpgradeInputByteBufferUtil.initialRead
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be better to just say "Save the remaining unread data"? I'm not following what upgrading vs unread data is.

if (this.isc.isReadDataAvailable()) {
WsByteBuffer currentBuffer = this.isc.getReadBuffer();
pmd1nh marked this conversation as resolved.
Show resolved Hide resolved
WsByteBuffer newBuffer = HttpDispatcher.getBufferManager().allocate(currentBuffer.remaining());
newBuffer.put(currentBuffer);
newBuffer.flip();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close, saved [" + newBuffer.remaining() + "] unread data from isc buffer [" + currentBuffer + "] to vc statemap [" + newBuffer + "]");
}

currentBuffer = null;
vc.getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, newBuffer);
}

Exception errorinClosing = this.closeStreams();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Error closing in streams" + errorinClosing);
Tr.debug(tc, "close, Error closing in streams" + errorinClosing);
}

vc.getStateMap().put(TransportConstants.CLOSE_NON_UPGRADED_STREAMS, "CLOSED_NON_UPGRADED_STREAMS");

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}

return;
}

String upgradedListener = (String) (this.vc.getStateMap().get(TransportConstants.UPGRADED_LISTENER));
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "upgradedListener ->" + upgradedListener);
Tr.debug(tc, "close, upgradedListener ->" + upgradedListener);
}
if (upgradedListener != null && upgradedListener.equalsIgnoreCase("true")) {
boolean closeCalledFromWebConnection = false;
Expand All @@ -237,7 +257,7 @@ public void close(VirtualConnection conn, Exception e) {
// but we don't want to manipulate existing logic so a separate constant in the state map will be used for that below

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection Not to be closed here because Servlet Upgrade.");
Tr.debug(tc, "close EXIT, Connection Not to be closed here because Servlet Upgrade.");
}
return;
}
Expand All @@ -253,11 +273,11 @@ public void close(VirtualConnection conn, Exception e) {
// want to call close outside of the sync to avoid deadlocks.
WebConnCanClose = false;
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection closing Dispatcher Link");
Tr.debug(tc, "close, Upgraded Web Connection closing Dispatcher Link");
}
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection already called close; returning");
Tr.debug(tc, "close EXIT, Upgraded Web Connection already called close; returning");
}
return;
}
Expand All @@ -274,12 +294,16 @@ public void close(VirtualConnection conn, Exception e) {
super.close(conn, e);
} finally {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrement active connection count");
Tr.debug(tc, "close, decrement active connection count");
}
this.myChannel.decrementActiveConns();
}
closeCompleted.compareAndSet(false, true);
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}
}

/*
Expand Down Expand Up @@ -1273,7 +1297,7 @@ public void run() {
}

if (ic.decrementNeeded.compareAndSet(true, false)) {
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrementNeeded is true: decrement active connection");
}
Expand Down Expand Up @@ -1348,23 +1372,24 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
VirtualConnection vc = link.getVirtualConnection();
H2InboundLink h2Link = new H2InboundLink(channel, vc, getTCPConnectionContext());
boolean bodyReadAndQueued = false;
if(this.isc != null) {
if(this.isc.isIncomingBodyExpected() && !this.isc.isBodyComplete()){
if (this.isc != null) {
if (this.isc.isIncomingBodyExpected() && !this.isc.isBodyComplete()) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Body needed for request. Queueing data locally before upgrade.");
}
HttpInputStreamImpl body = this.request.getBody();
body.setupChannelMultiRead();
byte[] inBytes = new byte[1024];
try{
try {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Starting request read loop.");
}
for (int n; (n = body.read(inBytes)) != -1;) {}
for (int n; (n = body.read(inBytes)) != -1;) {
}
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Finished request read loop.");
}
}catch(Exception e){
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Got exception reading request and queueing up data. Can't handle request upgrade to HTTP2.", e);
}
Expand All @@ -1374,12 +1399,12 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
}
body.setReadFromChannelComplete();
bodyReadAndQueued = true;
}else{
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "No body needed for request. Continuing upgrade as normal.");
}
}
}else {
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Failed to get isc, Null value received which could cause issues expecting data. Continuing upgrade as normal.");
}
Expand All @@ -1398,7 +1423,7 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
// A problem occurred with the connection start up, a trace message will be issued from waitForConnectionInit()
vc.getStateMap().put(h2InitError, true);
}
if(bodyReadAndQueued)
if (bodyReadAndQueued)
isc.setBodyComplete();
return rc;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/*******************************************************************************
* Copyright (c) 2013 IBM Corporation and others.
* Copyright (c) 2013, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/
package com.ibm.ws.transport.access;

Expand All @@ -31,4 +28,6 @@ public class TransportConstants {
public static final String UPGRADED_LISTENER = "UpgradedListener";
public static final String CLOSE_UPGRADED_WEBCONNECTION = "CloseUpgradedWebConnection";

//Initial upgrade request data may be read together with the headers before the upgrade.
public static final String NOT_UPGRADED_UNREAD_DATA = "NotUpgradedUnreadData";
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corporation and others.
* Copyright (c) 2014, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package com.ibm.ws.webcontainer31.upgrade;

Expand Down Expand Up @@ -48,6 +45,10 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
_upgradeStream = uIBBU;
_contextManager = tcm;
_srtUpgradeStream = srtUpgradeStream;

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "UpgradeReadCallback constructor, ReadListener [" + _rl + "], this " + this);
}
}

/* (non-Javadoc)
Expand All @@ -56,6 +57,9 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
@Override
@FFDCIgnore(IOException.class)
public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete ENTER , ReadListener [" + _rl + "], this " + this);
}

if(vc == null){
return;
Expand Down Expand Up @@ -106,6 +110,10 @@ public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
_srtUpgradeStream.notify();
}
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete EXIT ");
}
}
}

Expand Down
Loading