跳到主要内容

流图计算之增量match原理与应用

问题背景

在流式计算中,数据往往不是全部一批到来,而会源源不断地进行输入和计算,在图计算/图查询领域,也存在类似的场景,图的点边不断地从数据源读取,进行构图,从而形成增量图。在增量图查询中,图随时发生着变化,在不同的图版本中,进行图查询的结果也会有所不同。对于某一次新增的点边,构成了一个新的版本的图,如果重新对全图(即当前所有点边)进行图遍历,开销较大,并且也会和历史数据有重复。由于历史的数据已经计算过一遍,理想情况下,只需要对增量所影响的部分进行计算/查询,而不需要对全图重新进行查询。

GQL(Graph Query Language)是国际标准化组织(ISO)为标准化图查询语言所制定的一个标准,用于在图上执行查询的语言。Apache GeaFlow (Incubating) 是开源的流图计算引擎,专注于处理动态变化的图数据,支持大规模、高并发的实时图计算场景。本文将介绍在 Apache GeaFlow (Incubating) 引擎中,对增量图使用 GQL 进行增量 Match 的方法,目的尽可能地只对增量的数据进行查询,避免冗余的全量计算。

画板

当前问题

Apache GeaFlow (Incubating) 引擎基于点中心框架(vertex center),通过迭代的方式,每一轮迭代中,每个点向其他点发送消息,并在下一轮收到消息时进行处理、分析。在 Geaflow 的框架中,GQL 的查询需要从前往后进行 Traversal 遍历走图,即从起始节点开始出发,进行扩散,依次进行点边匹配,直到匹配到所需要的查询 pattern。在动态图里场景,如果只使用当前批次新增的点边触发计算,增量的结果会有缺失,例如下面例子所示。

画板

如上问题关键在于如果只考虑增量的部分,则点 A1 无法触发计算,但是点 A1 实际包含于增量结果中。所以需要设法让点 A1 参与计算,我们考虑一种从新增点扩充子图的方法,将 a 触发。将整个查询分为 2 个阶段,Evolve 扩展阶段和 Traversal 阶段。在 Evolve 阶段中,从起始点开始,向邻居发送 EvolveMessage,后续的 iteration 中,收到 EvolveMessage 的点加入到 EvolveVertices 集合中。而后的 Traversal 阶段则会使用 EvolveVertices 里的点触发遍历,即表示当前窗口的触发点。

方案步骤

整体流程示例图如下:

  1. 首先得到 query 的计划的迭代次数 N,需向外扩充 N-1 度(maxEvolveIteration=N-1),即可覆盖当前 query。框架的最大迭代数将设置为 N + maxEvolveIteration(N>2)
例如
match(a)迭代数为1,此时不需要Evolve逻辑
match(a)-[e]->(b)迭代数为2,此时不需要Evolve逻辑
match(a)-[e]->(b)->[e2]->(c)迭代数为3 最大迭代数5
  1. 由于当迭代数较大时,扩充子图可能可能扩充到全图,设置一个阈值 T, 当 N<=T 才执行这个增量逻辑。
  2. 在每个 window 数据加入图中后,对于新增的点边,每个点会向邻居发送 EvolveVertexMessage,执行 N-1 次迭代,将 N-1 度子图扩充进来。即当前迭代小于 maxEvolveIteration(N-1)时,发送 EvolveVertexMessage。
  3. 每个点在向邻居点发送 EvolveMessage 时,需要将自己的 id 放在消息中,收到消息的点记录其发送点的 id, 添加到 targetIdList,在后续 traversal 阶段中使用。此步骤作用是下游节点将增量信息反向传递给上游,上游点在进行遍历时可以得知下游的增量影响部分,从而只遍历这些含有动态信息的下游点,而不需要再遍历所有邻居点。

反向扩展的主要逻辑在 GeaFlowDynamicVCTraversalFunction 中,GeaFlowDynamicVCTraversalFunction 继承自 IncVertexCentricFunction,在 Geaflow 中 IncVertexCentricFunction 是一个表示增量 VC 方法(点中心)的接口,在每次迭代中,都会对当前收到消息的点进行触发,执行 compute 方法中的逻辑。

