dremio ConduitServer 简单说明

管理员

ConduitServer 是基于grpc 开发的服务,通过提供的ConduitServiceRegistry 进行定义服务的注册

实现并注册的服务

参考下图,都是grpc 的实现,可以看出包含了不少,比如datasetcatalog,informationschema,option,nessise ,flightservuce,jobresult。。。。

服务的启动

  • 参考处理
 
// 启动,包含了基于tcp以及进程内部的
@Override
  public void start() throws Exception {
    final ConduitServiceRegistryImpl registry = (ConduitServiceRegistryImpl) registryProvider.get();
 
    for (BindableService service : registry.getServiceList()) {
      serverBuilder.addService(service);
      inProcessServerBuilder.addService(service);
    }
 
    final TracingServerInterceptor tracingServerInterceptor = TracingServerInterceptor.newBuilder().withTracer(TracerFacade.INSTANCE.getTracer()).build();
    serverBuilder.intercept(tracingServerInterceptor);
    inProcessServerBuilder.intercept(tracingServerInterceptor);
 
    for (CloseableBindableService closeableService : registry.getCloseableServiceList()) {
      logger.debug("Conduit service being added {}", closeableService.getClass().getName());
      serverBuilder.addService(closeableService);
      inProcessServerBuilder.addService(closeableService);
      closeableServices.add(closeableService);
    }
 
    for (ServerServiceDefinition serverServiceDefinition : registry.getServerServiceDefinitionList()) {
      serverBuilder.addService(serverServiceDefinition);
      inProcessServerBuilder.addService(serverServiceDefinition);
    }
 
 
    serverBuilder.maxInboundMetadataSize(Integer.MAX_VALUE).maxInboundMessageSize(Integer.MAX_VALUE)
      .intercept(TransmitStatusRuntimeExceptionInterceptor.instance());
 
    if (sslEngineFactory.isPresent()) {
      final SslContextBuilder contextBuilder = sslEngineFactory.get().newServerContextBuilder();
      // add gRPC overrides using #configure
      ((NettyServerBuilder)serverBuilder).sslContext(GrpcSslContexts.configure(contextBuilder).build());
    }
    server = serverBuilder.build();
    inProcesServer = inProcessServerBuilder.build();
    server.start();
    inProcesServer.start();
 
    logger.info("ConduitServer is up. Listening on port '{}'", server.getPort());
  }

使用

  • ContextService 中
    会回去信息并包装为一个sabotcontex对象中
 
