graphar_pyspark.writer

Bindings to org.apache.graphar.writer.

  1# Licensed to the Apache Software Foundation (ASF) under one
  2# or more contributor license agreements.  See the NOTICE file
  3# distributed with this work for additional information
  4# regarding copyright ownership.  The ASF licenses this file
  5# to you under the Apache License, Version 2.0 (the
  6# "License"); you may not use this file except in compliance
  7# with the License.  You may obtain a copy of the License at
  8#
  9#   http://www.apache.org/licenses/LICENSE-2.0
 10#
 11# Unless required by applicable law or agreed to in writing,
 12# software distributed under the License is distributed on an
 13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 14# KIND, either express or implied.  See the License for the
 15# specific language governing permissions and limitations
 16# under the License.
 17
 18"""Bindings to org.apache.graphar.writer."""
 19
 20
 21from __future__ import annotations
 22
 23import os
 24from typing import Optional
 25
 26from py4j.java_gateway import JavaObject
 27from pyspark.sql import DataFrame
 28
 29from graphar_pyspark import GraphArSession, _check_session
 30from graphar_pyspark.enums import AdjListType
 31from graphar_pyspark.info import EdgeInfo, PropertyGroup, VertexInfo
 32
 33
 34class VertexWriter:
 35    """Writer for vertex DataFrame."""
 36
 37    def __init__(
 38        self,
 39        prefix: Optional[str],
 40        vertex_info: Optional[VertexInfo],
 41        vertex_df: Optional[DataFrame],
 42        num_vertices: Optional[int],
 43        jvm_obj: Optional[JavaObject],
 44    ) -> None:
 45        """One should not use this constructor directly, please use `from_scala` or `from_python`."""
 46        _check_session()
 47        if jvm_obj is not None:
 48            self._jvm_vertex_writer_obj = jvm_obj
 49        else:
 50            num_vertices = -1 if num_vertices is None else num_vertices
 51            self._jvm_vertex_writer_obj = GraphArSession.graphar.writer.VertexWriter(
 52                prefix,
 53                vertex_info.to_scala(),
 54                vertex_df._jdf,
 55                num_vertices,
 56            )
 57
 58    def to_scala(self) -> JavaObject:
 59        """Transform object to JVM representation.
 60
 61        :returns: JavaObject
 62        """
 63        return self._jvm_vertex_writer_obj
 64
 65    @staticmethod
 66    def from_scala(jvm_obj: JavaObject) -> "VertexWriter":
 67        """Create an instance of the Class from the corresponding JVM object.
 68
 69        :param jvm_obj: scala object in JVM.
 70        :returns: instance of Python Class.
 71        """
 72        return VertexWriter(None, None, None, None, jvm_obj)
 73
 74    @staticmethod
 75    def from_python(
 76        prefix: str,
 77        vertex_info: VertexInfo,
 78        vertex_df: DataFrame,
 79        num_vertices: Optional[int],
 80    ) -> "VertexWriter":
 81        """Create an instance of the Class from Python arguments.
 82
 83        :param prefix: the absolute prefix.
 84        :param vertex_info: the vertex info that describes the vertex type.
 85        :param vertex_df: the input vertex DataFrame.
 86        :param num_vertices: the number of vertices, optional
 87        """
 88        if not prefix.endswith(os.sep):
 89            prefix += os.sep
 90        return VertexWriter(prefix, vertex_info, vertex_df, num_vertices, None)
 91
 92    def write_vertex_properties(
 93        self,
 94        property_group: Optional[PropertyGroup] = None,
 95    ) -> None:
 96        """Generate chunks of the property group (or all property groups) for vertex DataFrame.
 97
 98        :param property_group: property group (optional, default is None)
 99        if provided, generate chunks of the property group, otherwise generate for all property groups.
100        """
101        if property_group is not None:
102            self._jvm_vertex_writer_obj.writeVertexProperties(property_group.to_scala())
103        else:
104            self._jvm_vertex_writer_obj.writeVertexProperties()
105
106
class EdgeWriter:
    """Writer for edge DataFrame.

    Python wrapper around a JVM ``EdgeWriter`` from
    ``org.apache.graphar.writer``; the ``write_*`` methods forward to it.
    """

    def __init__(
        self,
        prefix: Optional[str],
        edge_info: Optional[EdgeInfo],
        adj_list_type: Optional[AdjListType],
        vertex_num: Optional[int],
        edge_df: Optional[DataFrame],
        jvm_obj: Optional[JavaObject],
    ) -> None:
        """One should not use this constructor directly, please use `from_scala` or `from_python`.

        :param prefix: the absolute prefix (ignored when `jvm_obj` is given).
        :param edge_info: the edge info that describes the edge type
            (ignored when `jvm_obj` is given).
        :param adj_list_type: the adj list type for the edge (ignored when `jvm_obj` is given).
        :param vertex_num: vertex number of the primary vertex label, passed to
            the JVM unchanged (ignored when `jvm_obj` is given).
        :param edge_df: the input edge DataFrame (ignored when `jvm_obj` is given).
        :param jvm_obj: an existing JVM writer to wrap; when not None it takes
            precedence and every other argument is ignored.
        """
        _check_session()
        if jvm_obj is not None:
            # Wrap the already-constructed JVM writer as-is.
            self._jvm_edge_writer_obj = jvm_obj
            return
        self._jvm_edge_writer_obj = GraphArSession.graphar.writer.EdgeWriter(
            prefix,
            edge_info.to_scala(),
            adj_list_type.to_scala(),
            vertex_num,
            edge_df._jdf,
        )

    def to_scala(self) -> JavaObject:
        """Transform object to JVM representation.

        :returns: the wrapped JVM writer object (JavaObject).
        """
        jvm_writer = self._jvm_edge_writer_obj
        return jvm_writer

    @staticmethod
    def from_scala(jvm_obj: JavaObject) -> "EdgeWriter":
        """Create an instance of the Class from the corresponding JVM object.

        :param jvm_obj: scala object in JVM.
        :returns: instance of Python Class.
        """
        return EdgeWriter(
            prefix=None,
            edge_info=None,
            adj_list_type=None,
            vertex_num=None,
            edge_df=None,
            jvm_obj=jvm_obj,
        )

    @staticmethod
    def from_python(
        prefix: str,
        edge_info: EdgeInfo,
        adj_list_type: AdjListType,
        vertex_num: int,
        edge_df: DataFrame,
    ) -> "EdgeWriter":
        """Create an instance of the Class from Python arguments.

        :param prefix: the absolute prefix; a trailing "/" is appended unless it
            already ends with "/" or `os.sep`.
        :param edge_info: the edge info that describes the edge type.
        :param adj_list_type: the adj list type for the edge.
        :param vertex_num: vertex number of the primary vertex label
        :param edge_df: the input edge DataFrame.
        :returns: instance of Python Class.
        """
        # The prefix is handed to the JVM, where it is presumably joined with
        # relative chunk paths as a Hadoop-style path/URI (hdfs://, s3a://, ...)
        # that always uses "/". Appending os.sep would insert a backslash on
        # Windows; a prefix already ending in os.sep is left unchanged.
        if not prefix.endswith(("/", os.sep)):
            prefix += "/"
        return EdgeWriter(
            prefix=prefix,
            edge_info=edge_info,
            adj_list_type=adj_list_type,
            vertex_num=vertex_num,
            edge_df=edge_df,
            jvm_obj=None,
        )

    def write_adj_list(self) -> None:
        """Generate the chunks of AdjList from edge DataFrame for this edge type."""
        jvm_writer = self._jvm_edge_writer_obj
        jvm_writer.writeAdjList()

    def write_edge_properties(
        self,
        property_group: Optional[PropertyGroup] = None,
    ) -> None:
        """Generate the chunks of all or selected property groups from edge DataFrame.

        :param property_group: property group (optional, default is None);
            if provided, generate the chunks of that property group only,
            otherwise generate chunks for all groups.
        """
        jvm_writer = self._jvm_edge_writer_obj
        if property_group is None:
            # No-argument JVM overload: write every property group.
            jvm_writer.writeEdgeProperties()
        else:
            jvm_writer.writeEdgeProperties(property_group.to_scala())

    def write_edges(self) -> None:
        """Generate the chunks for the AdjList and all property groups from the edge DataFrame."""
        jvm_writer = self._jvm_edge_writer_obj
        jvm_writer.writeEdges()
