← 返回首页
🔌

gRPC架构

📂 architecture ⏱ 2 min 219 words

gRPC架构

Protobuf协议定义

Protocol Buffers定义服务接口和消息格式,实现跨语言高效序列化。

syntax = "proto3";
package userservice;

service UserService {
  rpc GetUser(GetUserRequest) returns (UserResponse);
  rpc ListUsers(ListUsersRequest) returns (stream UserResponse);
  rpc CreateUser(CreateUserRequest) returns (UserResponse);
}

message GetUserRequest {
  int64 id = 1;
}

message UserResponse {
  int64 id = 1;
  string name = 2;
  string email = 3;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

四种通信模式

gRPC支持一元RPC、服务端流、客户端流和双向流。

// 一元RPC
@Service
public class UserGrpcService extends UserServiceGrpc.UserServiceImplBase {

    @Override
    public void getUser(GetUserRequest request,
                        StreamObserver<UserResponse> responseObserver) {
        User user = userService.findById(request.getId());
        responseObserver.onNext(toResponse(user));
        responseObserver.onCompleted();
    }

    // 服务端流
    @Override
    public void listUsers(ListUsersRequest request,
                          StreamObserver<UserResponse> responseObserver) {
        Page<User> users = userService.findAll(request.getPageSize());
        for (User user : users) {
            responseObserver.onNext(toResponse(user));
        }
        responseObserver.onCompleted();
    }
}

服务端配置

@Configuration
public class GrpcServerConfig {

    @Bean
    public Server grpcServer(UserGrpcService userService) {
        return ServerBuilder.forPort(9090)
            .addService(userService)
            .maxInboundMessageSize(1024 * 1024 * 10) // 10MB
            .build();
    }
}

@Configuration
public class GrpcClientConfig {

    @Bean
    public ManagedChannel userChannel() {
        return ManagedChannelBuilder.forAddress("localhost", 9090)
            .usePlaintext()
            .keepAliveTime(30, TimeUnit.SECONDS)
            .build();
    }
}

拦截器与链路追踪

通过拦截器实现认证、日志和分布式追踪。

public class AuthInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {

        String token = headers.get(AUTHORIZATION_KEY);
        if (token == null || !validateToken(token)) {
            call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), new Metadata());
            return new ServerCall.Listener<>() {};
        }
        return next.startCall(call, headers);
    }
}

负载均衡

@Bean
public ManagedChannel loadBalancedChannel() {
    return ManagedChannelBuilder.forTarget("dns:///user-service:9090")
        .defaultLoadBalancingPolicy("round_robin")
        .usePlaintext()
        .build();
}