Co-Work with Apache Spark
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. The GraphAr Spark library makes it easy to integrate GraphAr with Spark. This library allows users to efficiently generate, load, and transform GraphAr format files, and to integrate GraphAr with other Spark-compatible systems.
The following examples showcase this integration.
Examples
Transform GraphAr format files
We provide an example in TestGraphTransformer.scala, which demonstrates how to conduct data transformation at the graph level. TransformExample.scala is another example of graph data conversion between different file types or different adjList types, implemented at the vertex/edge table level. To do this, the original data is first loaded into a Spark DataFrame using the GraphAr Spark Reader; the DataFrame is then written out as new GraphAr format files through the GraphAr Spark Writer, following the metadata defined in a new information file.
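At the graph level, the transformation only needs the source and destination graph information files. A minimal sketch of this, assuming GraphTransformer is imported from the GraphAr Spark library and using hypothetical file paths:
// a minimal sketch of graph-level transformation; the file paths are hypothetical
val spark = SparkSession.builder()
  .appName("graphar-transform")
  .config("spark.master", "local")
  .getOrCreate()
// rewrite the graph from the source layout into the layout described by the
// destination graph information file (e.g. different chunk sizes or file types)
GraphTransformer.transform("/path/to/source.graph.yml", "/path/to/dest.graph.yml", spark)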
Compute with GraphX
Another important use case of GraphAr is to serve as a data source for graph computing or analytics. ComputeExample.scala provides an example of constructing a GraphX graph from GraphAr format files and executing a connected-components computation. Executing queries with Spark SQL and running other graph analytics algorithms can be implemented in a similar fashion.
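As a minimal sketch of this pattern, the following assumes a graph information file for a graph with "Person" vertices and ("Person", "KNOWS", "Person") edges (the path, labels, and the name property are hypothetical; GraphReader and GeneralParams come from the GraphAr Spark library, and the Spark session is reused from the examples below):
import org.apache.spark.graphx.{Edge, Graph}

// read the whole graph from GraphAr as DataFrame maps
val graphData = GraphReader.read("/path/to/person_graph.graph.yml", spark)
// use the GraphAr vertex index as the GraphX vertex ID
val vertexRDD = graphData._1("Person").rdd
  .map(row => (row.getAs[Long](GeneralParams.vertexIndexCol), row.getAs[String]("name")))
// take the first available adjacency-list DataFrame for this edge type
val edgeRDD = graphData._2(("Person", "KNOWS", "Person")).head._2.rdd
  .map(row => Edge(row.getAs[Long](GeneralParams.srcIndexCol),
                   row.getAs[Long](GeneralParams.dstIndexCol), 1))
val graph = Graph(vertexRDD, edgeRDD)
// vertices in the same connected component are mapped to the same component ID
val cc = graph.connectedComponents().vertices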
Import/Export graphs of Neo4j
The Neo4j graph database provides a Spark connector for integration with Spark. The Neo4j Spark Connector, combined with the GraphAr Spark library, enables us to migrate graph data between Neo4j and GraphAr. This is also a key application of GraphAr, in which it acts as persistent storage for graph data in the database.
We provide two example programs that demonstrate how GraphAr can be used in conjunction with Neo4j. Both use one of the built-in Neo4j datasets, the Movie Graph, a mini graph application containing actors and directors related through the movies they have collaborated on. Given some necessary information, such as the chunk size, the prefix of the file path, and the file type, the program can read the graph data from Neo4j and write it into GraphAr files. To export graph data from Neo4j and write it to GraphAr, refer to the following code; Neo4j2GraphAr.scala provides the complete example.
def main(args: Array[String]): Unit = {
  // connect to the Neo4j instance
  val spark = SparkSession.builder()
    .appName("Neo4j to GraphAr for Movie Graph")
    .config("neo4j.url", "bolt://localhost:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", sys.env.get("Neo4j_USR").get)
    .config("neo4j.authentication.basic.password", sys.env.get("Neo4j_PWD").get)
    .config("spark.master", "local")
    .getOrCreate()

  // initialize a graph writer
  val writer: GraphWriter = new GraphWriter()

  // put movie graph data into writer
  readAndPutDataIntoWriter(writer, spark)

  // write in GraphAr format
  val outputPath: String = args(0)
  val vertexChunkSize: Long = args(1).toLong
  val edgeChunkSize: Long = args(2).toLong
  val fileType: String = args(3)
  writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
}
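The program takes the output path, the vertex chunk size, the edge chunk size, and the file type as command-line arguments. An illustrative invocation (the path and values are hypothetical; GraphAr supports the csv, orc, and parquet file types):
// arguments: <output path> <vertex chunk size> <edge chunk size> <file type>
// e.g.: /tmp/graphar/movie_graph 100 1024 parquet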
The readAndPutDataIntoWriter method reads the vertices and edges from Neo4j into DataFrames (refer to the Neo4j docs for more details), and then puts them into the GraphAr GraphWriter.
def readAndPutDataIntoWriter(writer: GraphWriter, spark: SparkSession): Unit = {
  // read vertices with label "Person" from Neo4j as a DataFrame
  val person_df = spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (n:Person) RETURN n.name AS name, n.born as born")
    .load()
  // put into writer, vertex label is "Person"
  writer.PutVertexData("Person", person_df)

  // read vertices with label "Movie" from Neo4j as a DataFrame
  val movie_df = spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (n:Movie) RETURN n.title AS title, n.tagline as tagline")
    .load()
  // put into writer, vertex label is "Movie"
  writer.PutVertexData("Movie", movie_df)

  // read edges with type "Person"->"PRODUCED"->"Movie" from Neo4j as a DataFrame
  val produced_edge_df = spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (a:Person)-[r:PRODUCED]->(b:Movie) return a.name as src, b.title as dst")
    .load()
  // put into writer, source vertex label is "Person", edge label is "PRODUCED",
  // target vertex label is "Movie"
  writer.PutEdgeData(("Person", "PRODUCED", "Movie"), produced_edge_df)

  // the complete example reads and puts the remaining edge types
  // (e.g. ACTED_IN, REVIEWED) in the same way
}
Finally, the write method writes the graph data in GraphAr format to the specified path.
Additionally, when importing data from GraphAr to create or update instances in Neo4j, please refer to the following code:
def main(args: Array[String]): Unit = {
  // connect to the Neo4j instance
  val spark = SparkSession.builder()
    .appName("GraphAr to Neo4j for Movie Graph")
    .config("neo4j.url", "bolt://localhost:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", sys.env.get("Neo4j_USR").get)
    .config("neo4j.authentication.basic.password", sys.env.get("Neo4j_PWD").get)
    .config("spark.master", "local")
    .getOrCreate()

  // path to the graph information file
  val graphInfoPath: String = args(0)
  val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)

  val graphData = GraphReader.read(graphInfoPath, spark)
  val vertexData = graphData._1
  val edgeData = graphData._2

  putVertexDataIntoNeo4j(graphInfo, vertexData, spark)
  putEdgeDataIntoNeo4j(graphInfo, vertexData, edgeData, spark)
}
Pass the graph information file path to the loadGraphInfo method to get the graph information. Then read the graph data from the GraphAr files with GraphReader, which returns a pair of DataFrame maps: _1 holds the vertex DataFrames and _2 holds the edge DataFrames.
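For instance, with the Movie Graph exported above, the returned maps can be accessed as follows (a minimal sketch; "ordered_by_source" is one of the GraphAr adjacency-list type strings):
val graphData = GraphReader.read(graphInfoPath, spark)
// vertex DataFrames are keyed by vertex label
val personDf = graphData._1("Person")
// edge DataFrames are keyed by (source label, edge label, target label),
// then by the adjacency-list type string, e.g. "ordered_by_source"
val producedDfs = graphData._2(("Person", "PRODUCED", "Movie"))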
The putVertexDataIntoNeo4j and putEdgeDataIntoNeo4j methods create or update the vertices and edges in Neo4j from these DataFrames.
def putVertexDataIntoNeo4j(graphInfo: GraphInfo,
                           vertexData: Map[String, DataFrame],
                           spark: SparkSession): Unit = {
  // write each vertex type to Neo4j
  vertexData.foreach { case (key, df) => {
    val primaryKey = graphInfo.getVertexInfo(key).getPrimaryKey()
    // the vertex index column is not needed in Neo4j
    // write to Neo4j, refer to https://neo4j.com/docs/spark/current/writing/
    df.drop(GeneralParams.vertexIndexCol).write.format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      .option("labels", ":" + key)
      .option("node.keys", primaryKey)
      .save()
  }}
}
def putEdgeDataIntoNeo4j(graphInfo: GraphInfo,
                         vertexData: Map[String, DataFrame],
                         edgeData: Map[(String, String, String), Map[String, DataFrame]],
                         spark: SparkSession): Unit = {
  // write each edge type to Neo4j
  edgeData.foreach { case (key, value) => {
    // key is (source vertex label, edge label, target vertex label)
    val sourceLabel = key._1
    val edgeLabel = key._2
    val targetLabel = key._3
    val sourcePrimaryKey = graphInfo.getVertexInfo(sourceLabel).getPrimaryKey()
    val targetPrimaryKey = graphInfo.getVertexInfo(targetLabel).getPrimaryKey()
    val sourceDf = vertexData(sourceLabel)
    val targetDf = vertexData(targetLabel)
    // convert the source and target index columns to the primary key columns,
    // using the first DataFrame of the (adj_list_type_str, DataFrame) map
    val df = Utils.joinEdgesWithVertexPrimaryKey(
      value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey)
    val properties = if (edgeLabel == "REVIEWED") "rating,summary" else ""

    // write to Neo4j, refer to https://neo4j.com/docs/spark/current/writing/
    df.write.format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      .option("relationship", edgeLabel)
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":" + sourceLabel)
      .option("relationship.source.save.mode", "match")
      .option("relationship.source.node.keys", "src:" + sourcePrimaryKey)
      .option("relationship.target.labels", ":" + targetLabel)
      .option("relationship.target.save.mode", "match")
      .option("relationship.target.node.keys", "dst:" + targetPrimaryKey)
      .option("relationship.properties", properties)
      .save()
  }}
}
Finally, you will see the graph in Neo4j Browser after running the above code.
See GraphAr2Neo4j.scala for the complete example.
- The Neo4j Spark Connector offers different save modes and writing options, such as Append (CREATE) or Overwrite (MERGE), as sketched after these notes. Please refer to its documentation for more information and choose the most appropriate mode for your use case.
- The Neo4j Spark Connector also supports the Spark Structured Streaming API, which works differently from Spark batch processing. One can use this API to read/write a stream from/to Neo4j, avoiding keeping all of the data in memory.
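A hedged sketch of the difference between the two batch save modes (the df DataFrame, the Person label, and the name key are illustrative):
// SaveMode.Append issues CREATE statements and may create duplicate nodes on re-runs
df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("labels", ":Person")
  .save()

// SaveMode.Overwrite issues MERGE statements and requires node.keys to match on
df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()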