Skip to content

Commit

Permalink
重要更新:增加流量控制——当local不能写时,不从remote度、反向同理。这样就不会出现直接内存溢出的问题了
Browse files Browse the repository at this point in the history
  • Loading branch information
arloor committed Dec 9, 2018
1 parent 78c1f38 commit 3df63a9
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 23 deletions.
4 changes: 2 additions & 2 deletions proxyclient/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.arloor</groupId>
<artifactId>proxyclient</artifactId>
<version>1.1</version>
<version>1.2</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -18,7 +18,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
<version>4.1.32.Final</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ public class ProxyConnenctionHandler extends ChannelInboundHandlerAdapter {

private SocketChannel remoteChannel;

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
boolean canWrite = ctx.channel().isWritable();
logger.warn(ctx.channel()+" 可写性:"+canWrite);
//流量控制,不允许继续读
localChannel.config().setAutoRead(canWrite);
super.channelWritabilityChanged(ctx);
}

public ProxyConnenctionHandler(SocketChannel channel) {
this.localChannel = channel;
}
Expand Down Expand Up @@ -88,6 +97,15 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf
});
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
boolean canWrite = ctx.channel().isWritable();
logger.warn(ctx.channel()+" 可写性:"+canWrite);
//流量控制,不允许继续读
localChannel.config().setAutoRead(canWrite);
super.channelWritabilityChanged(ctx);
}


@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Expand Down
4 changes: 2 additions & 2 deletions proxyserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.arloor</groupId>
<artifactId>proxyserver</artifactId>
<version>1.1</version>
<version>1.2</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -18,7 +18,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
<version>4.1.32.Final</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ProxyConnectionHandler extends ChannelInboundHandlerAdapter {
private EventLoopGroup remoteLoopGroup = ServerProxyBootStrap.REMOTEWORKER;
Expand All @@ -41,33 +43,46 @@ private ProxyConnectionHandler() {
// rejectHosts.add("facebook");
}


@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
boolean canWrite = ctx.channel().isWritable();
logger.warn(ctx.channel()+" 可写性:"+canWrite);
//流量控制,不允许继续读
remoteChannel.config().setAutoRead(canWrite);
super.channelWritabilityChanged(ctx);
}