class VertexWriter:
 35class VertexWriter:
 36    """Writer for vertex DataFrame."""
 37
 38    def __init__(
 39        self,
 40        prefix: Optional[str],
 41        vertex_info: Optional[VertexInfo],
 42        vertex_df: Optional[DataFrame],
 43        num_vertices: Optional[int],
 44        jvm_obj: Optional[JavaObject],
 45    ) -> None:
 46        """One should not use this constructor directly, please use `from_scala` or `from_python`."""
 47        _check_session()
 48        if jvm_obj is not None:
 49            self._jvm_vertex_writer_obj = jvm_obj
 50        else:
 51            num_vertices = -1 if num_vertices is None else num_vertices
 52            self._jvm_vertex_writer_obj = GraphArSession.graphar.writer.VertexWriter(
 53                prefix,
 54                vertex_info.to_scala(),
 55                vertex_df._jdf,
 56                num_vertices,
 57            )
 58
 59    def to_scala(self) -> JavaObject:
 60        """Transform object to JVM representation.
 61
 62        :returns: JavaObject
 63        """
 64        return self._jvm_vertex_writer_obj
 65
 66    @staticmethod
 67    def from_scala(jvm_obj: JavaObject) -> "VertexWriter":
 68        """Create an instance of the Class from the corresponding JVM object.
 69
 70        :param jvm_obj: scala object in JVM.
 71        :returns: instance of Python Class.
 72        """
 73        return VertexWriter(None, None, None, None, jvm_obj)
 74
 75    @staticmethod
 76    def from_python(
 77        prefix: str,
 78        vertex_info: VertexInfo,
 79        vertex_df: DataFrame,
 80        num_vertices: Optional[int],
 81    ) -> "VertexWriter":
 82        """Create an instance of the Class from Python arguments.
 83
 84        :param prefix: the absolute prefix.
 85        :param vertex_info: the vertex info that describes the vertex type.
 86        :param vertex_df: the input vertex DataFrame.
 87        :param num_vertices: the number of vertices, optional
 88        """
 89        if not prefix.endswith(os.sep):
 90            prefix += os.sep
 91        return VertexWriter(prefix, vertex_info, vertex_df, num_vertices, None)
 92
 93    def write_vertex_properties(
 94        self,
 95        property_group: Optional[PropertyGroup] = None,
 96    ) -> None:
 97        """Generate chunks of the property group (or all property groups) for vertex DataFrame.
 98
 99        :param property_group: property group (optional, default is None)
100        if provided, generate chunks of the property group, otherwise generate for all property groups.
101        """
102        if property_group is not None:
103            self._jvm_vertex_writer_obj.writeVertexProperties(property_group.to_scala())
104        else:
105            self._jvm_vertex_writer_obj.writeVertexProperties()

