如何在bulkloadHFile中获取数据触发ES

来源:3-3 HBase是什么

lindy_chan

2021-08-09

我将csv文件导入到hbase 使用ImportTsv生成HFile

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv

已经成功,但是我想使用自定义的协处理器 导入之后触发 postBulkLoadHFile 事件,获取到数据后进行插入ES,达到二级缓存的作用。 因为是hfile的 所以它操作的不是表级别的,触发不到 postPut 这个事件,如果单纯的操作表结构是可以的。

现在的问题是 如何获取到数据


public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor, BulkLoadObserver{

 @Override
    public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
                                  List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) {

        Map<String, Object> data = new HashMap<>();
        Map<String, Map<String, Object>> esData = new HashMap<>();

        for (Pair<byte[], String> familyPath : stagingFamilyPaths) {

            String key = Bytes.toString(familyPath.getFirst());
            String value = familyPath.getSecond();

            LOG.info(Bytes.toString(familyPath.getFirst()) + "===JULONG===>" + familyPath.getSecond());
            data.put(key, value);
//            esData.put(key, value);
        }
        esData.put("000", data);

        ElasticSearchUtil.saveEsDataWithBulk(esData, index);
    }

这里获取到的数据 只是 info : HDFS 地址;

写回答

1回答

Michael_PK

2021-08-09

bulkload是将数据以hfile的方式进行操作,这个和es没有关系的呢

你要写入es完全不需要这种,用普通的原生api以batch的方式写入es,完全就OK了。

0
1
lindy_chan
我的数据型式是每几个小时产生的csv文件,所以直接用bulkload 导入速度是比较快的。不过es查询 hbase极慢。 使用普通协处理器 postput插入也是比较慢的。
2021-08-09
共1条回复

Spark进阶 大数据离线与实时项目实战

大数据生态圈实用框架(Spark/Hbase/Redis/Hadoop)整合应用及调优

700 学习 · 190 问题

查看课程