public void channelRead(ChannelHandlerContext localCtx, Object msg) throws Exception {

HttpRequest request = (HttpRequest) msg;
logger.info("处理请求"+"[客户端:"+localChannel.remoteAddress()+"] " +request );
logger.info("处理请求" + "[客户端:" + localChannel.remoteAddress() + "] " + request);
if (!rejectRequest(request)) {
if(remoteChannel==null){
if (remoteChannel == null) {
if (request.getMethod() != null) {
establishConnectionAndSend(request, localCtx);
}else {
} else {
//可以认为这是非法的不正常的请求,返回一个404响应,省得别人怀疑
logger.error("错误的第一次请求,没有指定host port,关闭此channel");
logger.error("错误content:\n" +new String(request.getRequestBody()));
logger.error("错误content:\n" + new String(request.getRequestBody()));
//在这个时候就不要对响应加密了
localChannel.pipeline().removeFirst();
localChannel.writeAndFlush(Unpooled.wrappedBuffer(HttpResponse.ERROR404())).addListener(future -> {
localChannel.close();
});

}
}else {
} else {
// if (close||!remoteWritable.get()){
// logger.info("阻塞!已经不可写了,你还要写,你是禽兽吗!妈个鸡");
// Thread.sleep(50);
// }
localCtx.fireChannelRead(request);
}
}
}



private void establishConnectionAndSend(HttpRequest request, ChannelHandlerContext localCtx) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(remoteLoopGroup)
Expand All @@ -84,7 +99,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
ChannelFuture future = bootstrap.connect(request.getHost(), request.getPort());
future.addListener(ChannelFutureListener -> {
if (future.isSuccess()) {
logger.info("连接成功: " + request.getHost() + ":" + request.getPort() + (request.getMethod().equals(HttpMethod.CONNECT)?"":request.getPath()));
logger.info("连接成功: " + request.getHost() + ":" + request.getPort() + (request.getMethod().equals(HttpMethod.CONNECT) ? "" : request.getPath()));
localCtx.pipeline().addLast(Send2RemoteAdpterFactory.create(request.getMethod(), remoteChannel));
if (request.getMethod().equals(HttpMethod.CONNECT)) {
localChannel.writeAndFlush(Unpooled.wrappedBuffer(HttpResponse.ESTABLISHED()));
Expand All @@ -103,12 +118,16 @@ protected void initChannel(SocketChannel ch) throws Exception {
/**
* 当本channel被关闭时,及时关闭对应的另一个channel
* 出现异常和正常关闭都会导致本channel被关闭
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(remoteChannel!=null&&remoteChannel.isActive()){
// logger.info(""+ctx.channel().bytesBeforeWritable());
// logger.info(""+ctx.channel().isWritable());
// logger.info(""+localWritable.get());
if (remoteChannel != null && remoteChannel.isActive()) {
remoteChannel.close().addListener(future -> {
logger.info("browser关闭连接,因此关闭到webserver连接");
});
Expand All @@ -118,50 +137,65 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("出现异常:"+cause.getMessage()+"——"+ctx.channel().remoteAddress());
logger.error("出现异常:" + cause.getMessage() + "——" + ctx.channel().remoteAddress());
logger.error(ExceptionUtil.getMessage(cause));
}

private class SendBack2ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private Logger logger=LoggerFactory.getLogger(SendBack2ClientHandler.class);
private Logger logger = LoggerFactory.getLogger(SendBack2ClientHandler.class);

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
boolean canWrite = ctx.channel().isWritable();
logger.warn(ctx.channel()+" 可写性:"+canWrite);
//流量控制,不允许继续读
localChannel.config().setAutoRead(canWrite);
super.channelWritabilityChanged(ctx);
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// if (close||!localWritable.get()){
// logger.info("阻塞!已经不可写了,你还要写,你是禽兽吗!妈个鸡");
// Thread.sleep(50);
// }
localChannel.writeAndFlush(byteBuf.retain()).addListener(ChannelFutureListener -> {
logger.info("返回响应 " +byteBuf.writerIndex()+"字节 "+channelHandlerContext.channel().remoteAddress());
logger.info("返回响应 " + byteBuf.writerIndex() + "字节 " + channelHandlerContext.channel().remoteAddress());
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("出现异常:"+cause.getMessage()+"——"+ctx.channel());
logger.error("出现异常:" + cause.getMessage() + "——" + ctx.channel());
logger.error(ExceptionUtil.getMessage(cause));
}

/**
* 当本channel被关闭时,及时关闭对应的另一个channel
* 出现异常和正常关闭都会导致本channel被关闭
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(localChannel!=null&&localChannel.isActive()){
localChannel.close().addListener(future -> {
logger.info("webserver关闭连接,因此关闭到browser连接");
});
}
if (localChannel != null && localChannel.isActive()) {
localChannel.close().addListener(future -> {
logger.info("webserver关闭连接,因此关闭到browser连接");
});
}
super.channelInactive(ctx);
}
}

/**
* 不代理某些网站,原因懂的
*
* @param request
* @return
*/
private boolean rejectRequest(HttpRequest request) {
if(request.getHost()==null){
if (request.getHost() == null) {
return false;
}
for (String rejectHost : rejectHosts
Expand Down
2 changes: 1 addition & 1 deletion proxyserver/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</appender>

<!-- 日志输出级别 -->
<root level="INFO">
<root level="DEBUG">
<appender-ref ref="FILE" />
<appender-ref ref="STDOUT" />
</root>
Expand Down

0 comments on commit 3df63a9

Please sign in to comment.