# pandas/core/apply.py
  1. import abc
  2. import inspect
  3. from typing import TYPE_CHECKING, Any, Dict, Iterator, Tuple, Type, Union
  4. import numpy as np
  5. from pandas._libs import reduction as libreduction
  6. from pandas.util._decorators import cache_readonly
  7. from pandas.core.dtypes.common import (
  8. is_dict_like,
  9. is_extension_array_dtype,
  10. is_list_like,
  11. is_sequence,
  12. )
  13. from pandas.core.dtypes.generic import ABCSeries
  14. from pandas.core.construction import create_series_with_explicit_dtype
  15. if TYPE_CHECKING:
  16. from pandas import DataFrame, Series, Index
  17. ResType = Dict[int, Any]
  18. def frame_apply(
  19. obj: "DataFrame",
  20. func,
  21. axis=0,
  22. raw: bool = False,
  23. result_type=None,
  24. ignore_failures: bool = False,
  25. args=None,
  26. kwds=None,
  27. ):
  28. """ construct and return a row or column based frame apply object """
  29. axis = obj._get_axis_number(axis)
  30. klass: Type[FrameApply]
  31. if axis == 0:
  32. klass = FrameRowApply
  33. elif axis == 1:
  34. klass = FrameColumnApply
  35. return klass(
  36. obj,
  37. func,
  38. raw=raw,
  39. result_type=result_type,
  40. ignore_failures=ignore_failures,
  41. args=args,
  42. kwds=kwds,
  43. )