@Override
public void compute(Object vertexId, Iterator<MessageBox> messageIterator) {
TraversalRuntimeContext context = commonFunction.getContext();
if (needIncrTraversal()) {
long iterationId = context.getIterationId();
// sendEvolveMessage to evolve subGraphs when iterationId is less than the plan iteration
if (iterationId < queryMaxIteration - 1) {
evolveIds.add(vertexId);
sendEvolveMessage(vertexId, context);
return;
}

if (iterationId == queryMaxIteration - 1) {
// the current iteration is the end of evolve phase.
evolveIds.add(vertexId);
return;
}
// traversal
commonFunction.compute(vertexId, messageIterator);

} else {
commonFunction.compute(vertexId, messageIterator);
}
}

具体示例如下:

画板

总结进行 Evolve 扩展的条件:

  1. query 的迭代次数>2:当 match 小于两跳时不需要 Evolve。
  2. query 的迭代次数<=Threshold:如果迭代数太多可能扩展到全图。
  3. windowId>1:第一次构图不需要进行 Evolve 阶段。
  4. GQL 语句中没有起始点:如果有起始点,则只需使用起始点计算,不需要扩展子图,例如查询语句 Match(a:person where a.id = 1))return a.name。

Demo 示例

在 Apache GeaFlow (Incubating) 中,通过设置点表或边表的 windowSize 来默认实现增量逻辑,即每一批读入 windowSize 大小的点边数据,来构建增量图。

CREATE GRAPH modern (
Vertex person (
id bigint ID,
name varchar,
age int
),
Edge knows (
srcId bigint SOURCE ID,
targetId bigint DESTINATION ID,
weight double
),
) WITH (
storeType='rocksdb',
shardCount = 1
);

CREATE TABLE modern_vertex (
id varchar,
type varchar,
name varchar,
other varchar
) WITH (
type='file',
geaflow.dsl.file.path = 'resource:///data/incr_modern_vertex.txt',
geaflow.dsl.window.size = 20
);

CREATE TABLE modern_edge (
srcId bigint,
targetId bigint,
type varchar,
weight double
) WITH (
type='file',
geaflow.dsl.file.path = 'resource:///data/incr_modern_edge.txt',
geaflow.dsl.window.size = 3
);

INSERT INTO modern.person
SELECT cast(id as bigint), name, cast(other as int) as age
FROM modern_vertex WHERE type = 'person'
;


INSERT INTO modern.knows
SELECT srcId, targetId, weight
FROM modern_edge WHERE type = 'knows'
;

CREATE TABLE tbl_result (
a_id BIGINT,
b_id BIGINT,
c_id BIGINT,
d_id BIGINT
) WITH (
type='file',
geaflow.dsl.file.path='${target}'
);

USE GRAPH modern;

INSERT INTO tbl_result
SELECT
a_id, b_id, c_id,d_id
FROM (
MATCH (a:person) -[e:knows]->(b:person)<-[e2:knows]-(c:person)<-[e3:knows]-(d:person) where a.id!=c.id
RETURN a.id as a_id,b.id as b_id,c.id as c_id , d.id as d_id
)
;

在 Demo 中,设置点 windowSize 为 20,边 windowSize 为 3,即构图时每个 window 导入 20 个点,3 条边。并执行 3 跳的查询语句。示例 Demo 在 IncrMatchTest.java 中, 可直接运行执行 Demo。

总结和展望

在动态图/流图的场景中,图的点边是在实时变化的,在进行图查询时,对于不同窗口数据的图,我们往往可以根据一些历史信息,只对增量的部分触发计算,来进行增量地计算,避免触发全图的遍历。Apache GeaFlow (Incubating) 使用了一种基于子图扩展的增量 match 方法,应用于点中心分布式图计算框架,在动态图场景下进行增量的查询,未来期望实现更多更复杂场景下的增量匹配逻辑。