Featured image of post Spark动态加载hive配置的方案

Spark动态加载hive配置的方案

一般来说,Spark写Hive,把xxx-site.xml系列配置文件打进jar包里,或spark-submit指定下file之类,new个HiveContext就完事了。
要写外部集群,也不外乎是换对应的xxx-site.xml,改改thrift服务地址啥的,不费劲。
好了,本文结束。









不对,擅长断更的我不会为此特意写篇博客。
现在的场景是,每次Spark任务启动的时候才能拿到外部Hive集群的配置信息(别问我为什么,问就是中台的需求,很多集群,java应用启动后才能去读到任务配置,反射组装RDD并执行,Hive配置?lazy的,到写入的时候才会去拿)。
这个过程踩了不少坑,试了几种方案,直接说结论吧。

  1. SparkContext创建的时候会创建一个Configuration对象(注意 loadDefaults=true),写入Hive会用到它;而这个Configuration对象里面已经放了常规的那些***-site.xml系列配置文件作为 defaultResources,这时写入Hive相当于按fat-jar里面的配置来了;
  2. 围观Configuration代码,reload配置之后会将defaultResources逐个读出,而defaultResources是个有序的List,那么显然可以用Configuration#addDefaultResource()把外部集群的相关配置xml设置为默认资源,这样拿配置的时候就会拿到外部集群的配置啦!!!
  3. 为了方便配置的读取,直接放在hdfs吧,这样直接Configuration.addDefaultResource("hdfs:///path/to/hive-site.xml")不就可以了吗?诶怎么不行,再围观Configuration代码,可以看到加载默认资源最终用的是Configuration#getResource()方法,这个方法体就一句话:return classLoader.getResource(name);,也就是说,它不会去解析hdfs协议,而是直接从classpath里面去读取。所以不能直接从hdfs读取;
  4. 最后的方案是把配置文件放在hdfs,写入Hive前,把它下载到当前classpath的其中某个目录下(比如classpath包含. 则下载到System.getProperty("user.dir")下),然后Configuration.addDefaultResource("hive-site.xml"),因为Configuration是用ClassLoader进行加载的,所以注意路径没有/
  5. 这就完事了?并不,跑起来会发现还是查询jar包里的hive metastore地址,所以还要解析hive-site.xml,读取出hive.metastore.uris值并放入环境变量中。
  6. 这就完事了?并不,考虑到后续还会有其他写入操作,以及SparkContext.stop()操作,这些操作都会用到Configuration读取配置,然而现在以及有了外部集群的默认资源了,需要删掉,然而Configuration并没有提供删除默认资源的方法,所以这里要手动反射删除之。

最终代码(简化版):

@Slf4j
class WriteExtraHive{
    public static final String HIVE_METASTORE_URIS_KEY = "hive.metastore.uris";
    public static final String BASE_HDFS_PATH = "/path/to/";
    private boolean useSparkSql; //实际的实现是支持走jdbc和走SparkSql,根据是否有hive的配置文件
    private Set<String> extraDefaultResource = new HashSet<>();
    private String hosts; //集群节点,这里只用于区分hdfs的配置路径

    public void write(){
        init(); //加载配置
        write(); //真正写hive
        end(); //移除额外添加的默认资源
    }

    public void init(){
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        String hiveSiteXmlPath = calHadoopXmlPath(hosts, "hive-site", false);
        useSparkSql = hiveSiteXmlPath != null;
        log.info("hive-site.xml文件({})存在:{}", hiveSiteXmlPath, useSparkSql);
        if (useSparkSql) {
            String hiveMetaStoreUris = parseMetaStoreUri(hiveSiteXmlPath);
            if (StringUtils.isNotEmpty(hiveMetaStoreUris)) {
                log.info("从hive-site.xml文件读取到{}={},并设置到环境变量", HIVE_METASTORE_URIS_KEY, hiveMetaStoreUris);
                System.setProperty(HIVE_METASTORE_URIS_KEY, hiveMetaStoreUris);
                calHadoopXmlPath(hosts, "hive-site", true);
                calHadoopXmlPath(hosts, "hdfs-site", true);
            } else {
                useSparkSql = false;
            }
        }
    }

    private void write(){
        HiveContext hiveContext = new HiveContext(sc); //别问我从哪来的SparkContext,示例代码,随意看看
        DataFrame docDataFrame = hiveContext.createDataFrame(rowRdd, sparkSchema); //rdd和Schema也是,别问
        docDataFrame.write()
                .mode(SaveMode.Overwrite)
                .saveAsTable("xxx.yyy");
    }

    public void end(){
        synchronized (Configuration.class) {
            Configuration tempalte = new Configuration(false);
            CopyOnWriteArrayList<String> defaultResources = TestUtil.getPrivateField(conf, "defaultResources"); //getPrivateField方法如其名,递归父类拿到字段并设可见再读
            if (defaultResources == null) {
                return;
            }
            for (String resource : extraDefaultResource) {
                defaultResources.remove(resource);
            }
            WeakHashMap<Configuration, Object> REGISTRY = TestUtil.getPrivateField(conf, "REGISTRY");
            if (REGISTRY == null) {
                return;
            }
            for (Configuration curConf : REGISTRY.keySet()) {
                Boolean loadDefaults = TestUtil.getPrivateField(curConf, "loadDefaults");
                if (loadDefaults != null && loadDefaults) {
                    curConf.reloadConfiguration();
                }
            }
        }
    }

    private String calHadoopXmlPath(String hosts, String fileName, boolean addToDefaultRs) {
        String hdfsPath = String.format("hdfs://%shive/%s-%s.xml", BASE_HDFS_PATH, hosts, fileName);
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            if (HdfsUtil.isFileExist(hdfsPath, fs)) {
                if (addToDefaultRs) {
                    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                    URL cpResource = classLoader.getResource("");
                    String cpDir = cpResource != null ? cpResource.getPath() : (System.getProperty("user.dir") + File.separator);
                    String downloadFileName = String.format("%s-%s_%s.xml", hosts, fileName, System.currentTimeMillis()); //实际下载本地的名字
                    String fullDownloadFilePath = cpDir + downloadFileName;
                    log.info("增加Hadoop配置文件:{}到Configuration默认资源,下载到本地:{}", hdfsPath, fullDownloadFilePath);
                    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(fullDownloadFilePath))) {
                        HdfsUtil.copyFileAsStream(hdfsPath, os, fs);
                        Configuration.addDefaultResource(downloadFileName); //加入默认资源
                        extraDefaultResource.add(downloadFileName); //记录加过哪些默认资源,后面要移除
                    } catch (Exception e) {
                        log().error(e.getMessage(), e);
                    }
                    log.info("增加Hadoop配置文件:{}后读取classLoader.getResource({})={}", fileName, downloadFileName, classLoader.getResource(downloadFileName));
                }
                return hdfsPath;
            } else {
                log.info("不存在文件:{}", fileName);
            }
        } catch (Exception e) {
            log.error("get FileSystem fail!", e);
        }
        return null;
    }

    private String parseMetaStoreUri(String hiveSiteXmlPath) {
        Configuration conf = new Configuration(false);
        try {
            conf.addResource(new URL(hiveSiteXmlPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return conf.get(HIVE_METASTORE_URIS_KEY);
    }
}
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy