
Principles and Applications of Incremental Match in Streaming Graph Computing

Problem Background

In streaming computing, data rarely arrives all at once; it is input and processed continuously. Similarly, in graph computing and graph querying scenarios, vertices and edges are continuously read from data sources to build the graph incrementally. In incremental graph queries, the graph evolves over time, so query results differ across graph versions. When new vertices and edges form an updated graph version, recomputing the query over the entire graph incurs high overhead and repeats historical computation. Since historical data has already been processed, ideally only the delta-affected portion should be computed and queried, without re-executing over the full graph.

GQL (Graph Query Language) is an international standard for graph query languages developed by ISO, used to run queries over graphs. Apache GeaFlow (Incubating) is an open-source streaming graph engine that specializes in dynamically changing graph data and supports large-scale, high-concurrency real-time graph computing. This article introduces GeaFlow’s approach to incremental GQL-based Match queries on dynamic graphs, which executes queries only on the delta data and avoids redundant full-graph computation.

Current Challenges

The Apache GeaFlow (Incubating) engine adopts a vertex-centric framework in which vertices send messages each iteration and process received messages in the following iteration. For a GQL query, traversal starts from initial vertices and proceeds by pattern matching (e.g., from vertex A to B to C). On a dynamic graph, if only the newly added vertices and edges trigger computation, the results may be incomplete. Consider the following scenario:

The key issue is that an existing vertex such as A1 never triggers computation when only the delta is considered, even though new matches starting from A1 arise once fresh edges appear downstream of it; those matches belong to the incremental results. To resolve this, we propose a subgraph-expansion method that starts from the new vertices. The query is divided into two phases (a minimal sketch follows the list):

  1. Evolve Phase: Propagate EvolveMessage from new vertices to neighbors, adding recipients to the EvolveVertices set.
  2. Traversal Phase: Use EvolveVertices as traversal triggers for the current window.
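
Below is a minimal sketch of these two phases. All names here (EvolvePhaseSketch, startMatchFrom, the neighbor handling) are illustrative assumptions, not GeaFlow's actual API; the real logic lives in GeaFlowDynamicVCTraversalFunction, shown later.

import java.util.HashSet;
import java.util.Set;

// Sketch only: hypothetical names, not GeaFlow's actual API.
public class EvolvePhaseSketch {

    private final Set<Object> evolveVertices = new HashSet<>();

    // Evolve phase: a newly added vertex notifies its neighbors; every
    // recipient joins the EvolveVertices set.
    public void evolve(Object newVertexId, Iterable<Object> neighborIds) {
        evolveVertices.add(newVertexId);     // the new vertex itself also triggers traversal
        for (Object neighborId : neighborIds) {
            evolveVertices.add(neighborId);
        }
    }

    // Traversal phase: only EvolveVertices start pattern matching in this window.
    public void traverse() {
        for (Object vertexId : evolveVertices) {
            startMatchFrom(vertexId);        // hypothetical entry into the match engine
        }
    }

    private void startMatchFrom(Object vertexId) {
        // pattern matching for the current window would start here
    }
}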

Solution Workflow

The overall process consists of the following steps:

  1. Determine the query’s iteration count N. Expand N-1 hops outward from the new vertices (maxEvolveIteration = N-1) so that the expansion covers the query; the maximum iteration then becomes N + maxEvolveIteration (when N > 2). A small helper sketching this arithmetic follows the list. For example:
     • match(a): iteration count is 1; no Evolve logic is needed.
     • match(a)-[e]->(b): iteration count is 2; no Evolve logic is needed.
     • match(a)-[e]->(b)-[e2]->(c): iteration count is 3; the maximum iteration count is 5.
  2. Set a threshold T: execute the incremental logic only when N <= T, to avoid expanding the Evolve phase to the full graph.
  3. After the new window’s data is added, the new vertices send EvolveVertexMessage to their neighbors for N-1 iterations.
  4. When sending an EvolveMessage, each vertex attaches its own ID. Receiving vertices store these IDs in targetIdList for later traversal. This propagates delta information upstream, so vertices traverse only the neighbors affected by changes.
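
A tiny helper mirroring the iteration arithmetic from step 1. It is hypothetical, not part of GeaFlow's API:

// Hypothetical helper mirroring the iteration budget described above.
final class IterationBudget {
    static int maxIteration(int n) {
        if (n <= 2) {
            return n;                    // no Evolve phase needed when N <= 2
        }
        int maxEvolveIteration = n - 1;  // expand N-1 hops outward from the new vertices
        return n + maxEvolveIteration;   // e.g. N = 3 -> 3 + 2 = 5
    }
}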

Core logic in GeaFlowDynamicVCTraversalFunction:

@Override
public void compute(Object vertexId, Iterator<MessageBox> messageIterator) {
    TraversalRuntimeContext context = commonFunction.getContext();
    if (needIncrTraversal()) {
        long iterationId = context.getIterationId();
        // Evolve phase: send EvolveMessages to expand the subgraph while the
        // iteration is below the planned end of the Evolve phase.
        if (iterationId < queryMaxIteration - 1) {
            evolveIds.add(vertexId);
            sendEvolveMessage(vertexId, context);
            return;
        }
        if (iterationId == queryMaxIteration - 1) {
            // The current iteration ends the Evolve phase: collect the vertex
            // into the EvolveVertices set but send no further EvolveMessages.
            evolveIds.add(vertexId);
            return;
        }
        // Traversal phase: run the normal match computation.
        commonFunction.compute(vertexId, messageIterator);
    } else {
        // Incremental mode disabled: always run the full traversal logic.
        commonFunction.compute(vertexId, messageIterator);
    }
}
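
The sendEvolveMessage call above is where the sender's ID gets attached. Below is a hedged sketch of what it might do; the Context interface and its accessors are simplified assumptions, not GeaFlow's actual signatures:

import java.util.List;

class EvolveMessagingSketch {

    // Minimal stand-in for GeaFlow's traversal context (assumed interface).
    interface Context {
        List<Object> getNeighborIds(Object vertexId); // hypothetical accessor
        void sendMessage(Object targetId, Object message);
    }

    // The sender attaches its own ID so each receiver can record it in its
    // targetIdList and later traverse only delta-affected neighbors.
    static void sendEvolveMessage(Object vertexId, Context context) {
        for (Object neighborId : context.getNeighborIds(vertexId)) {
            context.sendMessage(neighborId, vertexId); // payload: the sender's ID
        }
    }
}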

Evolve Conditions:

The Evolve phase runs only when all of the following hold (combined in the sketch after this list):

  • The query’s iteration count N is greater than 2 (no Evolve is needed when N ≤ 2, as in the examples above).
  • N is at most the threshold T.
  • windowId > 1 (the first window only builds the initial graph).
  • The GQL query has no starting-vertex filter (e.g., Match(a:person where a.id=1) excludes Evolve).
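
Taken together, the needIncrTraversal() gate from the code above plausibly reduces to something like the following; all field names are illustrative assumptions, not GeaFlow's actual members:

// Illustrative sketch of the needIncrTraversal() gate; field names are assumptions.
class IncrTraversalGate {
    int queryIterationCount;      // N, derived from the MATCH pattern
    int threshold;                // T, upper bound for enabling incremental mode
    long windowId;                // current window; window 1 builds the initial graph
    boolean hasStartVertexFilter; // e.g. MATCH (a:person where a.id = 1)

    boolean needIncrTraversal() {
        return queryIterationCount > 2           // no Evolve when N <= 2
            && queryIterationCount <= threshold  // avoid expanding to the full graph
            && windowId > 1                      // skip the graph-construction window
            && !hasStartVertexFilter;            // fixed start set opts out of Evolve
    }
}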

Demo

In Apache GeaFlow (Incubating), incremental graph loading is configured via the geaflow.dsl.window.size option on the vertex and edge tables:

CREATE GRAPH modern (
  Vertex person (
    id bigint ID,
    name varchar,
    age int
  ),
  Edge knows (
    srcId bigint SOURCE ID,
    targetId bigint DESTINATION ID,
    weight double
  )
) WITH (
  storeType='rocksdb',
  shardCount = 1
);

CREATE TABLE modern_vertex (
  id varchar,
  type varchar,
  name varchar,
  other varchar
) WITH (
  type='file',
  geaflow.dsl.file.path = 'resource:///data/incr_modern_vertex.txt',
  geaflow.dsl.window.size = 20
);

CREATE TABLE modern_edge (
  srcId bigint,
  targetId bigint,
  type varchar,
  weight double
) WITH (
  type='file',
  geaflow.dsl.file.path = 'resource:///data/incr_modern_edge.txt',
  geaflow.dsl.window.size = 3
);

INSERT INTO modern.person
SELECT cast(id as bigint), name, cast(other as int) as age
FROM modern_vertex WHERE type = 'person';

INSERT INTO modern.knows
SELECT srcId, targetId, weight
FROM modern_edge WHERE type = 'knows';

CREATE TABLE tbl_result (
  a_id BIGINT,
  b_id BIGINT,
  c_id BIGINT,
  d_id BIGINT
) WITH (
  type='file',
  geaflow.dsl.file.path='${target}'
);

USE GRAPH modern;

INSERT INTO tbl_result
SELECT
  a_id, b_id, c_id, d_id
FROM (
  MATCH (a:person)-[e:knows]->(b:person)<-[e2:knows]-(c:person)<-[e3:knows]-(d:person)
  WHERE a.id != c.id
  RETURN a.id as a_id, b.id as b_id, c.id as c_id, d.id as d_id
);

In this demo, the vertex window size is 20 and the edge window size is 3, meaning each window loads up to 20 vertices and 3 edges. The query is a 3-hop match, so its iteration count N is 4 and, by the formula above, the maximum iteration count is 7. The demo is available in IncrMatchTest.java and can be run directly.

Conclusion and Outlook

In dynamic and streaming graph scenarios, vertices and edges change in real time. When querying such graphs, historical information often allows computation to be triggered only on the incremental part, avoiding a full-graph traversal. Apache GeaFlow (Incubating) applies a subgraph-expansion-based incremental match method within its vertex-centric distributed graph computing framework to support incremental querying on dynamic graphs. In the future, we aim to implement more complex incremental matching logic for advanced use cases.