class FrameApply(metaclass=abc.ABCMeta):
    """
    Abstract base class implementing ``DataFrame.apply``.

    Concrete subclasses fix the axis of application: ``FrameRowApply``
    (axis=0, function sees each column) and ``FrameColumnApply`` (axis=1,
    function sees each row).  ``get_result`` is the entry point; it
    dispatches to aggregation, string/ufunc shortcuts, broadcasting, a
    raw-ndarray path, or the standard series-by-series loop.
    """

    # ---------------------------------------------------------------
    # Abstract Methods
    axis: int  # 0 or 1; provided as a class attribute by each subclass

    @property
    @abc.abstractmethod
    def result_index(self) -> "Index":
        # Labels for the result along the applied axis.
        pass

    @property
    @abc.abstractmethod
    def result_columns(self) -> "Index":
        # Labels for the result along the non-applied axis.
        pass

    @property
    @abc.abstractmethod
    def series_generator(self) -> Iterator["Series"]:
        # Yields the Series (one per row/column) that ``self.f`` is applied to.
        pass

    @abc.abstractmethod
    def wrap_results_for_axis(
        self, results: ResType, res_index: "Index"
    ) -> Union["Series", "DataFrame"]:
        # Assemble the {position -> result} dict into a Series/DataFrame.
        pass

    # ---------------------------------------------------------------

    def __init__(
        self,
        obj: "DataFrame",
        func,
        raw: bool,
        result_type,
        ignore_failures: bool,
        args,
        kwds,
    ):
        self.obj = obj
        self.raw = raw
        self.ignore_failures = ignore_failures
        # Normalize the optional *args/**kwds that are forwarded to ``func``.
        self.args = args or ()
        self.kwds = kwds or {}

        if result_type not in [None, "reduce", "broadcast", "expand"]:
            raise ValueError(
                "invalid value for result_type, must be one "
                "of {None, 'reduce', 'broadcast', 'expand'}"
            )

        self.result_type = result_type

        # curry if needed: bake args/kwds into a single-argument callable,
        # except for ufuncs and string method names which take them later
        if (kwds or args) and not isinstance(func, (np.ufunc, str)):

            def f(x):
                return func(x, *args, **kwds)

        else:
            f = func

        self.f = f

    @property
    def res_columns(self) -> "Index":
        # Alias consumed by wrap_results_for_axis in the subclasses.
        return self.result_columns

    @property
    def columns(self) -> "Index":
        return self.obj.columns

    @property
    def index(self) -> "Index":
        return self.obj.index

    @cache_readonly
    def values(self):
        # ndarray of the frame's data; cached since extraction may consolidate.
        return self.obj.values

    @cache_readonly
    def dtypes(self) -> "Series":
        return self.obj.dtypes

    @property
    def agg_axis(self) -> "Index":
        # The labels a reduction along ``self.axis`` collapses onto.
        return self.obj._get_agg_axis(self.axis)

    def get_result(self):
        """ compute the results """
        # dispatch to agg
        if is_list_like(self.f) or is_dict_like(self.f):
            return self.obj.aggregate(self.f, axis=self.axis, *self.args, **self.kwds)

        # all empty
        if len(self.columns) == 0 and len(self.index) == 0:
            return self.apply_empty_result()

        # string dispatch
        if isinstance(self.f, str):
            # Support for `frame.transform('method')`
            # Some methods (shift, etc.) require the axis argument, others
            # don't, so inspect and insert if necessary.
            func = getattr(self.obj, self.f)
            sig = inspect.getfullargspec(func)
            if "axis" in sig.args:
                # NOTE(review): this mutates self.kwds (the dict passed by the
                # caller when it was not falsy) — confirm callers don't reuse it.
                self.kwds["axis"] = self.axis
            return func(*self.args, **self.kwds)

        # ufunc
        elif isinstance(self.f, np.ufunc):
            with np.errstate(all="ignore"):
                results = self.obj._data.apply("apply", func=self.f)
            return self.obj._constructor(
                data=results, index=self.index, columns=self.columns, copy=False
            )

        # broadcasting
        if self.result_type == "broadcast":
            return self.apply_broadcast(self.obj)

        # one axis empty
        elif not all(self.obj.shape):
            return self.apply_empty_result()

        # raw
        elif self.raw and not self.obj._is_mixed_type:
            return self.apply_raw()

        return self.apply_standard()

    def apply_empty_result(self):
        """
        we have an empty result; at least 1 axis is 0

        we will try to apply the function to an empty
        series in order to see if this is a reduction function
        """

        # we are not asked to reduce or infer reduction
        # so just return a copy of the existing object
        if self.result_type not in ["reduce", None]:
            return self.obj.copy()

        # we may need to infer
        should_reduce = self.result_type == "reduce"

        from pandas import Series

        if not should_reduce:
            # Probe with an empty Series: a reducer returns a scalar here,
            # a transformer returns a Series.
            try:
                r = self.f(Series([], dtype=np.float64))
            except Exception:
                pass
            else:
                should_reduce = not isinstance(r, Series)

        if should_reduce:
            if len(self.agg_axis):
                r = self.f(Series([], dtype=np.float64))
            else:
                r = np.nan

            return self.obj._constructor_sliced(r, index=self.agg_axis)
        else:
            return self.obj.copy()

    def apply_raw(self):
        """ apply to the values as a numpy array """
        try:
            # Fast cython path; raises ValueError if ``f`` does not reduce.
            result = libreduction.compute_reduction(self.values, self.f, axis=self.axis)
        except ValueError as err:
            if "Function does not reduce" not in str(err):
                # catch only ValueError raised intentionally in libreduction
                raise
            # We expect np.apply_along_axis to give a two-dimensional result, or
            # also raise.
            result = np.apply_along_axis(self.f, self.axis, self.values)

        # TODO: mixed type case
        if result.ndim == 2:
            return self.obj._constructor(result, index=self.index, columns=self.columns)
        else:
            return self.obj._constructor_sliced(result, index=self.agg_axis)

    def apply_broadcast(self, target: "DataFrame") -> "DataFrame":
        # Apply ``f`` column by column, broadcasting scalar/1d results back
        # into the original shape.
        result_values = np.empty_like(target.values)

        # axis which we want to compare compliance
        result_compare = target.shape[0]

        for i, col in enumerate(target.columns):
            res = self.f(target[col])
            ares = np.asarray(res).ndim

            # must be a scalar or 1d
            if ares > 1:
                raise ValueError("too many dims to broadcast")
            elif ares == 1:

                # must match return dim
                if result_compare != len(res):
                    raise ValueError("cannot broadcast result")

            result_values[:, i] = res

        # we *always* preserve the original index / columns
        result = self.obj._constructor(
            result_values, index=target.index, columns=target.columns
        )
        return result

    def apply_standard(self):
        """Apply ``f`` series-by-series, trying the cython reducer first."""

        # try to reduce first (by default)
        # this only matters if the reduction in values is of different dtype
        # e.g. if we want to apply to a SparseFrame, then can't directly reduce

        # we cannot reduce using non-numpy dtypes,
        # as demonstrated in gh-12244
        if (
            self.result_type in ["reduce", None]
            and not self.dtypes.apply(is_extension_array_dtype).any()
            # Disallow complex_internals since libreduction shortcut raises a TypeError
            and not self.agg_axis._has_complex_internals
        ):

            values = self.values
            index = self.obj._get_axis(self.axis)
            labels = self.agg_axis
            empty_arr = np.empty(len(index), dtype=values.dtype)

            # Preserve subclass for e.g. test_subclassed_apply
            dummy = self.obj._constructor_sliced(
                empty_arr, index=index, dtype=values.dtype
            )

            try:
                result = libreduction.compute_reduction(
                    values, self.f, axis=self.axis, dummy=dummy, labels=labels
                )
            except ValueError as err:
                if "Function does not reduce" not in str(err):
                    # catch only ValueError raised intentionally in libreduction
                    raise
            except TypeError:
                # e.g. test_apply_ignore_failures we just ignore
                if not self.ignore_failures:
                    raise
            except ZeroDivisionError:
                # reached via numexpr; fall back to python implementation
                pass
            else:
                return self.obj._constructor_sliced(result, index=labels)

        # compute the result using the series generator
        results, res_index = self.apply_series_generator()

        # wrap results
        return self.wrap_results(results, res_index)

    def apply_series_generator(self) -> Tuple[ResType, "Index"]:
        """Run ``f`` over ``series_generator``; return {position: result} and labels."""
        series_gen = self.series_generator
        res_index = self.result_index

        keys = []
        results = {}
        if self.ignore_failures:
            # Best-effort mode: skip entries where ``f`` raises, then trim
            # the result index to the successful positions.
            successes = []
            for i, v in enumerate(series_gen):
                try:
                    results[i] = self.f(v)
                except Exception:
                    pass
                else:
                    keys.append(v.name)
                    successes.append(i)

            # so will work with MultiIndex
            if len(successes) < len(res_index):
                res_index = res_index.take(successes)

        else:
            for i, v in enumerate(series_gen):
                results[i] = self.f(v)
                keys.append(v.name)

        return results, res_index

    def wrap_results(
        self, results: ResType, res_index: "Index"
    ) -> Union["Series", "DataFrame"]:
        """Box the per-series results into a Series or DataFrame."""
        from pandas import Series

        # see if we can infer the results
        if len(results) > 0 and 0 in results and is_sequence(results[0]):
            return self.wrap_results_for_axis(results, res_index)

        # dict of scalars

        # the default dtype of an empty Series will be `object`, but this
        # code can be hit by df.mean() where the result should have dtype
        # float64 even if it's an empty Series.
        constructor_sliced = self.obj._constructor_sliced
        if constructor_sliced is Series:
            result = create_series_with_explicit_dtype(
                results, dtype_if_empty=np.float64
            )
        else:
            result = constructor_sliced(results)
        result.index = res_index

        return result
  295. class FrameRowApply(FrameApply):
  296. axis = 0
  297. def apply_broadcast(self, target: "DataFrame") -> "DataFrame":
  298. return super().apply_broadcast(target)
  299. @property
  300. def series_generator(self):
  301. return (self.obj._ixs(i, axis=1) for i in range(len(self.columns)))
  302. @property
  303. def result_index(self) -> "Index":
  304. return self.columns
  305. @property
  306. def result_columns(self) -> "Index":
  307. return self.index
  308. def wrap_results_for_axis(
  309. self, results: ResType, res_index: "Index"
  310. ) -> "DataFrame":
  311. """ return the results for the rows """
  312. result = self.obj._constructor(data=results)
  313. if not isinstance(results[0], ABCSeries):
  314. if len(result.index) == len(self.res_columns):
  315. result.index = self.res_columns
  316. if len(result.columns) == len(res_index):
  317. result.columns = res_index
  318. return result
  319. class FrameColumnApply(FrameApply):
  320. axis = 1
  321. def apply_broadcast(self, target: "DataFrame") -> "DataFrame":
  322. result = super().apply_broadcast(target.T)
  323. return result.T
  324. @property
  325. def series_generator(self):
  326. constructor = self.obj._constructor_sliced
  327. return (
  328. constructor(arr, index=self.columns, name=name)
  329. for i, (arr, name) in enumerate(zip(self.values, self.index))
  330. )
  331. @property
  332. def result_index(self) -> "Index":
  333. return self.index
  334. @property
  335. def result_columns(self) -> "Index":
  336. return self.columns
  337. def wrap_results_for_axis(
  338. self, results: ResType, res_index: "Index"
  339. ) -> Union["Series", "DataFrame"]:
  340. """ return the results for the columns """
  341. result: Union["Series", "DataFrame"]
  342. # we have requested to expand
  343. if self.result_type == "expand":
  344. result = self.infer_to_same_shape(results, res_index)
  345. # we have a non-series and don't want inference
  346. elif not isinstance(results[0], ABCSeries):
  347. from pandas import Series
  348. result = Series(results)
  349. result.index = res_index
  350. # we may want to infer results
  351. else:
  352. result = self.infer_to_same_shape(results, res_index)
  353. return result
  354. def infer_to_same_shape(self, results: ResType, res_index: "Index") -> "DataFrame":
  355. """ infer the results to the same shape as the input object """
  356. result = self.obj._constructor(data=results)
  357. result = result.T
  358. # set the index
  359. result.index = res_index
  360. # infer dtypes
  361. result = result.infer_objects()
  362. return result