Skip to content

Commit

Permalink
UpgradeHandler may not read the initial data
Browse files Browse the repository at this point in the history
  • Loading branch information
pmd1nh committed Dec 4, 2024
1 parent b6c1ed3 commit e3d7006
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.ibm.ws.http.internal.VirtualHostMap.RequestHelper;
import com.ibm.ws.transport.access.TransportConnectionAccess;
import com.ibm.ws.transport.access.TransportConstants;
import com.ibm.wsspi.bytebuffer.WsByteBuffer;
import com.ibm.wsspi.channelfw.ConnectionLink;
import com.ibm.wsspi.channelfw.VirtualConnection;
import com.ibm.wsspi.channelfw.base.InboundApplicationLink;
Expand Down Expand Up @@ -167,20 +168,20 @@ public void init(VirtualConnection inVC, HttpDispatcherChannel channel) {
public void close(VirtualConnection conn, Exception e) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Close called , vc ->" + this.vc + " hc: " + this.hashCode());
Tr.debug(tc, "close ENTER, vc ->" + this.vc + " hc: " + this.hashCode());
}

if (this.vc == null) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection must be already closed since vc is null");
Tr.debug(tc, "close, Connection must be already closed since vc is null");
}

// closeCompleted check is for the close, destroy, close order scenario.
// Without this check, this second close (after the destroy) would decrement the connection again and produce a quiesce error.
if (this.decrementNeeded.compareAndSet(true, false) & !closeCompleted.get()) {
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrementNeeded is true: decrement active connection");
Tr.debug(tc, "close, decrementNeeded is true: decrement active connection");
}
this.myChannel.decrementActiveConns();
}
Expand All @@ -193,24 +194,43 @@ public void close(VirtualConnection conn, Exception e) {
// so we will have to use close API from SRTConnectionContext31 and call closeStreams.
String closeNonUpgraded = (String) (this.vc.getStateMap().get(TransportConstants.CLOSE_NON_UPGRADED_STREAMS));
if (closeNonUpgraded != null && closeNonUpgraded.equalsIgnoreCase("true")) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close streams from HttpDispatcherLink.close");
Tr.debug(tc, "close, CLOSE_NON_UPGRADED_STREAMS");
}

// Save the remain upgrading and unread data into a VC's stateMap which will be consumed in the UpgradeInputByteBufferUtil.initialRead
if (this.isc.isReadDataAvailable()) {
WsByteBuffer currentBuffer = this.isc.getReadBuffer();
WsByteBuffer newBuffer = HttpDispatcher.getBufferManager().allocate(currentBuffer.remaining());
newBuffer.put(currentBuffer);
newBuffer.flip();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close, saved unread data [" + newBuffer.remaining() + "] from isc buffer [" + currentBuffer + "] to vc statemap [" + newBuffer + "]");
}

currentBuffer = null;
vc.getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, newBuffer);
}

Exception errorinClosing = this.closeStreams();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Error closing in streams" + errorinClosing);
Tr.debug(tc, "close, Error closing in streams" + errorinClosing);
}

vc.getStateMap().put(TransportConstants.CLOSE_NON_UPGRADED_STREAMS, "CLOSED_NON_UPGRADED_STREAMS");

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}

return;
}

String upgradedListener = (String) (this.vc.getStateMap().get(TransportConstants.UPGRADED_LISTENER));
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "upgradedListener ->" + upgradedListener);
Tr.debug(tc, "close, upgradedListener ->" + upgradedListener);
}
if (upgradedListener != null && upgradedListener.equalsIgnoreCase("true")) {
boolean closeCalledFromWebConnection = false;
Expand All @@ -237,7 +257,7 @@ public void close(VirtualConnection conn, Exception e) {
// but we don't want to manipulate existing logic so a separate constant in the state map will be used for that below

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection Not to be closed here because Servlet Upgrade.");
Tr.debug(tc, "close EXIT, Connection Not to be closed here because Servlet Upgrade.");
}
return;
}
Expand All @@ -253,11 +273,11 @@ public void close(VirtualConnection conn, Exception e) {
// want to call close outside of the sync to avoid deadlocks.
WebConnCanClose = false;
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection closing Dispatcher Link");
Tr.debug(tc, "close, Upgraded Web Connection closing Dispatcher Link");
}
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection already called close; returning");
Tr.debug(tc, "close EXIT, Upgraded Web Connection already called close; returning");
}
return;
}
Expand All @@ -274,12 +294,16 @@ public void close(VirtualConnection conn, Exception e) {
super.close(conn, e);
} finally {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrement active connection count");
Tr.debug(tc, "close, decrement active connection count");
}
this.myChannel.decrementActiveConns();
}
closeCompleted.compareAndSet(false, true);
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}
}

