AI智能
改变未来

java版gRPC实战之五:双向流


欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

  • 本文是《java版gRPC实战》系列的第五篇,目标是掌握双向流类型的服务,即请求参数是流的形式,响应的内容也是流的形式;
  • 先来看看官方资料对双向流式RPC的介绍:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;
  • 掌握了客户端流和服务端流两种类型的开发后,双向流类型就很好理解了,就是之前两种类型的结合体,请求和响应都按照流的方式处理即可;
  • 今天的实战,咱们来设计一个在线商城的功能:批量减扣库存,即客户端提交多个商品和数量,服务端返回每个商品减扣库存成功和失败的情况;
  • 咱们尽快进入编码环节吧,具体内容如下:
  1. 在proto文件中定义双向流类型的gRPC接口,再通过proto生成java代码
  2. 开发服务端应用
  3. 开发客户端应用
  4. 验证

源码下载

  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在double-stream-server-side目录下,客户端代码在double-stream-client-side目录下,如下图:

在proto文件中定义双向流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct方法的入参ProductOrder和返回值DeductReply都添加了stream修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:
// gRPC服务,这是个在线商城的库存服务service StockService {// 双向流式:批量扣减库存rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}}// 扣减库存返回结果的数据结构message DeductReply {// 返回码int32 code = 1;// 描述信息string message = 2;}
  • 双击下图红框中的task即可生成java代码:

  • 生成下图红框中的文件,即服务端定义和返回值数据结构:

  • 接下来开发服务端;

开发服务端应用

  • 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件plugins {id \'org.springframework.boot\'}dependencies {implementation \'org.projectlombok:lombok\'implementation \'org.springframework.boot:spring-boot-starter\'// 作为gRPC服务提供方,需要用到此库implementation \'net.devh:grpc-server-spring-boot-starter\'// 依赖自动生成源码的工程implementation project(\':grpc-lib\')// annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessorannotationProcessor \'org.projectlombok:lombok\'}
  • 配置文件application.yml:
spring:application:name: double-stream-server-side# gRPC有关的配置,这里只需要配置服务端口号grpc:server:port: 9901
  • 启动类DoubleStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 重点是提供grpc服务的GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext会被多次调用,并且由于返回值是流,因此onNext中调用了responseObserver.onNext方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext方法会被多次调用):
package grpctutorials;import com.bolingcavalry.grpctutorials.lib.DeductReply;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.server.service.GrpcService;@GrpcService@Slf4jpublic class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {@Overridepublic StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {// 返回匿名类,给上层框架使用return new StreamObserver<ProductOrder>() {private int totalCount = 0;@Overridepublic void onNext(ProductOrder value) {log.info("正在处理商品[{}],数量为[{}]",value.getProductId(),value.getNumber());// 增加总量totalCount += value.getNumber();int code;String message;// 假设单数的都有库存不足的问题if (0 == value.getNumber() % 2) {code = 10000;message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());} else {code = 10001;message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());}responseObserver.onNext(DeductReply.newBuilder().setCode(code).setMessage(message).build());}@Overridepublic void onError(Throwable t) {log.error("批量减扣库存异常", t);}@Overridepublic void onCompleted() {log.info("批量减扣库存完成,共计[{}]件商品", totalCount);responseObserver.onCompleted();}};}}

开发客户端应用

  • 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:
plugins {id \'org.springframework.boot\'}dependencies {implementation \'org.projectlombok:lombok\'implementation \'org.springframework.boot:spring-boot-starter\'implementation \'org.springframework.boot:spring-boot-starter-web\'implementation \'net.devh:grpc-client-spring-boot-starter\'implementation project(\':grpc-lib\')}
  • 配置文件application.yml,设置自己的web端口号和服务端地址:
server:port: 8082spring:application:name: double-stream-client-sidegrpc:client:# gRPC配置的名字,GrpcClient注解会用到double-stream-server-side:# gRPC服务端地址address: \'static://127.0.0.1:9901\'enableKeepAlive: truekeepAliveWithoutCalls: truenegotiationType: plaintext
  • 启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;

  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:

package com.bolingcavalry.grpctutorials;import io.grpc.stub.StreamObserver;public interface ExtendResponseObserver<T> extends StreamObserver<T> {String getExtra();}
  • 重头戏来了,看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
package grpctutorials;import com.bolingcavalry.grpctutorials.lib.DeductReply;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.client.inject.GrpcClient;import org.springframework.stereotype.Service;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;@Service@Slf4jpublic class GrpcClientService {@GrpcClient("double-stream-server-side")private StockServiceGrpc.StockServiceStub stockServiceStub;/*** 批量减库存* @param count* @return*/public String batchDeduct(int count) {CountDownLatch countDownLatch = new CountDownLatch(1);// responseObserver的onNext和onCompleted会在另一个线程中被执行,// ExtendResponseObserver继承自StreamObserverExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {// 用stringBuilder保存所有来自服务端的响应private StringBuilder stringBuilder = new StringBuilder();@Overridepublic String getExtra() {return stringBuilder.toString();}/*** 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,* 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容* @param value*/@Overridepublic void onNext(DeductReply value) {log.info("batch deduct on next");// 放入匿名类的成员变量中stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));}@Overridepublic void onError(Throwable t) {log.error("batch deduct gRPC request error", t);stringBuilder.append("batch deduct gRPC error, " + t.getMessage());countDownLatch.countDown();}/*** 服务端确认响应完成后,这里的onCompleted方法会被调用*/@Overridepublic void onCompleted() {log.info("batch deduct on complete");// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,// 会继续往下执行countDownLatch.countDown();}};// 远程调用,此时数据还没有给到服务端StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);for(int i=0; i<count; i++) {// 每次执行onNext都会发送一笔数据到服务端,// 服务端的onNext方法都会被执行一次requestObserver.onNext(build(101 + i, 1 + i));}// 客户端告诉服务端:数据已经发完了requestObserver.onCompleted();try {// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,// await的超时时间设置为2秒countDownLatch.await(2, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("countDownLatch await error", e);}log.info("service finish");// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得return responseObserver.getExtra();}/*** 创建ProductOrder对象* @param productId* @param num* @return*/private static ProductOrder build(int productId, int num) {return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();}}
  • 最后做个web接口,可以通过web请求验证远程调用:
package grpctutorials;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class GrpcClientController {@Autowiredprivate GrpcClientService grpcClientService;@RequestMapping("/")public String printMessage(@RequestParam(defaultValue = "1") int count) {return grpcClientService.batchDeduct(count);}}
  • 编码完成,开始验证;

验证

  • 启动服务端DoubleStreamServerSideApplication:

  • 启动客户端DoubleStreamClientSideApplication:

  • 这里要改:浏览器输入http://localhost:8083/?count=10</font>,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:

  • 至此,四种类型的gRPC服务及其客户端开发就完成了,一般的业务场景咱们都能应付自如,接下来的文章咱们会继续深入学习,了解复杂场景下的gRPC操作;

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…https://github.com/zq2599/blog_demos

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » java版gRPC实战之五:双向流