Reactive Rpc4J

A simple reactive RPC framework built on reactor-netty and jProtobuf that implements the Protobuf RPC protocol.

reactive-rpc4j-spring-boot-starter

# Installation

        <dependency>
            <groupId>com.github.759434091</groupId>
            <artifactId>reactive-rpc4j-spring-boot-starter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

# Definitions

# request

@Data
@Builder
@ProtobufClass
@NoArgsConstructor
@AllArgsConstructor
public class TestRequest {
    private int id;
}

# response

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ProtobufClass
public class TestResponse {
    private int id;
    private String value;
}

# service

@ProtobufService("testService")
public interface TestService {
    @ProtobufMethod
    Mono<TestResponse> testMethod(Mono<TestRequest> testRequestMono);
}

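# Server

There are no separate annotations for exposing a service and for discovering one, because the way an interface is registered already determines which of the two it is: registering an implementation exposes it, and registering the interface on a client discovers it.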

# Implementation

public class TestServiceImpl implements TestService {
    @Override
    public Mono<TestResponse> testMethod(Mono<TestRequest> testRequestMono) {
        return testRequestMono
                .map(testRequest -> TestResponse.builder()
                        .id(testRequest.getId())
                        .value("hello")
                        .build())
                .log();
    }
}

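# Registration

Register the implementation as a Spring bean with a @Bean method, or annotate the class with @Component or @Service instead.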

@Configuration
public class ServerConfiguration {
    @Bean
    TestServiceImpl testService() {
        return new TestServiceImpl();
    }
}

# Configuration

application.properties

reactive-rpc4j.port=8000

# Client

The client side is kept as simple as possible.

# Registration

        TestService testService = RpcServiceClientRegister.register(
                TestService.class,
                "127.0.0.1",
                8000
        );
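One common way to obtain an implementation of an interface from a call like the one above is a JDK dynamic proxy. The following is only a minimal sketch of that general technique, not the code behind RpcServiceClientRegister: `ClientStubSketch`, `EchoService`, and this `register` method are hypothetical, `CompletableFuture` stands in for `Mono` so that the example needs nothing beyond the JDK, and the network call is replaced by a string describing what would be sent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

class ClientStubSketch {
    // Hypothetical service interface; CompletableFuture is a stand-in for Mono.
    interface EchoService {
        CompletableFuture<String> echo(String message);
    }

    // Hypothetical register method: returns a proxy whose calls would be
    // forwarded to host:port. No real connection is opened in this sketch.
    static <T> T register(Class<T> serviceInterface, String host, int port) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString, hashCode and equals are local, not remote calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "stub for " + serviceInterface.getName();
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return proxy == args[0]; // equals
                }
            }
            // A real client would serialize the arguments and write them to the
            // connection here; the sketch only describes the call.
            String call = serviceInterface.getSimpleName() + "." + method.getName()
                    + Arrays.toString(args);
            return CompletableFuture.completedFuture(
                    "sent to " + host + ":" + port + ": " + call);
        };
        Object stub = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] {serviceInterface},
                handler);
        return serviceInterface.cast(stub);
    }

    static String demo() {
        EchoService service = register(EchoService.class, "127.0.0.1", 8000);
        return service.echo("hello").join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the caller only ever sees the interface, the same interface type can be backed by a local implementation on the server and by a stub like this on the client.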

# Testing

Start the server first.

# Client Test

The call is fully asynchronous, so a CountDownLatch keeps the main thread from exiting before the response arrives.

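        CountDownLatch countDownLatch = new CountDownLatch(1);

        TestRequest testRequest = TestRequest.builder()
                .id(1)
                .build();

        testService.testMethod(Mono.just(testRequest))
                // release the latch on completion, error, or cancellation
                .doFinally(signalType -> countDownLatch.countDown())
                .subscribe(
                        testResponse -> log.info(JSON.toJSONString(testResponse)),
                        throwable -> log.error("RPC call failed", throwable));

        // block the main thread until the asynchronous call has finished
        countDownLatch.await();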
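If the server is down or never answers, an unbounded wait on the latch blocks forever. A possible variation, sketched here under assumptions, waits with a timeout and releases the latch on failure as well as on success. `LatchWaitSketch` and `awaitResult` are hypothetical names, and `CompletableFuture` stands in for the `Mono` returned by the service so that the example runs on the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchWaitSketch {
    // Waits for an asynchronous result, giving up after timeoutMillis.
    static String awaitResult(CompletableFuture<String> asyncCall, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        // whenComplete runs on success and on failure, so the latch is
        // released in both cases.
        asyncCall.whenComplete((value, error) -> {
            result.set(error == null ? value : "failed: " + error.getMessage());
            latch.countDown();
        });

        try {
            // await returns false if the timeout elapses before countDown().
            boolean finished = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? result.get() : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Simulated call answered on another thread.
        System.out.println(awaitResult(CompletableFuture.supplyAsync(() -> "hello"), 2000));
        // Simulated call that never completes.
        System.out.println(awaitResult(new CompletableFuture<>(), 100));
    }
}
```

The same idea carries over to the client test: pass a timeout to `countDownLatch.await` and treat a `false` return value as a failed call.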

# Logs

  • Server

    2019-04-18 15:59:15.746  INFO 39478 --- [           main] c.g.r.server.ServerTestApplication       : Starting ServerTestApplication on BDSHYF000105534 with PID 39478 (/Users/luxueneng/IdeaProjects/reactive-rpc4j-spring-boot-starter/target/test-classes started by luxueneng in /Users/luxueneng/IdeaProjects/reactive-rpc4j-spring-boot-starter)
    2019-04-18 15:59:15.750  INFO 39478 --- [           main] c.g.r.server.ServerTestApplication       : No active profile set, falling back to default profiles: default
    2019-04-18 15:59:19.394  INFO 39478 --- [           main] c.g.r.server.RpcServer                   : start server at port=8000
    2019-04-18 15:59:36.981  INFO 39478 --- [     loop-nio-2] reactor.Mono.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
    2019-04-18 15:59:36.982  INFO 39478 --- [     loop-nio-2] reactor.Mono.Map.1                       : request(32)
    2019-04-18 15:59:36.984  INFO 39478 --- [     loop-nio-2] reactor.Mono.Map.1                       : onNext(TestResponse(id=1, value=hello))
    2019-04-18 15:59:37.002  INFO 39478 --- [     loop-nio-2] reactor.Mono.Map.1                       : onComplete()
    
  • Client

    15:59:36.886 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6dbcf214
    15:59:36.902 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0x08d21b4c, L:/127.0.0.1:51998 - R:/127.0.0.1:8000] Writing object PooledUnsafeHeapByteBuf(ridx: 0, widx: 47, cap: 47)
    15:59:37.027 [reactor-tcp-nio-4] DEBUG com.github.reactiverpc4jspringbootstarter.handler.DataPackageDecoder - [profiling] decode cost=3ms
    15:59:37.229 [reactor-tcp-nio-4] INFO com.github.reactiverpc4jspringbootstarter.client.ClientApplication - {"id":1,"value":"hello"}