  1. """ parquet compat """
  2. from typing import Any, Dict, Optional
  3. from warnings import catch_warnings
  4. from pandas.compat._optional import import_optional_dependency
  5. from pandas.errors import AbstractMethodError
  6. from pandas import DataFrame, get_option
  7. from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
  8. def get_engine(engine: str) -> "BaseImpl":
  9. """ return our implementation """
  10. if engine == "auto":
  11. engine = get_option("io.parquet.engine")
  12. if engine == "auto":
  13. # try engines in this order
  14. try:
  15. return PyArrowImpl()
  16. except ImportError:
  17. pass
  18. try:
  19. return FastParquetImpl()
  20. except ImportError:
  21. pass
  22. raise ImportError(
  23. "Unable to find a usable engine; "
  24. "tried using: 'pyarrow', 'fastparquet'.\n"
  25. "pyarrow or fastparquet is required for parquet "
  26. "support"
  27. )
  28. if engine == "pyarrow":
  29. return PyArrowImpl()
  30. elif engine == "fastparquet":
  31. return FastParquetImpl()
  32. raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
  33. class BaseImpl:
  34. @staticmethod
  35. def validate_dataframe(df: DataFrame):
  36. if not isinstance(df, DataFrame):
  37. raise ValueError("to_parquet only supports IO with DataFrames")
  38. # must have value column names (strings only)
  39. if df.columns.inferred_type not in {"string", "unicode", "empty"}:
  40. raise ValueError("parquet must have string column names")
  41. # index level names must be strings
  42. valid_names = all(
  43. isinstance(name, str) for name in df.index.names if name is not None
  44. )
  45. if not valid_names:
  46. raise ValueError("Index level names must be strings")
  47. def write(self, df: DataFrame, path, compression, **kwargs):
  48. raise AbstractMethodError(self)
  49. def read(self, path, columns=None, **kwargs):
  50. raise AbstractMethodError(self)
  51. class PyArrowImpl(BaseImpl):
  52. def __init__(self):
  53. import_optional_dependency(
  54. "pyarrow", extra="pyarrow is required for parquet support."
  55. )
  56. import pyarrow.parquet
  57. # import utils to register the pyarrow extension types
  58. import pandas.core.arrays._arrow_utils # noqa
  59. self.api = pyarrow
  60. def write(
  61. self,
  62. df: DataFrame,
  63. path,
  64. compression="snappy",
  65. coerce_timestamps="ms",
  66. index: Optional[bool] = None,
  67. partition_cols=None,
  68. **kwargs,
  69. ):
  70. self.validate_dataframe(df)
  71. path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
  72. from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
  73. if index is not None:
  74. from_pandas_kwargs["preserve_index"] = index
  75. table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
  76. if partition_cols is not None:
  77. self.api.parquet.write_to_dataset(
  78. table,
  79. path,
  80. compression=compression,
  81. coerce_timestamps=coerce_timestamps,
  82. partition_cols=partition_cols,
  83. **kwargs,
  84. )
  85. else:
  86. self.api.parquet.write_table(
  87. table,
  88. path,
  89. compression=compression,
  90. coerce_timestamps=coerce_timestamps,
  91. **kwargs,
  92. )
  93. def read(self, path, columns=None, **kwargs):
  94. path, _, _, should_close = get_filepath_or_buffer(path)
  95. kwargs["use_pandas_metadata"] = True
  96. result = self.api.parquet.read_table(
  97. path, columns=columns, **kwargs
  98. ).to_pandas()
  99. if should_close:
  100. path.close()
  101. return result
  102. class FastParquetImpl(BaseImpl):
  103. def __init__(self):
  104. # since pandas is a dependency of fastparquet
  105. # we need to import on first use
  106. fastparquet = import_optional_dependency(
  107. "fastparquet", extra="fastparquet is required for parquet support."
  108. )
  109. self.api = fastparquet
  110. def write(
  111. self,
  112. df: DataFrame,
  113. path,
  114. compression="snappy",
  115. index=None,
  116. partition_cols=None,
  117. **kwargs,
  118. ):
  119. self.validate_dataframe(df)
  120. # thriftpy/protocol/compact.py:339:
  121. # DeprecationWarning: tostring() is deprecated.
  122. # Use tobytes() instead.
  123. if "partition_on" in kwargs and partition_cols is not None:
  124. raise ValueError(
  125. "Cannot use both partition_on and "
  126. "partition_cols. Use partition_cols for "
  127. "partitioning data"
  128. )
  129. elif "partition_on" in kwargs:
  130. partition_cols = kwargs.pop("partition_on")
  131. if partition_cols is not None:
  132. kwargs["file_scheme"] = "hive"
  133. if is_s3_url(path) or is_gcs_url(path):
  134. # if path is s3:// or gs:// we need to open the file in 'wb' mode.
  135. # TODO: Support 'ab'
  136. path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
  137. # And pass the opened file to the fastparquet internal impl.
  138. kwargs["open_with"] = lambda path, _: path
  139. else:
  140. path, _, _, _ = get_filepath_or_buffer(path)
  141. with catch_warnings(record=True):
  142. self.api.write(
  143. path,
  144. df,
  145. compression=compression,
  146. write_index=index,
  147. partition_on=partition_cols,
  148. **kwargs,
  149. )
    def read(self, path, columns=None, **kwargs):
        if is_s3_url(path):
            from pandas.io.s3 import get_file_and_filesystem

            # When path is s3:// an S3File is returned.
            # We need to retain the original path (str) while also
            # passing the S3File().open function to the fastparquet impl.
            s3, filesystem = get_file_and_filesystem(path)
            try:
                parquet_file = self.api.ParquetFile(path, open_with=filesystem.open)
            finally:
                s3.close()
        else:
            path, _, _, _ = get_filepath_or_buffer(path)
            parquet_file = self.api.ParquetFile(path)

        return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(
    df: DataFrame,
    path,
    engine: str = "auto",
    compression="snappy",
    index: Optional[bool] = None,
    partition_cols=None,
    **kwargs,
):
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str
        File path or Root Directory path. Will be used as Root Directory path
        while writing a partitioned dataset.

        .. versionchanged:: 0.24.0

    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {'snappy', 'gzip', 'brotli', None}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.

        .. versionadded:: 0.24.0

    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.

        .. versionadded:: 0.24.0

    kwargs
        Additional keyword arguments passed to the engine.
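
    Examples
    --------
    A minimal usage sketch; the file and directory names below are
    illustrative placeholders, not part of the API:

    >>> df = DataFrame({"a": [1, 2], "b": ["x", "y"]})
    >>> to_parquet(df, "out.parquet", compression="snappy")  # doctest: +SKIP
    >>> to_parquet(df, "dataset_root", partition_cols=["b"])  # doctest: +SKIP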
  205. """
  206. if isinstance(partition_cols, str):
  207. partition_cols = [partition_cols]
  208. impl = get_engine(engine)
  209. return impl.write(
  210. df,
  211. path,
  212. compression=compression,
  213. index=index,
  214. partition_cols=partition_cols,
  215. **kwargs,
  216. )


def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
    """
    Load a parquet object from the file path, returning a DataFrame.

    .. versionadded:: 0.21.0

    Parameters
    ----------
    path : str, path object or file-like object
        Any valid string path is acceptable. The string could be a URL. Valid
        URL schemes include http, ftp, s3, and file. For file URLs, a host is
        expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables``

        If you want to pass in a path object, pandas accepts any
        ``os.PathLike``.

        By file-like object, we refer to objects with a ``read()`` method,
        such as a file handler (e.g. via builtin ``open`` function)
        or ``StringIO``.
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

        .. versionadded:: 0.21.1

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
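
    Examples
    --------
    A minimal usage sketch; the path below is an illustrative placeholder and
    assumes a parquet file written beforehand (e.g. with ``to_parquet``):

    >>> df = read_parquet("out.parquet", columns=["a"])  # doctest: +SKIP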
  250. """
  251. impl = get_engine(engine)
  252. return impl.read(path, columns=columns, **kwargs)