背景

客户提了一个bug,前端访问超时。

  1. 先用postman调用超时的接口,发现执行时间大约4min,前端接口默认超时时间是15s;

    2.用arthas排查之后,主要耗时在:有一段逻辑需要给查询到的血缘信息中所有entity添加自定义信息,代码中是循环调用接口获取数据,血缘复杂之后,实体很多,而atlas返回数据有瓶颈。

第一次优化

考虑 for循环guid单个接口比较慢,添加方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntityByGuids(List<String> guids) {

if (CollectionUtils.isEmpty(guids)){
return new AtlasEntity.AtlasEntitiesWithExtInfo();
}

try {
return atlasClientV2.getEntitiesByGuids(guids);
} catch (AtlasServiceException e) {
log.error(e.getMessage());
return null;
}
}

通过直接传入所有guid的方式,减少http接口调用次数,来获取数据。

将在for循环中单次调用再执行所需要的数据抽离出来,先一次取出,后面处理逻辑直接在list中读数据循环。

重新发布之后,时间缩减为2min20s左右。

第二次优化

通过查看atlas源代码,发现

1
2
3
4
5
6
7
8
for (String guid : guids) {
AtlasVertex vertex = getEntityVertex(guid);

AtlasEntity entity = mapVertexToAtlasEntity(vertex, ret, isMinExtInfo);

ret.addEntity(entity);
}

1
2
3
4
public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
AtlasVertex ret = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
return ret;
}

atlas在取到所有guid之后,自身还在for循环单个查询entity。

考虑到以hbase作为存储,优化hbase参数,增加了hbase的线程数等。优化之后,时间缩减为30s

第三次优化

hbase的查询效率应该还可以优化,但是更加细致的参数调优需要时间,只能后面再优化,考虑优化代码。

因为atlas对于一次传递多个guid的处理方式也是通过单个循环的,所以这里考虑在项目中多线程调用单个执行的接口。

  1. 修改atlas web接口的线程数
  2. 并发调用
1
guids.parallelStream().map(atlasUtilService::getAtlasEntityByGuid).collect(Collectors.toList());

到这里,atlas接口调用缩减为2.8s左右。至此,前端调用已经不报错了

最终版

上面使用的parallelStream使用的是公用线程池,考虑使用自定义线程,提高并发执行效率。

1
2
3
4
5
6
try {
entities = forkJoinPool.submit(()->guids.parallelStream().map(atlasUtilService::getAtlasEntityByGuid).collect(Collectors.toList())).get();
} catch (InterruptedException | ExecutionException e) {
log.error("atlas 查询guid{}失败",guids,e);
throw new RuntimeException(e);
}

通过arthas,看到方法执行的调用栈中有两次获取所有数据,方法嵌套的比较深,考虑不修改数据函数签名的前提下,缓存数据。

1
2
private final static ThreadLocal<AtlasEntity.AtlasEntitiesWithExtInfo> LineageRelations = new ThreadLocal<>();

最后代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
AtlasEntity.AtlasEntitiesWithExtInfo atlasEntitiesWithExtInfo = Optional.ofNullable(LineageRelations.get()).orElseGet(()->{
final List<String> guids = atlasLineageInfo.getGuidEntityMap().values().stream()
.map(AtlasEntityHeader::getGuid).collect(Collectors.toList());

int cupNum = Runtime.getRuntime().availableProcessors();
ForkJoinPool forkJoinPool = new ForkJoinPool(cupNum*2);
List<AtlasEntity> entities ;
try {
entities = forkJoinPool.submit(()->guids.parallelStream().map(atlasUtilService::getAtlasEntityByGuid).collect(Collectors.toList())).get();
} catch (InterruptedException | ExecutionException e) {
log.error("atlas 查询guid{}失败",guids,e);
throw new RuntimeException(e);
}
LineageRelations.set(new AtlasEntity.AtlasEntitiesWithExtInfo(entities));
return LineageRelations.get();

});

接口调用平均 0.7s