Spring官方的RSocket Broker其实开发已经非常久了,我以为会伴随着Spring Cloud 2021.0发布的,但是没有发生。不过Spring RSocket Broker还是发布了最新的0.3版本,虽然还是预览版,但目前已经可用,考虑官方还没有提供对应的文档,大家入门做Demo还有些困难,所以这篇文章就是帮你快速入门Spring RSocket Broker,同时解析一下RSocket Broker的特性。
一、Spring RSocket Broker架构
首先让我们看一下Spring RSocket Broker的架构图,如下:
RSocket Broker为一个集群对外提供服务,其主要服务就是应用注册和RSocket请求的转发,集群中的每一个Broker都维护着统一的全局路由表。RSocket Broker有两个监听端口:8001端口主要负责提供对外RSocket服务,如应用到Broker之间的长连接,然后就是该长连接之上的RSocket请求的发送和接收。7001端口主要负责集群内部Broker节点之间的通讯,如同步应用接入的元数据信息,确保全局服务路由表的统一,还包括Broker之间的请求转发,当然Broker之间的通讯协议还是RSocket。
当一个服务应用和Broker建立连接时,会将一些基础信息发送给Broker,对应的属性主要包括:路由节点ID(routeId)、服务名称(sevice-name), tags(标签)。
- 路由节点ID(routeId): 这个是应用到broker创建的长连接的唯一标识,通常是UUID,当然也可以用户自己指定,连接的唯一性会让各个Broker集群的全局路由表处理方便很多。注意routeId是长连接的标识,不是应用的标识,应用的标识是通过tags进行标识的。如果一个应用和broker创建两条长连接,那么就有两个不同的routeId,当然这种情况下每一条连接可以提供不同的服务名称。
- 服务名称:这个其实就是服务的DNS Name,如果其他应用想调用该应用提供的RSocket服务,就需要指定该服务名称。虽然Spring RSocket提供了messageMapping,但是mapping的key是应用内部的,并不能保证全局唯一,只有Service Name + RSocket Mapping Key才能定位指定的服务,这个和http的原理是一致:Mapping Key类似HTTP Path,而Service Name类似域名。
- 标签:标签是用于标识应用提供的RSocket服务,当然RSocket Broker规范也提供了一些缺省的标签,如InstanceName, ClusterName, Region, MajorVersion等。标签不只是服务的元信息,此外还可以参与到服务路由上。如可以设置服务版本、集群名称等,之前开发中老大难的定向路由就可以通过标签轻松解决,如可以给服务打上InstanceName=app-leijuan的标签,然后在调用中设置InsanceName就可以调用某位开发者启动的服务实例。大家不用担心基于Tag的路由性能问题,背后有Roaring BitMap支持,速度非常快。
应用可以向集群中的一个或者几个RSocket Broker节点注册,这个取决于Broker Client的配置,这个稍后我们还会讲到。
当一个应用注册到Broker上后,如果该应用想调用某一RSocket服务,只需要根据Service Name + Routing key就可以向Broker发起RSocket调用请求,然后Broker会根据内部的全局路由表,找到能够提供服务的服务节点(RouteId),然后将请求转发给对应的服务节点,服务节点处理完毕后将response返回给Broker,Broker再将response返回给调用方。
如果当前Broker节点上并没有对应的服务路由接入,这个是Broker会将请求转发给有服务节点的Broker,这个就是请求转发,然后再将那个Broker处理的结果返回给调用方。有同学可能会问,这里可能存在一个调用链死循环的问题,如broker1将请求转发发给broker3,broker3上的服务突然下线也不存在,Broker3可能发回给Broker1,然后broker1再找其他broker发送?这种情况是不会发生的,Broker之间会同步应用上下线信息,所以每一个Broker都维护着集群统一的全局路由表,所以broker1给broker2转发消息,broker2上一定有对应服务的route连接,即便broker2上突发状况,服务对应的route没啦,那么会再转发给有服务的broker,当然如果都没有找到,这个时候请求会被Broker保留(hold)住,在超时后会返回错误。
二、Spring RSocket Broker项目样例
接下来我们就看一个真实的开发样例,三个应用:一个Broker Server,一个服务提供者,一个服务调用者。Spring RSocket Broker对应的开发包已经提交到Maven仓库,大家可以在文末链接查看。
1、RSocket Broker Server
RSocket Broker Server是一个标准的Spring Boot应用,你只需要在Spring Boot应用添加以下依赖:
<dependency>
<groupId>io.rsocket.broker</groupId>
<artifactId>rsocket-broker-spring</artifactId>
<version>0.3.0</version>
</dependency>
然后在application.yaml文件中添加以下配置:
io.rsocket.broker:
uri: tcp://0.0.0.0:8001
cluster.uri: tcp://0.0.0.0:7001
然后启动Spring Boot应用,Broker也就启动啦,并监听7001和8001端口。有同学可能会问,为何不提供一个独立的应用来启动RSocket Broker?这个可能是Spring Cloud项目的出发点相关,和Spring Config Server,Registry Server一样,都是被应用嵌入的,主要是方便开发者定制Broker的功能,如添加Web Console,对接Ops系统等,灵活性会就非常高。
2、RSocket Service Provider
接下来我们再创建一个Spring Boot应用,对外提供RSocket服务,首先添加一下以下依赖:spring-boot-starter-rsocket是标准的,方便Spring Boot应用集成RSocket,另外就是rsocket-broker-client-spring,这个是Broker Spring Client,负责完成和RSocket Broker的对接。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket.broker</groupId>
<artifactId>rsocket-broker-client-spring</artifactId>
<version>0.3.0</version>
</dependency>
然后在服务应用的application.yaml添加以下Broker Client配置,这里要说明service-name,建议采用前面谈到的DNS命名方式,这样可以确保服务名不会冲突。接下来就是brokers的地址列表,如下:
io.rsocket.broker.client:
service-name: com.example.PongService
brokers:
- tcp://localhost:8001
接下来我们还需要写一个RSocket服务,这个就是标准的Spring RSocket,代码如下:
@Controller
public class PongController {
@MessageMapping("pong")
public Mono<String> pong(String ping) {
return Mono.just("hello " + ping);
}
}
然后我们启动该服务应用,就会在RSocket Broker的日志输出中看到该应用注册到Broker的信息,这样RSocket服务就完成了在Broker上的注册。
3、RSocket Service Consumer
接下来我们还要创建一个应用用于调用RSocket服务,和服务应用一样,添加相同的依赖,由于该应用并不对外提供RSocket服务,你将service-name调整为Namespace + 应用名称即可,主要是不要和其他应用不要重名即可,如下:
io.rsocket.broker.client:
service-name: com.example.apps.${spring.application.name}
brokers:
- tcp://localhost:8001
接下来就是编写一个Web Controller访问RSocket服务,只需要注入BrokerRSocketRequester Bean,然后调用RSocket服务,这个和Spring RSocket的RSocketRequester使用方法类似,代码如下:
@RestController
public class PortalController {
@Autowired
private BrokerRSocketRequester requester;
@GetMapping("/")
public Mono<String> index() {
return requester.route("pong")
.address("com.example.PongService")
.data("ping")
.retrieveMono(String.class);
}
}
启动该应用后,你就可以使用curl命令进行测试,就可以看到熟悉的`Hello ping`输出。
你有可能觉得这个客户端调用比较原始, 其实你只要集成一下spring-retrosocket,然后就是你熟悉的Java接口,样例如下:
@RSocketBrokerClient
interface GreetingClient {
@MessageMapping("pong")
@ServiceAddress("com.example.PongService")
Mono<String> pong(String ping);
}
三、Spring RSocket Broker的一些思考
1、RSocket Broker特性
Spring RSocket Broker开发已经挺久了,开发者都是Spring Cloud团队成员,Oleh在Reactive和RSocket方面非常资深,Spencer也是Spring Cloud的核心架构师。Spencer在多个大会场合讲述RSocket给Spring Cloud带来的变化,完全是颠覆性的。从上述的应用样例你也可以看出,不提Reactive全异步的性能,你不再需要服务注册,你也不需要本地启动接听端口,介入Broker转发后混各种云的服务都可以通过Broker进行相互调用。关于RSocket Broker的优点,Spring RSocket Broker有对应的说明,如下:
Routing and forwarding are used to forward RSocket requests between two RSocket connections via broker. In some cases, point-to-point interactions between a client and server are enough, in an enterprise environment, it is useful to decouple the client and server from each other. Some examples of why decoupling is necessary include blue/green deployments, load balancing, A/B testing, feature toggles, etc. Additionally, providing an intermediary can help with security and scalability. Finally, with the load balancing, routing and QoS, better overall application latency and throughput can be achieved than by direct connections.
2、RSocket Broker中直接通讯的解决方案
有同学可能会有疑问,经过RSocket Broker会有一定的性能损失,我这个应用QPS非常高,不能有延迟啊。这样也没有关系,还记得服务应用中添加了 spring-boot-starter-rsocket依赖吗?我们只需要在application.yaml中添加以下配置项就可以打开RSocket的服务监听端口,然后再通过RSocket Broker提供的元信息,然后我们使用RSocketRequester创建到目标服务的连接即可。有一些工作量,但是已经非常小,我们只需要使用reactor-pool自动管理直接连接的连接池就可以。
spring.rsocket.server.transport: tcp
spring.rsocket.server.port: 42252
3、RSocket Broker请求等待
Spring RSocket Broker还有一个特性就是服务上线延迟支持。举一个例子,假设App-1和Service-1都在线上运行正常,突然Service-1的实例都下线啦,这个时候从app-1发出去的请求就找不到目标节点,这个时候该如何处理?
- 拒绝请求,马上返回失败:这个就是我们常说的快速失败的设计,在企业架构设计中使用的比较多。如果是采用同步通讯和Thread Pool模式,你基本上必须使用该设计,不然较长时间的线程堵塞马上让你服务无法响应请求。
- 等待服务上线:就是Broker先保留(hold)请求,然后等服务上线后再转发给上线的服务。这个设计有时非常有用,如在FaaS场景,Gateway上已经验证该函数是存在的,目前只是函数下线,所以触发一下函数上线然后请求等待就可以。此外还有就是网络抖动的问题,会导致连接被中断,这个时候也可以选择等待服务上线。另外还有一个场景就是服务发布,如一个长尾服务只有一个实例,只要你能在3-5秒内将应用重新启动完毕,这个时候broke就可以帮你hold住请求,再配合上客户端的retry,即便只有一个实例,也不会感觉到服务被中断。如在K8S中,将应用的镜像设置为last,然后在设置为获取最新,这样一个redploy button就可以啦。当然这个请求等待时间也不是无限长的,你可以设置一个超时时间,然后返回错误就可以。
回到上述的场景,Broker-1这时候就会保留(hold)住请求,当service-1上线后,请求会马上被转发到上线的节点上处理。和同步通讯不一样,异步的等待对Reactive系统并无系统压力,所以等待服务上线完全是没有问题。当然至于那种模式,你可以根据实际的技术需要进行选择,RSocket Broker同时支持这两个模式。
4、消息广播模型
Spring RSocket Broker还支持消息的广播,也就是将RSocket请求转发给一批服务节点。消息广播主要包括以下模型:
- fireAndForget模型:如配置推送场景,将配置更新请求发给service-name对应的服务列表即可。
- requestResponse模型:将请求发送给多个服务,然后将第一个响应的response返回给调用方,不论是响应的结果是成功还是失败,这个和JavaScript的Promise.race()类似。Promise.race()特性,业务上场景好像不多,为了方便理解这里添加一个timeout,将其转换为:”在指定时间内最快地将结正确的结果返回”。如在数据同步的场景中,数据会同步到多个备份服务器上,但是由于种种原因,可能导致备份服务器上的数据同步出现延迟或者丢失,于是我向多个备份服务器查询数据,并约定如果备份服务器上没有该数据,则在1秒超时后返回异常,这样就可以保证以最快的速度从备份服务器集群上拿到正确数据。当然在实际的架构中,我们会加上一层cache支持,避免同时向服务发起多个请求,浪费的资源也比较多。
- requestStream模型:将请求发送给多个服务方,然后将返回的stream进行汇总,然后将合并的消息流返回给调用方。在监控的场景非常有用,你希望各个服务或应用将5分钟内的日志都汇报上来,然后进行统计,这个时候就非常有帮助。
- Channel模型:连续地向多个服务方的channel发送信息,然后将发出的信息再进行汇总。如你有多个指定要发送出去,然后将各个应用上对指定的响应结果进行汇总,就可以采用这个模型。
从上述的几个模型看下来,基本上可以涵盖众多的广播需求。如公司要你做一个config server进行配置推送,借助RSocket Broker是不是分分钟就能搞定。借助RSocket Broker + Agent完成运维操作、日志采集等,是不是也不麻烦啦。在Prometheus的metrics定时采集场景,只需要发一个指令,就可以收集到所有机器上的metrics,比起向一台台节点发起HTTP请求,这种方式简单很多。
5、Gateway和Broker
大多数的Gateway设计都采用主动连接的方式,也就是Gateway主动去连接上游服务的Proxy模式,当然要连接到上游服务还需要借助服务注册发现,智能DNS等,其中的原理就不赘述啦。而Broker架构则采用被动模式,也就是等待服务连接到Broker上,也就是当服务Ready后,主动连接到Broker就可以,然后基于应用和Broker之间建立的长连接,进行请求转发即可。对比Gateway架构,Broker模式简单很多,内部不用管理上游服务的连接池,不需要服务注册发现,当然对网络也没有特殊的打通要求,混合云的场景也适用等。
RSocket Broker虽然是基于RSocket协议的,但是还可以通过Bridge桥接的方式支持各种协议,如RSocket HTTP Bridge就可以扩展支持HTTP接入。
6、嵌入式的RSocket Broker
回答前面的问题,RSocket Broker是被应用嵌入的,你需要添加对应的依赖和配置,然后启动对应的应用,这个和Spring Config Server等都是类似的,主要是方便开发者扩展Broker对应的特性,和其他系统进行集成。结合前面介绍的RSocket Broker特性,我们通过嵌入RSocket Broker,马上就可以实现一些典型的业务场景:
- Config/Registry Server: 既然应用已经和Broker建立了长连接,元信息也都发送给Broker,所以Registry Server就水到渠成。RSocket Broker支持各种消息广播模型,所以Config Server基本也就绪啦。单个应用的配置推送,基于独立tag推送,基于Service Name整体推送,全部没有问题。
- Web控制台:嵌入Broker后,再开发一个web控制台,这个对Spring Boot来说非常简单。
- Data Gateway:如果你想做一个Data Gateway对外提供数据访问服务,所有data worker节点连接到Broker,然后broker对外提供服务即可。
- Ops系统整合:这个使用Spring Boot整合即可,其他诸如对接入应用的健康度检查等,这个只要发一个消息给应用即可。
- 应用和Broker的优雅上下线:通过推送brokers的配置信息,应用可以连接到新的brokers节点上,完成brokers集群的上下线。应用的上下线,在Broker集群中发一个ROUTE_REMOVE的消息即可,然后应用在3-5后即可下线。
7、Spring RSocket Broker Client
目前RSocket Broker的Client SDK主要包括Java和Node.js,但是其他语言的Broker Client SDK大家也不用担心,Broker Client只是在RSocket SDK的Composite Metadata上添加一个新的 message/x.rsocket.broker.frame.v0 Metadata规范,借助于RSocket多语言SDK,主流语言开发的应用都可以快速接入到Broker上。
四、总结
最后有同学问道RSocket现在成熟了吗?
在Spring生态中,已经非常成熟。RSocket Java SDK由Spring团队开发,Spring RSocket提供了RSocket和Spring的集成,Spring Boot内置rsocket-starter,Spring Cloud Function也添加了RSocket支持,考虑Java开发人员的习惯,还提供spring-retrosocket。其他Spring产品基本都支持了Reactive,所以对接通过Reactive就可以。可以RSocket外围全部就绪啦,大家都在苦等RSocket Broker出现,这样集成和部署就更简单了。此外当前各种的各种服务,如REST API,GraphQL或者RPC框架,迁移到RSocket麻烦吗?如果是Spring体现的,就是添加一个@MessageMapping的事情,然后就可以通过RSocket访问这些服务 。Dubbo/HSF的服务,在Interface添加上spring-retrosocket提供的Annotation,然后就可以通过RSocket协议访问这些服务啦。REST API在Controller基础上添加@MessageMapping就可以。至于GraphQL,不需要任何调整,添加一个新的GraphqlController对接GraphQL底层服务即可。就目前的Spring RSocket Broker特性来说,对于一个中型企业,可以说不用什么调整,完全可以胜任,这个就是Spring Config Server,Spring Registry Server和Spring Cloud Gateway的定位是一样的。
至于Spring Cloud团队一直在讲述RSocket对Spring Cloud和开发体验的影响,相信阅读了这里,你有了自己的判断。但是还是那句老话:“世有伯乐,然后有千里马。千里马常有,而伯乐不常有”。
如果大家想了解更多的技术细节,我在样例项目的的READM.md中进行了说明,有兴趣可以访问项目的Git仓库。项目样例的git仓库地址为:https://github.com/linux-china/spring-rsocket-broker-demo欢迎clone试用。
链接:https://repo1.maven.org/maven2/io/rsocket/broker/
互联网技术实战营·数据智能专题
互联网企业数据运营正在经历从工具化到数据化,从数据化向智能化演进的转变。以数据智能驱动科学运营,洞察用户行为,优化运营策略,进而实现用户的精细化和智能化运营。
本次课程面向关注数据治理与用户增长的互娱、电商、游戏等互联网企业,特邀阿里云的技术专家分享最佳实践和解决方案,您可按需选择想要学习的课程。