Writer for vertex DataFrame.

VertexWriter( prefix: Optional[str], vertex_info: Optional[graphar_pyspark.info.VertexInfo], vertex_df: Optional[pyspark.sql.dataframe.DataFrame], num_vertices: Optional[int], jvm_obj: Optional[py4j.java_gateway.JavaObject])
def __init__(
    self,
    prefix: Optional[str],
    vertex_info: Optional[VertexInfo],
    vertex_df: Optional[DataFrame],
    num_vertices: Optional[int],
    jvm_obj: Optional[JavaObject],
) -> None:
    """One should not use this constructor directly, please use `from_scala` or `from_python`.

    :param prefix: the absolute prefix (ignored when `jvm_obj` is given).
    :param vertex_info: the vertex info that describes the vertex type
        (ignored when `jvm_obj` is given).
    :param vertex_df: the input vertex DataFrame (ignored when `jvm_obj` is given).
    :param num_vertices: the number of vertices, or None; None is passed
        to the JVM constructor as -1.
    :param jvm_obj: an existing JVM writer to wrap; when not None it takes
        precedence and every other argument is ignored.
    """
    _check_session()
    if jvm_obj is not None:
        # Wrap the already-constructed JVM writer as-is.
        self._jvm_vertex_writer_obj = jvm_obj
        return
    # The JVM constructor always receives a number, so a missing count is
    # encoded as -1 (presumably "unknown" on the Scala side -- confirm).
    jvm_num_vertices = -1 if num_vertices is None else num_vertices
    self._jvm_vertex_writer_obj = GraphArSession.graphar.writer.VertexWriter(
        prefix,
        vertex_info.to_scala(),
        vertex_df._jdf,
        jvm_num_vertices,
    )