protected SabotContext newSabotContext() throws Exception{
    if (queryPlannerAllocator == null) {
      throw new IllegalStateException("Context Service has not been #start'ed");
    }
 
    final FabricService fabric = this.fabric.get();
    int conduitPort = -1 ;
   // 获取端口信息
    if (conduitServer.get() != null) {
      conduitPort = conduitServer.get().getPort();
    }
    int userport = -1;
    try {
      userport = userServer.get().getPort();
    } catch(RuntimeException ex){
      if(roles.contains(ClusterCoordinator.Role.COORDINATOR)){
        throw ex;
      }
    }
 
    final SabotConfig sConfig = bootstrapContext.getConfig();
    final String rpcBindAddressOpt = sConfig.getString(ExecConstants.REGISTRATION_ADDRESS);
    final String rpcBindAddress = (rpcBindAddressOpt.trim().isEmpty()) ? fabric.getAddress() : rpcBindAddressOpt;
 
    InetAddress[] iFaces = InetAddress.getAllByName(rpcBindAddress);
    logger.info("IFaces {} bound to the host: {}", Arrays.asList(iFaces).toString(), rpcBindAddress);
 
    final NodeEndpoint.Builder identityBuilder = NodeEndpoint.newBuilder()
      .setAddress(rpcBindAddress)
      .setUserPort(userport)
      .setFabricPort(fabric.getPort())
      .setConduitPort(conduitPort)
      .setStartTime(System.currentTimeMillis())
      .setMaxDirectMemory(VM.getMaxDirectMemory())
      .setAvailableCores(VM.availableProcessors())
      .setRoles(ClusterCoordinator.Role.toEndpointRoles(roles))
      .setDremioVersion(DremioVersionInfo.getVersion())
      .setNodeTag(bootstrapContext.getDremioConfig().getString(DremioConfig.NODE_TAG));
 
    if (engineIdProvider != null && engineIdProvider.get() != null) {
      identityBuilder.setEngineId(engineIdProvider.get());
    }
 
    if (subEngineIdProvider != null && subEngineIdProvider.get() != null) {
      identityBuilder.setSubEngineId(subEngineIdProvider.get());
    }
 
    String containerId = System.getenv("CONTAINER_ID");
    if(containerId != null){
      identityBuilder.setProvisionId(containerId);
    }
 
    final NodeEndpoint identity = identityBuilder.build();
    // 包装放到SabotContext 中
    return new SabotContext(
      bootstrapContext.getDremioConfig(),
      identity,
      sConfig,
      roles,
      bootstrapContext.getClasspathScan(),
      bootstrapContext.getLpPersistance(),
      bootstrapContext.getAllocator(),
      coord.get(),
      resourceInformationProvider.get(),
      workStats,
      kvStoreProvider.get(),
      namespaceServiceFactoryProvider.get(),
      orphanageFactoryProvider.get(),
      datasetListingServiceProvider.get(),
      userService.get(),
      materializationDescriptorProvider,
      queryObserverFactory,
      accelerationManager,
      accelerationListManager,
      catalogService,
      masterCoordinatorConduit.get(),
      informationSchemaStub,
      viewCreatorFactory,
      queryPlannerAllocator,
      spillService,
      connectionReaderProvider,
      credentialsService.get(),
      jobResultInfoProvider.get(),
      optionManagerProvider.get(),
      systemOptionManagerProvider.get(),
      optionValidatorProvider.get(),
      bootstrapContext.getExecutor(),
      coordinatorModeInfoProvider,
      nessieClientProvider,
      statisticsService,
      statisticsAdministrationServiceFactory,
      statisticsListManagerProvider,
      userDefinedFunctionListManagerProvider,
      relMetadataQuerySupplier,
      jobsRunnerProvider,
      datasetCatalogStub,
      globalCredentailsServiceProvider,
      credentialsServiceProvider,
      conduitInProcessChannelProviderProvider,
      sysFlightChannelProviderProvider,
      sourceVerifierProvider
    );
  }
  • dac 模块对于部分服务的使用
    参考调用链,这个是job 的,其他的可以类似分析,可以看看ConduitProviderImpl 一些实现
 
ts=2023-01-13 02:16:58;thread_name=qtp925804538-1592;id=638;is_daemon=false;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.service.job.JobsServiceGrpc.getSubscribeToJobEventsMethod()
        at com.dremio.service.job.JobsServiceGrpc$JobsServiceStub.subscribeToJobEvents(JobsServiceGrpc.java:332)
        at com.dremio.service.jobs.JobDataClientUtils.waitForFinalState(JobDataClientUtils.java:173)
        at com.dremio.dac.resource.JobResource.getDataForVersion(JobResource.java:220)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
        at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
        at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
        at org.glassfish

JobsClient 的获取

public JobsClient getJobsClient() {
    // lazily initialized as the endpoint information might not be available in the service set
    if (jobsClient == null) {
      synchronized (this) {
        if (jobsClient == null) {
          // we use a local variable to ensure all blocked threads on this block will only be able to
          // use jobsClient after start() is done
           // 依赖conduitProvider
          final JobsClient client = new JobsClient(grpcFactory, allocator, portProvider, selfEndpoint, conduitProvider);
          client.start();
          jobsClient = client;
        }
      }
    }
    return jobsClient;
  }

JobsServiceBlockingStub grpc 桩的获取

/**
 * Get the blocking stub to make RPC requests to jobs service.
 *
 * @return blocking stub
 */
public JobsServiceGrpc.JobsServiceBlockingStub getBlockingStub() {
  return JobsServiceGrpc.newBlockingStub(conduitProvider.getOrCreateChannel(selfEndpoint.get()));
}

其他模块的使用(可以通过ConduitProviderImpl 返回的chananel 进行不同grpc 服务的调用,基本就包含了上边那些服务定义)

说明

ConduitServer 是基于grpc 开发的的内部服务,方便服务的访问,并没有使用dremio 自己开发的那套rpc 模式,毕竟grpc 开发稳定性以及效率更高

参考资料

services/grpc/src/main/java/com/dremio/service/conduit/server/ConduitServer.java
services/grpc/src/main/java/com/dremio/service/conduit/server/ConduitServiceRegistry.java
sabot/kernel/src/main/java/com/dremio/exec/server/ContextService.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsClient.java
services/grpc/src/main/java/com/dremio/service/conduit/client/ConduitProviderImpl.java
services/jobs/src/main/java/com/dremio/service/jobs/HybridJobsService.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsClient.java