graphar_pyspark.graph

Bindings to org.apache.graphar.graph.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Bindings to org.apache.graphar.graph."""

from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass
from typing import Optional, Union

from py4j.java_gateway import JavaObject
from pyspark.sql import DataFrame

from graphar_pyspark import GraphArSession, _check_session
from graphar_pyspark.enums import FileType
from graphar_pyspark.errors import InvalidGraphFormatError
from graphar_pyspark.info import GraphInfo


@dataclass(frozen=True)
class EdgeLabels:
    """A triplet that describes an edge. Contains source, edge, and destination labels. Immutable."""

    src_label: str
    edge_label: str
    dst_label: str


@dataclass(frozen=True)
class GraphReaderResult:
    """A simple immutable class that represents the results of reading a graph with GraphReader."""

    vertex_dataframes: Mapping[str, DataFrame]
    edge_dataframes: Mapping[EdgeLabels, Mapping[str, DataFrame]]

    @staticmethod
    def from_scala(
        jvm_result: tuple[
            dict[str, JavaObject],
            dict[tuple[str, str, str], dict[str, JavaObject]],
        ],
    ) -> "GraphReaderResult":
        """Create an instance of the class from JVM method output.

        :param jvm_result: structure returned from JVM.
        :returns: instance of the Python class.
        """
        # Unpack the first element of the Scala tuple: a map from vertex label to DataFrame.
        first_dict = {}
        first_scala_map = jvm_result._1()
        first_scala_map_iter = first_scala_map.keySet().iterator()

        while first_scala_map_iter.hasNext():
            k = first_scala_map_iter.next()
            first_dict[k] = DataFrame(first_scala_map.get(k).get(), GraphArSession.ss)

        # Unpack the second element: a map from (src, edge, dst) label triplet
        # to a nested map of DataFrames.
        second_dict = {}
        second_scala_map = jvm_result._2()
        second_scala_map_iter = second_scala_map.keySet().iterator()

        while second_scala_map_iter.hasNext():
            k = second_scala_map_iter.next()
            nested_scala_map = second_scala_map.get(k).get()
            nested_scala_map_iter = nested_scala_map.keySet().iterator()
            inner_dict = {}

            while nested_scala_map_iter.hasNext():
                kk = nested_scala_map_iter.next()
                inner_dict[kk] = DataFrame(
                    nested_scala_map.get(kk).get(),
                    GraphArSession.ss,
                )

            second_dict[EdgeLabels(k._1(), k._2(), k._3())] = inner_dict

        return GraphReaderResult(
            vertex_dataframes=first_dict,
            edge_dataframes=second_dict,
        )


class GraphReader:
    """The helper object for reading a graph through the definitions of graph info."""

    @staticmethod
    def read(
        graph_info: Union[GraphInfo, str],
    ) -> GraphReaderResult:
        """Read the graph as vertex and edge DataFrames with the graph info yaml file or GraphInfo object.

        :param graph_info: the path of the graph info yaml or a GraphInfo instance.
        :returns: a GraphReaderResult that contains vertex and edge DataFrames.
        """
        _check_session()
        if isinstance(graph_info, str):
            graph_info = GraphInfo.load_graph_info(graph_info)

        jvm_result = GraphArSession.graphar.graph.GraphReader.readWithGraphInfo(
            graph_info.to_scala(),
            GraphArSession.jss,
        )
        return GraphReaderResult.from_scala(jvm_result)


class GraphWriter:
    """The helper class for writing a graph."""

    def __init__(self, jvm_obj: JavaObject) -> None:
        """One should not use this constructor directly; use `from_scala` or `from_python` instead."""
        _check_session()
        self._jvm_graph_writer_obj = jvm_obj

    def to_scala(self) -> JavaObject:
        """Transform the object to its JVM representation.

        :returns: JavaObject
        """
        return self._jvm_graph_writer_obj

    @staticmethod
    def from_scala(jvm_obj: JavaObject) -> "GraphWriter":
        """Create an instance of the class from the corresponding JVM object.

        :param jvm_obj: scala object in JVM.
        :returns: instance of the Python class.
        """
        return GraphWriter(jvm_obj)

    @staticmethod
    def from_python() -> "GraphWriter":
        """Create an instance of the class from Python arguments."""
        return GraphWriter(GraphArSession.graphar.graph.GraphWriter())

    def put_vertex_data(self, label: str, df: DataFrame, primary_key: str) -> None:
        """Put the vertex DataFrame into the writer.

        :param label: label of the vertex.
        :param df: DataFrame of the vertex type.
        :param primary_key: primary key of the vertex type; the default is empty,
            which takes the first property column as the primary key.
        """
        self._jvm_graph_writer_obj.PutVertexData(label, df._jdf, primary_key)

    def put_edge_data(self, relation: tuple[str, str, str], df: DataFrame) -> None:
        """Put the edge DataFrame into the writer.

        :param relation: 3-tuple (source label, edge label, target label) that indicates the edge type.
        :param df: DataFrame of the edge type.
        """
        relation_jvm = GraphArSession.jvm.scala.Tuple3(
            relation[0], relation[1], relation[2],
        )
        self._jvm_graph_writer_obj.PutEdgeData(relation_jvm, df._jdf)

    def write_with_graph_info(self, graph_info: Union[GraphInfo, str]) -> None:
        """Write the graph data in GraphAr format with graph info.

        Note: the original method is `write`, but there is no direct overloading in Python.

        :param graph_info: the graph info object for the graph, or the path to the graph info file.
        """
        if isinstance(graph_info, str):
            self._jvm_graph_writer_obj.write(graph_info, GraphArSession.jss)
        else:
            self._jvm_graph_writer_obj.write(graph_info.to_scala(), GraphArSession.jss)

    def write(
        self,
        path: str,
        name: str = "graph",
        vertex_chunk_size: Optional[int] = None,
        edge_chunk_size: Optional[int] = None,
        file_type: Optional[FileType] = None,
        version: Optional[str] = None,
    ) -> None:
        """Write graph data in GraphAr format.

        Note: for default parameters, check org.apache.graphar.GeneralParams;
        for this method, None for any argument means that the default value will be used.

        :param path: the directory to write to.
        :param name: the name of the graph, default is 'graph'.
        :param vertex_chunk_size: the chunk size for vertices, default is 2^18.
        :param edge_chunk_size: the chunk size for edges, default is 2^22.
        :param file_type: the file type for the data payload (parquet, orc, csv, or json), default is parquet.
        :param version: version of the GraphAr format, default is v1.
        """
        if vertex_chunk_size is None:
            vertex_chunk_size = (
                GraphArSession.graphar.GeneralParams.defaultVertexChunkSize
            )

        if edge_chunk_size is None:
            edge_chunk_size = GraphArSession.graphar.GeneralParams.defaultEdgeChunkSize

        file_type = (
            GraphArSession.graphar.GeneralParams.defaultFileType
            if file_type is None
            else file_type.value
        )

        if version is None:
            version = GraphArSession.graphar.GeneralParams.defaultVersion

        self._jvm_graph_writer_obj.write(
            path,
            GraphArSession.jss,
            name,
            vertex_chunk_size,
            edge_chunk_size,
            file_type,
            version,
        )


class GraphTransformer:
    """The helper object for transforming graphs through the definitions of their infos."""

    @staticmethod
    def transform(
        source_graph_info: Union[str, GraphInfo],
        dest_graph_info: Union[str, GraphInfo],
    ) -> None:
        """Transform the graphs following the metadata provided or defined in info files.

        Note: both arguments should be strings or GraphInfo instances! Mixed argument types are not supported.

        :param source_graph_info: the path of the graph info yaml file for the source graph OR the info object for the source graph.
        :param dest_graph_info: the path of the graph info yaml file for the destination graph OR the info object for the destination graph.
        :raise InvalidGraphFormatError: if you pass mixed formats of source and dest graph info.
        """
        _check_session()
        if isinstance(source_graph_info, str) and isinstance(dest_graph_info, str):
            GraphArSession.graphar.graph.GraphTransformer.transform(
                source_graph_info,
                dest_graph_info,
                GraphArSession.jss,
            )
        elif isinstance(source_graph_info, GraphInfo) and isinstance(
            dest_graph_info,
            GraphInfo,
        ):
            GraphArSession.graphar.graph.GraphTransformer.transform(
                source_graph_info.to_scala(),
                dest_graph_info.to_scala(),
                GraphArSession.jss,
            )
        else:
            msg = "Both src and dst graph info objects should be of the same type. "
            msg += f"But {type(source_graph_info)} and {type(dest_graph_info)} were provided!"
            raise InvalidGraphFormatError(msg)
@dataclass(frozen=True)
class EdgeLabels:

A triplet that describes an edge. Contains source, edge, and destination labels. Immutable.

EdgeLabels(src_label: str, edge_label: str, dst_label: str)
src_label: str
edge_label: str
dst_label: str
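
Because EdgeLabels is a frozen dataclass, instances are hashable and compare by value, which is what lets them serve as keys of edge_dataframes in GraphReaderResult below. A minimal sketch (the person/knows labels are hypothetical):

from graphar_pyspark.graph import EdgeLabels

# Hypothetical labels; any (source, edge, destination) triplet behaves the same.
key = EdgeLabels(src_label="person", edge_label="knows", dst_label="person")

# Frozen dataclasses are hashable and compare by value, so an equal triplet
# built elsewhere addresses the same mapping entry.
assert key == EdgeLabels("person", "knows", "person")
assert hash(key) == hash(EdgeLabels("person", "knows", "person"))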
@dataclass(frozen=True)
class GraphReaderResult:

A simple immutable class that represents the results of reading a graph with GraphReader.

GraphReaderResult( vertex_dataframes: collections.abc.Mapping[str, pyspark.sql.dataframe.DataFrame], edge_dataframes: collections.abc.Mapping[EdgeLabels, collections.abc.Mapping[str, pyspark.sql.dataframe.DataFrame]])
vertex_dataframes: collections.abc.Mapping[str, pyspark.sql.dataframe.DataFrame]
edge_dataframes: collections.abc.Mapping[EdgeLabels, collections.abc.Mapping[str, pyspark.sql.dataframe.DataFrame]]
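
A sketch of navigating a result, assuming an initialized GraphArSession, a hypothetical graph info path, and a graph that defines a person vertex type and a (person, knows, person) edge type; the inner mapping is assumed here to be keyed by the adjacency-list type string:

from graphar_pyspark.graph import EdgeLabels, GraphReader

result = GraphReader.read("/tmp/graphar/example.graph.yml")  # hypothetical path

# Vertex DataFrames are keyed by vertex label.
person_df = result.vertex_dataframes["person"]

# Edge DataFrames are keyed by the EdgeLabels triplet, then (assumed) by
# the adjacency-list type, e.g. "ordered_by_source".
knows_dfs = result.edge_dataframes[EdgeLabels("person", "knows", "person")]
for adj_list_type, df in knows_dfs.items():
    print(adj_list_type, df.count())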
@staticmethod
def from_scala( jvm_result: tuple[dict[str, py4j.java_gateway.JavaObject], dict[tuple[str, str, str], dict[str, py4j.java_gateway.JavaObject]]]) -> GraphReaderResult:

Create an instance of the class from JVM method output.

Parameters
  • jvm_result: structure returned from JVM.

Returns: instance of the Python class.
class GraphReader:

The helper object for reading a graph through the definitions of graph info.

@staticmethod
def read( graph_info: Union[graphar_pyspark.info.GraphInfo, str]) -> GraphReaderResult:

Read the graph as vertex and edge DataFrames with the graph info yaml file or GraphInfo object.

Parameters
  • graph_info: the path of the graph info yaml or a GraphInfo instance.

Returns: a GraphReaderResult that contains vertex and edge DataFrames.
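
A minimal end-to-end read sketch. It assumes graphar_pyspark exposes an initialize helper that binds the GraphArSession to a SparkSession (otherwise _check_session fails), and the yaml path is hypothetical:

from pyspark.sql import SparkSession

from graphar_pyspark import initialize
from graphar_pyspark.graph import GraphReader

spark = SparkSession.builder.appName("graphar-read").getOrCreate()
initialize(spark)  # assumed session bootstrap for GraphArSession

# A string argument is loaded via GraphInfo.load_graph_info;
# a GraphInfo instance is used as-is.
result = GraphReader.read("/tmp/graphar/example.graph.yml")  # hypothetical path
print(sorted(result.vertex_dataframes))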
class GraphWriter:

The helper class for writing a graph.

GraphWriter(jvm_obj: py4j.java_gateway.JavaObject)

One should not use this constructor directly; use from_scala or from_python instead.

def to_scala(self) -> py4j.java_gateway.JavaObject:

Transform the object to its JVM representation.

Returns: JavaObject

@staticmethod
def from_scala( jvm_obj: py4j.java_gateway.JavaObject) -> GraphWriter:

Create an instance of the class from the corresponding JVM object.

Parameters
  • jvm_obj: scala object in JVM.

Returns: instance of the Python class.
@staticmethod
def from_python() -> GraphWriter:

Create an instance of the class from Python arguments.

def put_vertex_data( self, label: str, df: pyspark.sql.dataframe.DataFrame, primary_key: str) -> None:

Put the vertex DataFrame into the writer.

Parameters
  • label: label of the vertex.
  • df: DataFrame of the vertex type.
  • primary_key: primary key of the vertex type; the default is empty, which takes the first property column as the primary key.
def put_edge_data( self, relation: tuple[str, str, str], df: pyspark.sql.dataframe.DataFrame) -> None:

Put the edge DataFrame into the writer.

Parameters
  • relation: 3-tuple (source label, edge label, target label) that indicates the edge type.
  • df: DataFrame of the edge type.
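
A sketch of staging data in a writer; column names and labels here are illustrative assumptions, not requirements pulled from the GraphAr spec:

from pyspark.sql import SparkSession

from graphar_pyspark import initialize
from graphar_pyspark.graph import GraphWriter

spark = SparkSession.builder.appName("graphar-write").getOrCreate()
initialize(spark)  # assumed session bootstrap for GraphArSession

person_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
knows_df = spark.createDataFrame([(1, 2)], ["src", "dst"])  # assumed column names

writer = GraphWriter.from_python()
writer.put_vertex_data(label="person", df=person_df, primary_key="id")
writer.put_edge_data(relation=("person", "knows", "person"), df=knows_df)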
def write_with_graph_info(self, graph_info: Union[graphar_pyspark.info.GraphInfo, str]) -> None:

Write the graph data in GraphAr format with graph info.

Note: the original method is write, but there is no direct method overloading in Python.

Parameters
  • graph_info: the graph info object for the graph, or the path to the graph info file.
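
Continuing the writer sketch above, either form below should work; the yaml path is hypothetical:

# With a path: the graph info yaml is resolved on the JVM side.
writer.write_with_graph_info("/tmp/graphar/example.graph.yml")

# With an object: pass a GraphInfo loaded or built beforehand, e.g.
# writer.write_with_graph_info(GraphInfo.load_graph_info("/tmp/graphar/example.graph.yml"))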
def write( self, path: str, name: str = 'graph', vertex_chunk_size: Optional[int] = None, edge_chunk_size: Optional[int] = None, file_type: Optional[graphar_pyspark.enums.FileType] = None, version: Optional[str] = None) -> None:

Write graph data in GraphAr format.

Note: for default parameters, check org.apache.graphar.GeneralParams. For this method, None for any argument means that the default value will be used.

Parameters
  • path: the directory to write to.
  • name: the name of the graph, default is 'graph'.
  • vertex_chunk_size: the chunk size for vertices, default is 2^18.
  • edge_chunk_size: the chunk size for edges, default is 2^22.
  • file_type: the file type for the data payload (parquet, orc, csv, or json), default is parquet.
  • version: version of the GraphAr format, default is v1.
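
Continuing the writer sketch above, a call with most defaults spelled out explicitly; FileType.PARQUET is assumed to be a member of graphar_pyspark.enums.FileType:

from graphar_pyspark.enums import FileType

# Arguments left as None fall back to org.apache.graphar.GeneralParams defaults.
writer.write(
    path="/tmp/graphar/output",  # hypothetical output directory
    name="example_graph",
    vertex_chunk_size=2**18,
    edge_chunk_size=2**22,
    file_type=FileType.PARQUET,
)  # version=None keeps the GeneralParams default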
class GraphTransformer:

The helper object for transforming graphs through the definitions of their infos.

@staticmethod
def transform( source_graph_info: Union[str, graphar_pyspark.info.GraphInfo], dest_graph_info: Union[str, graphar_pyspark.info.GraphInfo]) -> None:

Transform the graphs following the metadata provided or defined in info files.

Note: both arguments should be strings or GraphInfo instances! Mixed argument types are not supported.

Parameters
  • source_graph_info: the path of the graph info yaml file for the source graph OR the info object for the source graph.
  • dest_graph_info: the path of the graph info yaml file for the destination graph OR the info object for the destination graph.

Raises: InvalidGraphFormatError if you pass mixed formats of source and dest graph info.
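
A sketch with two hypothetical yaml paths; passing two GraphInfo objects works the same way, while mixing a path with an object raises InvalidGraphFormatError:

from graphar_pyspark.graph import GraphTransformer

# Requires an initialized GraphArSession; both arguments must be the same type.
GraphTransformer.transform(
    "/tmp/graphar/source.graph.yml",
    "/tmp/graphar/dest.graph.yml",
)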