One should not use this constructor directly; please use `from_scala` or `from_python`.

def to_scala(self) -> py4j.java_gateway.JavaObject:
def to_scala(self) -> JavaObject:
    """Transform object to JVM representation.

    :returns: the wrapped JVM writer object (JavaObject).
    """
    jvm_writer = self._jvm_vertex_writer_obj
    return jvm_writer

Transform object to JVM representation.

Returns: JavaObject

@staticmethod
def from_scala( jvm_obj: py4j.java_gateway.JavaObject) -> VertexWriter:
@staticmethod
def from_scala(jvm_obj: JavaObject) -> "VertexWriter":
    """Create an instance of the Class from the corresponding JVM object.

    :param jvm_obj: scala object in JVM.
    :returns: instance of Python Class.
    """
    return VertexWriter(
        prefix=None,
        vertex_info=None,
        vertex_df=None,
        num_vertices=None,
        jvm_obj=jvm_obj,
    )

Create an instance of the Class from the corresponding JVM object.

Parameters
  • jvm_obj: scala object in JVM.

Returns: instance of Python Class.
@staticmethod
def from_python( prefix: str, vertex_info: graphar_pyspark.info.VertexInfo, vertex_df: pyspark.sql.dataframe.DataFrame, num_vertices: Optional[int]) -> VertexWriter:
@staticmethod
def from_python(
    prefix: str,
    vertex_info: VertexInfo,
    vertex_df: DataFrame,
    num_vertices: Optional[int],
) -> "VertexWriter":
    """Create an instance of the Class from Python arguments.

    :param prefix: the absolute prefix; a trailing "/" is appended unless it
        already ends with "/" or `os.sep`.
    :param vertex_info: the vertex info that describes the vertex type.
    :param vertex_df: the input vertex DataFrame.
    :param num_vertices: the number of vertices, optional (may be None).
    :returns: instance of Python Class.
    """
    # The prefix is handed to the JVM, where it is presumably joined with
    # relative chunk paths as a Hadoop-style path/URI (hdfs://, s3a://, ...)
    # that always uses "/". Appending os.sep would insert a backslash on
    # Windows; a prefix already ending in os.sep is left unchanged.
    if not prefix.endswith(("/", os.sep)):
        prefix += "/"
    return VertexWriter(
        prefix=prefix,
        vertex_info=vertex_info,
        vertex_df=vertex_df,
        num_vertices=num_vertices,
        jvm_obj=None,
    )

Create an instance of the Class from Python arguments.

