gRPC背压流控、压缩及JSON通信【知识笔记】
目录
一、压缩1.Server端所有方法压缩2.Server单独方法压缩3.Client请求内容压缩二、使用JSON通信1.方法描述使用JSON编译2.JSON编译具体过程三、手动流量控制1.Consuming Side2.Producing Side四、系列文章
本文继续整理gRPC的使用,走查解读官方给出的压缩示例、使用JSON通信以及手动流量控制。
一、压缩
1.Server端所有方法压缩
server = ServerBuilder.forPort(port).intercept(new ServerInterceptor() {@Overridepublic <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {// @1 在拦截器中设置压缩算法call.setCompression("gzip");return next.startCall(call, headers);}}).addService(new GreeterImpl()).build().start();
备注:如果需要在Server端所有方法进行压缩,可以在ServerInterceptor拦击器中通过setCompression进行设置。
2.Server单独方法压缩
如果不想对所有的方法传输内容压缩,gPRC提供了单独方法的压缩。
int port = 50051;server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();static classGreeterImplextendsGreeterGrpc.GreeterImplBase{@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> plainResponseObserver) {ServerCallStreamObserver<HelloReply> responseObserver =(ServerCallStreamObserver<HelloReply>) plainResponseObserver;// @1 对单个方法传输内容进行压缩responseObserver.setCompression("gzip");HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
备注:单个方法的压缩通过ServerCallStreamObserver的setCompression进行单独设置。
3.Client请求内容压缩
客户端对请求内容进行压缩,下面示例通过gzip进行压缩。
publicvoidgreet(String name) {HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {// @1 对请求内容设置压缩类型response = blockingStub.withCompression("gzip").sayHello(request);} catch (StatusRuntimeException e) {logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());return;}
二、使用JSON通信
gRPC可以通过Json格式进行通信,虽然并不建议这么做,Json的效率要远低于ProtoBuf。看下示例是如何通过Json格式通信的。
1.方法描述使用JSON编译
对方法的出参和入参使用JSON适配器,示例中通过MethodDescriptor.toBuilder重写出入参数的解析格式。
static final MethodDescriptor<HelloRequest, HelloReply> METHOD_SAY_HELLO =GreeterGrpc.getSayHelloMethod().toBuilder(// @1 请求参数使用JSON编译 JsonMarshaller.jsonMarshaller(HelloRequest.getDefaultInstance()),// @2 返回参数使用JSON编译JsonMarshaller.jsonMarshaller(HelloReply.getDefaultInstance())).build();
2.JSON编译具体过程
既然通过对方法的出入参数编译成JSON格式,看下gRPC是如何做的呢?
public static <T extends Message> Marshaller<T> jsonMarshaller(final T defaultInstance) {final Parser parser = JsonFormat.parser();final Printer printer = JsonFormat.printer();return jsonMarshaller(defaultInstance, parser, printer);}public static <T extends Message> Marshaller<T> jsonMarshaller(final T defaultInstance, final Parser parser, final Printer printer) {final Charset charset = Charset.forName("UTF-8");return new Marshaller<T>() {@Overridepublic InputStream stream(T value) {try {// @1 通过printer.print将出入参数转换为JSON格式return new ByteArrayInputStream(printer.print(value).getBytes(charset));} catch (InvalidProtocolBufferException e) {// ...}}// ....}
备注:在JsonFormat.print方法中进行具体的请求/返回参数转换为JSON的具体实现。
请求转换JSON格式截图

返回转换JSON格式截图

3.Client使用JSON格式的方法描述
public HelloReply sayHello(HelloRequest request) {// @1 使用JSON格式的方法描述METHOD_SAY_HELLOreturn blockingUnaryCall(getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);}
4.Server使用JSON格式的方法描述
public ServerServiceDefinition bindService() {return ServerServiceDefinition.builder(GreeterGrpc.getServiceDescriptor().getName())// @1 使用JSON格式的方法描述METHOD_SAY_HELLO.addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,asyncUnaryCall(new UnaryMethod<HelloRequest, HelloReply>() {@Overridepublicvoidinvoke(HelloRequest request, StreamObserver<HelloReply> responseObserver) {sayHello(request, responseObserver);}})).build();}
三、手动流量控制
gRPC的流量控制基于HTTP/2的流量控制,即背压模式。关于gRPC和HTTP/2背压模式原理和关系,请看下面摘录。
At the bottom is the HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65536 bytes.The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model.Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.On producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream.If there is insuficient room in the HTTP/2 flow control window to transmit, backpressure engages an no more messages are requested from the producer until space becomes available.On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream.Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.
1.Consuming Side
publicstaticvoidmain(String[] args) throws InterruptedException, IOException {// @1 Server端服务实现StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {@Overridepublic StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =(ServerCallStreamObserver<HelloReply>) responseObserver;// @2 禁止自动流控模式,开启手动流控serverCallStreamObserver.disableAutoInboundFlowControl();// @3 背压模式流控,当消费端有足够空间时将会回调OnReadyHandler// 默认空间大小为65536字节classOnReadyHandlerimplementsRunnable{private boolean wasReady = false;@Overridepublicvoidrun() {if (serverCallStreamObserver.isReady() && !wasReady) {wasReady = true;logger.info("READY");// @4 向HTTP/2流请求读取并解压(x)条消息// 即发信号通知发送端发送继续发消息serverCallStreamObserver.request(1);}}}final OnReadyHandler onReadyHandler = new OnReadyHandler();serverCallStreamObserver.setOnReadyHandler(onReadyHandler);// @5 处理具体进来的请求return new StreamObserver<HelloRequest>() {@OverridepublicvoidonNext(HelloRequest request) {try {String name = request.getName();logger.info("--> " + name);Thread.sleep(100);String message = "Hello " + name;logger.info("<-- " + message);HelloReply reply = HelloReply.newBuilder().setMessage(message).build();// @6 向Client发送请求responseObserver.onNext(reply);if (serverCallStreamObserver.isReady()) {// @7 向HTTP/2流请求读取并解压(x)条消息serverCallStreamObserver.request(1);} else {onReadyHandler.wasReady = false;}} catch (Throwable throwable) {//}}@OverridepublicvoidonError(Throwable t) {t.printStackTrace();responseObserver.onCompleted();}@OverridepublicvoidonCompleted() {logger.info("COMPLETED");responseObserver.onCompleted();}};}};final Server server = ServerBuilder.forPort(50051).addService(svc).build().start();
2.Producing Side
publicstaticvoidmain(String[] args) throws InterruptedException {final CountDownLatch done = new CountDownLatch(1);ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();StreamingGreeterGrpc.StreamingGreeterStub stub = StreamingGreeterGrpc.newStub(channel);ClientResponseObserver<HelloRequest, HelloReply> clientResponseObserver =new ClientResponseObserver<HelloRequest, HelloReply>() {ClientCallStreamObserver<HelloRequest> requestStream;@OverridepublicvoidbeforeStart(final ClientCallStreamObserver<HelloRequest> requestStream) {this.requestStream = requestStream;// @1设置手动流量控制requestStream.disableAutoInboundFlowControl();// @2 当Consumer端有足够空间时自动回调// 序列化protobuf先发送到缓存区(还未到Server端)// Server端需要调用request()向Client拉取消息requestStream.setOnReadyHandler(new Runnable() {Iterator<String> iterator = names().iterator();@Overridepublicvoidrun() {while (requestStream.isReady()) {if (iterator.hasNext()) {String name = iterator.next();logger.info("--> " + name);HelloRequest request = HelloRequest.newBuilder().setName(name).build();// @3 将消息发送到缓存区requestStream.onNext(request);} else {// @4 标记Client发送完成requestStream.onCompleted();}}}});}@OverridepublicvoidonNext(HelloReply value) {// @5 接受Server端返回信息logger.info("<-- " + value.getMessage());// @6 通知Client继续发送requestStream.request(1);}@OverridepublicvoidonError(Throwable t) {t.printStackTrace();done.countDown();}@OverridepublicvoidonCompleted() {logger.info("All Done");done.countDown();}};stream processing.stub.sayHelloStreaming(clientResponseObserver);done.await();channel.shutdown();channel.awaitTermination(1, TimeUnit.SECONDS);}
四、系列文章
「瓜农老梁 学习同行」
赞 (0)