/*
Expand Down Expand Up @@ -1273,7 +1297,7 @@ public void run() {
}

if (ic.decrementNeeded.compareAndSet(true, false)) {
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrementNeeded is true: decrement active connection");
}
Expand Down Expand Up @@ -1348,23 +1372,24 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
VirtualConnection vc = link.getVirtualConnection();
H2InboundLink h2Link = new H2InboundLink(channel, vc, getTCPConnectionContext());
boolean bodyReadAndQueued = false;
if(this.isc != null) {
if(this.isc.isIncomingBodyExpected() && !this.isc.isBodyComplete()){
if (this.isc != null) {
if (this.isc.isIncomingBodyExpected() && !this.isc.isBodyComplete()) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Body needed for request. Queueing data locally before upgrade.");
}
HttpInputStreamImpl body = this.request.getBody();
body.setupChannelMultiRead();
byte[] inBytes = new byte[1024];
try{
try {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Starting request read loop.");
}
for (int n; (n = body.read(inBytes)) != -1;) {}
for (int n; (n = body.read(inBytes)) != -1;) {
}
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Finished request read loop.");
}
}catch(Exception e){
} catch (Exception e) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Got exception reading request and queueing up data. Can't handle request upgrade to HTTP2.", e);
}
Expand All @@ -1374,12 +1399,12 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
}
body.setReadFromChannelComplete();
bodyReadAndQueued = true;
}else{
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "No body needed for request. Continuing upgrade as normal.");
}
}
}else {
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Failed to get isc, Null value received which could cause issues expecting data. Continuing upgrade as normal.");
}
Expand All @@ -1398,7 +1423,7 @@ public boolean handleHTTP2UpgradeRequest(Map<String, String> http2Settings) {
// A problem occurred with the connection start up, a trace message will be issued from waitForConnectionInit()
vc.getStateMap().put(h2InitError, true);
}
if(bodyReadAndQueued)
if (bodyReadAndQueued)
isc.setBodyComplete();
return rc;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/*******************************************************************************
* Copyright (c) 2013 IBM Corporation and others.
* Copyright (c) 2013, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/
package com.ibm.ws.transport.access;

Expand All @@ -31,4 +28,6 @@ public class TransportConstants {
public static final String UPGRADED_LISTENER = "UpgradedListener";
public static final String CLOSE_UPGRADED_WEBCONNECTION = "CloseUpgradedWebConnection";

//Initial upgrade request data may be read together with the headers before the upgrade.
public static final String NOT_UPGRADED_UNREAD_DATA = "NotUpgradedUnreadData";
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corporation and others.
* Copyright (c) 2014, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package com.ibm.ws.webcontainer31.upgrade;

Expand Down Expand Up @@ -48,6 +45,10 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
_upgradeStream = uIBBU;
_contextManager = tcm;
_srtUpgradeStream = srtUpgradeStream;

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "UpgradeReadCallback constructor, ReadListener [" + _rl + "], this " + this);
}
}

/* (non-Javadoc)
Expand All @@ -56,6 +57,9 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
@Override
@FFDCIgnore(IOException.class)
public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete ENTER , ReadListener [" + _rl + "], this " + this);
}

if(vc == null){
return;
Expand Down Expand Up @@ -106,6 +110,10 @@ public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
_srtUpgradeStream.notify();
}
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete EXIT ");
}
}
}

Expand Down
Loading

0 comments on commit e3d7006

Please sign in to comment.