Parameters
  • prefix: the absolute prefix.
  • vertex_info: the vertex info that describes the vertex type.
  • vertex_df: the input vertex DataFrame.
  • num_vertices: the number of vertices, optional
def write_vertex_properties( self, property_group: Optional[graphar_pyspark.info.PropertyGroup] = None) -> None:
 93    def write_vertex_properties(
 94        self,
 95        property_group: Optional[PropertyGroup] = None,
 96    ) -> None:
 97        """Generate chunks of the property group (or all property groups) for vertex DataFrame.
 98
 99        :param property_group: property group (optional, default is None)
100        if provided, generate chunks of the property group, otherwise generate for all property groups.
101        """
102        if property_group is not None:
103            self._jvm_vertex_writer_obj.writeVertexProperties(property_group.to_scala())
104        else:
105            self._jvm_vertex_writer_obj.writeVertexProperties()

Generate chunks of the property group (or all property groups) for vertex DataFrame.

Parameters
  • property_group: property group (optional, default is None); if provided, generate chunks of that property group only, otherwise generate chunks for all property groups.
class EdgeWriter:
class EdgeWriter:
    """Writer for edge DataFrame.

    Python wrapper around a JVM ``EdgeWriter`` from
    ``org.apache.graphar.writer``; the ``write_*`` methods forward to it.
    """

    def __init__(
        self,
        prefix: Optional[str],
        edge_info: Optional[EdgeInfo],
        adj_list_type: Optional[AdjListType],
        vertex_num: Optional[int],
        edge_df: Optional[DataFrame],
        jvm_obj: Optional[JavaObject],
    ) -> None:
        """One should not use this constructor directly, please use `from_scala` or `from_python`.

        :param prefix: the absolute prefix (ignored when `jvm_obj` is given).
        :param edge_info: the edge info that describes the edge type
            (ignored when `jvm_obj` is given).
        :param adj_list_type: the adj list type for the edge (ignored when `jvm_obj` is given).
        :param vertex_num: vertex number of the primary vertex label, passed to
            the JVM unchanged (ignored when `jvm_obj` is given).
        :param edge_df: the input edge DataFrame (ignored when `jvm_obj` is given).
        :param jvm_obj: an existing JVM writer to wrap; when not None it takes
            precedence and every other argument is ignored.
        """
        _check_session()
        if jvm_obj is not None:
            # Wrap the already-constructed JVM writer as-is.
            self._jvm_edge_writer_obj = jvm_obj
            return
        self._jvm_edge_writer_obj = GraphArSession.graphar.writer.EdgeWriter(
            prefix,
            edge_info.to_scala(),
            adj_list_type.to_scala(),
            vertex_num,
            edge_df._jdf,
        )

    def to_scala(self) -> JavaObject:
        """Transform object to JVM representation.

        :returns: the wrapped JVM writer object (JavaObject).
        """
        jvm_writer = self._jvm_edge_writer_obj
        return jvm_writer

    @staticmethod
    def from_scala(jvm_obj: JavaObject) -> "EdgeWriter":
        """Create an instance of the Class from the corresponding JVM object.

        :param jvm_obj: scala object in JVM.
        :returns: instance of Python Class.
        """
        return EdgeWriter(
            prefix=None,
            edge_info=None,
            adj_list_type=None,
            vertex_num=None,
            edge_df=None,
            jvm_obj=jvm_obj,
        )

    @staticmethod
    def from_python(
        prefix: str,
        edge_info: EdgeInfo,
        adj_list_type: AdjListType,
        vertex_num: int,
        edge_df: DataFrame,
    ) -> "EdgeWriter":
        """Create an instance of the Class from Python arguments.

        :param prefix: the absolute prefix; a trailing "/" is appended unless it
            already ends with "/" or `os.sep`.
        :param edge_info: the edge info that describes the edge type.
        :param adj_list_type: the adj list type for the edge.
        :param vertex_num: vertex number of the primary vertex label
        :param edge_df: the input edge DataFrame.
        :returns: instance of Python Class.
        """
        # The prefix is handed to the JVM, where it is presumably joined with
        # relative chunk paths as a Hadoop-style path/URI (hdfs://, s3a://, ...)
        # that always uses "/". Appending os.sep would insert a backslash on
        # Windows; a prefix already ending in os.sep is left unchanged.
        if not prefix.endswith(("/", os.sep)):
            prefix += "/"
        return EdgeWriter(
            prefix=prefix,
            edge_info=edge_info,
            adj_list_type=adj_list_type,
            vertex_num=vertex_num,
            edge_df=edge_df,
            jvm_obj=None,
        )

    def write_adj_list(self) -> None:
        """Generate the chunks of AdjList from edge DataFrame for this edge type."""
        jvm_writer = self._jvm_edge_writer_obj
        jvm_writer.writeAdjList()

    def write_edge_properties(
        self,
        property_group: Optional[PropertyGroup] = None,
    ) -> None:
        """Generate the chunks of all or selected property groups from edge DataFrame.

        :param property_group: property group (optional, default is None);
            if provided, generate the chunks of that property group only,
            otherwise generate chunks for all groups.
        """
        jvm_writer = self._jvm_edge_writer_obj
        if property_group is None:
            # No-argument JVM overload: write every property group.
            jvm_writer.writeEdgeProperties()
        else:
            jvm_writer.writeEdgeProperties(property_group.to_scala())

    def write_edges(self) -> None:
        """Generate the chunks for the AdjList and all property groups from the edge DataFrame."""
        jvm_writer = self._jvm_edge_writer_obj
        jvm_writer.writeEdges()

Writer for edge DataFrame.

EdgeWriter( prefix: Optional[str], edge_info: Optional[graphar_pyspark.info.EdgeInfo], adj_list_type: Optional[graphar_pyspark.enums.AdjListType], vertex_num: Optional[int], edge_df: Optional[pyspark.sql.dataframe.DataFrame], jvm_obj: Optional[py4j.java_gateway.JavaObject])
def __init__(
    self,
    prefix: Optional[str],
    edge_info: Optional[EdgeInfo],
    adj_list_type: Optional[AdjListType],
    vertex_num: Optional[int],
    edge_df: Optional[DataFrame],
    jvm_obj: Optional[JavaObject],
) -> None:
    """One should not use this constructor directly, please use `from_scala` or `from_python`.

    :param prefix: the absolute prefix (ignored when `jvm_obj` is given).
    :param edge_info: the edge info that describes the edge type
        (ignored when `jvm_obj` is given).
    :param adj_list_type: the adj list type for the edge (ignored when `jvm_obj` is given).
    :param vertex_num: vertex number of the primary vertex label, passed to
        the JVM unchanged (ignored when `jvm_obj` is given).
    :param edge_df: the input edge DataFrame (ignored when `jvm_obj` is given).
    :param jvm_obj: an existing JVM writer to wrap; when not None it takes
        precedence and every other argument is ignored.
    """
    _check_session()
    if jvm_obj is not None:
        # Wrap the already-constructed JVM writer as-is.
        self._jvm_edge_writer_obj = jvm_obj
        return
    self._jvm_edge_writer_obj = GraphArSession.graphar.writer.EdgeWriter(
        prefix,
        edge_info.to_scala(),
        adj_list_type.to_scala(),
        vertex_num,
        edge_df._jdf,
    )

One should not use this constructor directly; please use `from_scala` or `from_python`.

def to_scala(self) -> py4j.java_gateway.JavaObject:
def to_scala(self) -> JavaObject:
    """Transform object to JVM representation.

    :returns: the wrapped JVM writer object (JavaObject).
    """
    jvm_writer = self._jvm_edge_writer_obj
    return jvm_writer

Transform object to JVM representation.

Returns: JavaObject

@staticmethod
def from_scala( jvm_obj: py4j.java_gateway.JavaObject) -> EdgeWriter:
@staticmethod
def from_scala(jvm_obj: JavaObject) -> "EdgeWriter":
    """Create an instance of the Class from the corresponding JVM object.

    :param jvm_obj: scala object in JVM.
    :returns: instance of Python Class.
    """
    return EdgeWriter(
        prefix=None,
        edge_info=None,
        adj_list_type=None,
        vertex_num=None,
        edge_df=None,
        jvm_obj=jvm_obj,
    )

Create an instance of the Class from the corresponding JVM object.

Parameters
  • jvm_obj: scala object in JVM.

Returns: instance of Python Class.
@staticmethod
def from_python( prefix: str, edge_info: graphar_pyspark.info.EdgeInfo, adj_list_type: graphar_pyspark.enums.AdjListType, vertex_num: int, edge_df: pyspark.sql.dataframe.DataFrame) -> EdgeWriter:
@staticmethod
def from_python(
    prefix: str,
    edge_info: EdgeInfo,
    adj_list_type: AdjListType,
    vertex_num: int,
    edge_df: DataFrame,
) -> "EdgeWriter":
    """Create an instance of the Class from Python arguments.

    :param prefix: the absolute prefix; a trailing "/" is appended unless it
        already ends with "/" or `os.sep`.
    :param edge_info: the edge info that describes the edge type.
    :param adj_list_type: the adj list type for the edge.
    :param vertex_num: vertex number of the primary vertex label
    :param edge_df: the input edge DataFrame.
    :returns: instance of Python Class.
    """
    # The prefix is handed to the JVM, where it is presumably joined with
    # relative chunk paths as a Hadoop-style path/URI (hdfs://, s3a://, ...)
    # that always uses "/". Appending os.sep would insert a backslash on
    # Windows; a prefix already ending in os.sep is left unchanged.
    if not prefix.endswith(("/", os.sep)):
        prefix += "/"
    return EdgeWriter(
        prefix=prefix,
        edge_info=edge_info,
        adj_list_type=adj_list_type,
        vertex_num=vertex_num,
        edge_df=edge_df,
        jvm_obj=None,
    )

Create an instance of the Class from Python arguments.

Parameters
  • prefix: the absolute prefix.
  • edge_info: the edge info that describes the edge type.
  • adj_list_type: the adj list type for the edge.
  • vertex_num: vertex number of the primary vertex label
  • edge_df: the input edge DataFrame.
def write_adj_list(self) -> None:
def write_adj_list(self) -> None:
    """Generate the chunks of AdjList from edge DataFrame for this edge type."""
    jvm_writer = self._jvm_edge_writer_obj
    jvm_writer.writeAdjList()

Generate the chunks of AdjList from edge DataFrame for this edge type.

def write_edge_properties( self, property_group: Optional[graphar_pyspark.info.PropertyGroup] = None) -> None:
def write_edge_properties(
    self,
    property_group: Optional[PropertyGroup] = None,
) -> None:
    """Generate the chunks of all or selected property groups from edge DataFrame.

    :param property_group: property group (optional, default is None);
        if provided, generate the chunks of that property group only,
        otherwise generate chunks for all groups.
    """
    jvm_writer = self._jvm_edge_writer_obj
    if property_group is None:
        # No-argument JVM overload: write every property group.
        jvm_writer.writeEdgeProperties()
    else:
        jvm_writer.writeEdgeProperties(property_group.to_scala())

Generate the chunks of all or selected property groups from edge DataFrame.

Parameters
  • property_group: property group (optional, default is None); if provided, generate the chunks of that property group only, otherwise generate chunks for all groups.
def write_edges(self) -> None:
def write_edges(self) -> None:
    """Generate the chunks for the AdjList and all property groups from the edge DataFrame."""
    jvm_writer = self._jvm_edge_writer_obj
    jvm_writer.writeEdges()

Generate the chunks for the AdjList and all property groups from edge.