# test_setops.py -- set-operation (union / intersection / difference) tests for TimedeltaIndex

import numpy as np
import pytest

import pandas as pd
from pandas import Int64Index, TimedeltaIndex, timedelta_range
import pandas._testing as tm

from pandas.tseries.offsets import Hour


  7. class TestTimedeltaIndex:
  8. def test_union(self):
  9. i1 = timedelta_range("1day", periods=5)
  10. i2 = timedelta_range("3day", periods=5)
  11. result = i1.union(i2)
  12. expected = timedelta_range("1day", periods=7)
  13. tm.assert_index_equal(result, expected)
  14. i1 = Int64Index(np.arange(0, 20, 2))
  15. i2 = timedelta_range(start="1 day", periods=10, freq="D")
  16. i1.union(i2) # Works
  17. i2.union(i1) # Fails with "AttributeError: can't set attribute"
  18. def test_union_sort_false(self):
  19. tdi = timedelta_range("1day", periods=5)
  20. left = tdi[3:]
  21. right = tdi[:3]
  22. # Check that we are testing the desired code path
  23. assert left._can_fast_union(right)
  24. result = left.union(right)
  25. tm.assert_index_equal(result, tdi)
  26. result = left.union(right, sort=False)
  27. expected = pd.TimedeltaIndex(["4 Days", "5 Days", "1 Days", "2 Day", "3 Days"])
  28. tm.assert_index_equal(result, expected)
  29. def test_union_coverage(self):
  30. idx = TimedeltaIndex(["3d", "1d", "2d"])
  31. ordered = TimedeltaIndex(idx.sort_values(), freq="infer")
  32. result = ordered.union(idx)
  33. tm.assert_index_equal(result, ordered)
  34. result = ordered[:0].union(ordered)
  35. tm.assert_index_equal(result, ordered)
  36. assert result.freq == ordered.freq
  37. def test_union_bug_1730(self):
  38. rng_a = timedelta_range("1 day", periods=4, freq="3H")
  39. rng_b = timedelta_range("1 day", periods=4, freq="4H")
  40. result = rng_a.union(rng_b)
  41. exp = TimedeltaIndex(sorted(set(rng_a) | set(rng_b)))
  42. tm.assert_index_equal(result, exp)
  43. def test_union_bug_1745(self):
  44. left = TimedeltaIndex(["1 day 15:19:49.695000"])
  45. right = TimedeltaIndex(
  46. ["2 day 13:04:21.322000", "1 day 15:27:24.873000", "1 day 15:31:05.350000"]
  47. )
  48. result = left.union(right)
  49. exp = TimedeltaIndex(sorted(set(left) | set(right)))
  50. tm.assert_index_equal(result, exp)
  51. def test_union_bug_4564(self):
  52. left = timedelta_range("1 day", "30d")
  53. right = left + pd.offsets.Minute(15)
  54. result = left.union(right)
  55. exp = TimedeltaIndex(sorted(set(left) | set(right)))
  56. tm.assert_index_equal(result, exp)
  57. def test_union_freq_infer(self):
  58. # When taking the union of two TimedeltaIndexes, we infer
  59. # a freq even if the arguments don't have freq. This matches
  60. # DatetimeIndex behavior.
  61. tdi = pd.timedelta_range("1 Day", periods=5)
  62. left = tdi[[0, 1, 3, 4]]
  63. right = tdi[[2, 3, 1]]
  64. assert left.freq is None
  65. assert right.freq is None
  66. result = left.union(right)
  67. tm.assert_index_equal(result, tdi)
  68. assert result.freq == "D"
  69. def test_intersection_bug_1708(self):
  70. index_1 = timedelta_range("1 day", periods=4, freq="h")
  71. index_2 = index_1 + pd.offsets.Hour(5)
  72. result = index_1 & index_2
  73. assert len(result) == 0
  74. index_1 = timedelta_range("1 day", periods=4, freq="h")
  75. index_2 = index_1 + pd.offsets.Hour(1)
  76. result = index_1 & index_2
  77. expected = timedelta_range("1 day 01:00:00", periods=3, freq="h")
  78. tm.assert_index_equal(result, expected)
  79. @pytest.mark.parametrize("sort", [None, False])
  80. def test_intersection_equal(self, sort):
  81. # GH 24471 Test intersection outcome given the sort keyword
  82. # for equal indicies intersection should return the original index
  83. first = timedelta_range("1 day", periods=4, freq="h")
  84. second = timedelta_range("1 day", periods=4, freq="h")
  85. intersect = first.intersection(second, sort=sort)
  86. if sort is None:
  87. tm.assert_index_equal(intersect, second.sort_values())
  88. assert tm.equalContents(intersect, second)
  89. # Corner cases
  90. inter = first.intersection(first, sort=sort)
  91. assert inter is first
  92. @pytest.mark.parametrize("period_1, period_2", [(0, 4), (4, 0)])
  93. @pytest.mark.parametrize("sort", [None, False])
  94. def test_intersection_zero_length(self, period_1, period_2, sort):
  95. # GH 24471 test for non overlap the intersection should be zero length
  96. index_1 = timedelta_range("1 day", periods=period_1, freq="h")
  97. index_2 = timedelta_range("1 day", periods=period_2, freq="h")
  98. expected = timedelta_range("1 day", periods=0, freq="h")
  99. result = index_1.intersection(index_2, sort=sort)
  100. tm.assert_index_equal(result, expected)
  101. @pytest.mark.parametrize("sort", [None, False])
  102. def test_zero_length_input_index(self, sort):
  103. # GH 24966 test for 0-len intersections are copied
  104. index_1 = timedelta_range("1 day", periods=0, freq="h")
  105. index_2 = timedelta_range("1 day", periods=3, freq="h")
  106. result = index_1.intersection(index_2, sort=sort)
  107. assert index_1 is not result
  108. assert index_2 is not result
  109. tm.assert_copy(result, index_1)
  110. @pytest.mark.parametrize(
  111. "rng, expected",
  112. # if target has the same name, it is preserved
  113. [
  114. (
  115. timedelta_range("1 day", periods=5, freq="h", name="idx"),
  116. timedelta_range("1 day", periods=4, freq="h", name="idx"),
  117. ),
  118. # if target name is different, it will be reset
  119. (
  120. timedelta_range("1 day", periods=5, freq="h", name="other"),
  121. timedelta_range("1 day", periods=4, freq="h", name=None),
  122. ),
  123. # if no overlap exists return empty index
  124. (
  125. timedelta_range("1 day", periods=10, freq="h", name="idx")[5:],
  126. TimedeltaIndex([], name="idx"),
  127. ),
  128. ],
  129. )
  130. @pytest.mark.parametrize("sort", [None, False])
  131. def test_intersection(self, rng, expected, sort):
  132. # GH 4690 (with tz)
  133. base = timedelta_range("1 day", periods=4, freq="h", name="idx")
  134. result = base.intersection(rng, sort=sort)
  135. if sort is None:
  136. expected = expected.sort_values()
  137. tm.assert_index_equal(result, expected)
  138. assert result.name == expected.name
  139. assert result.freq == expected.freq
  140. @pytest.mark.parametrize(
  141. "rng, expected",
  142. # part intersection works
  143. [
  144. (
  145. TimedeltaIndex(["5 hour", "2 hour", "4 hour", "9 hour"], name="idx"),
  146. TimedeltaIndex(["2 hour", "4 hour"], name="idx"),
  147. ),
  148. # reordered part intersection
  149. (
  150. TimedeltaIndex(["2 hour", "5 hour", "5 hour", "1 hour"], name="other"),
  151. TimedeltaIndex(["1 hour", "2 hour"], name=None),
  152. ),
  153. # reveresed index
  154. (
  155. TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx")[
  156. ::-1
  157. ],
  158. TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx"),
  159. ),
  160. ],
  161. )
  162. @pytest.mark.parametrize("sort", [None, False])
  163. def test_intersection_non_monotonic(self, rng, expected, sort):
  164. # 24471 non-monotonic
  165. base = TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx")
  166. result = base.intersection(rng, sort=sort)
  167. if sort is None:
  168. expected = expected.sort_values()
  169. tm.assert_index_equal(result, expected)
  170. assert result.name == expected.name
  171. # if reveresed order, frequency is still the same
  172. if all(base == rng[::-1]) and sort is None:
  173. assert isinstance(result.freq, Hour)
  174. else:
  175. assert result.freq is None
  176. class TestTimedeltaIndexDifference:
  177. @pytest.mark.parametrize("sort", [None, False])
  178. def test_difference_freq(self, sort):
  179. # GH14323: Difference of TimedeltaIndex should not preserve frequency
  180. index = timedelta_range("0 days", "5 days", freq="D")
  181. other = timedelta_range("1 days", "4 days", freq="D")
  182. expected = TimedeltaIndex(["0 days", "5 days"], freq=None)
  183. idx_diff = index.difference(other, sort)
  184. tm.assert_index_equal(idx_diff, expected)
  185. tm.assert_attr_equal("freq", idx_diff, expected)
  186. other = timedelta_range("2 days", "5 days", freq="D")
  187. idx_diff = index.difference(other, sort)
  188. expected = TimedeltaIndex(["0 days", "1 days"], freq=None)
  189. tm.assert_index_equal(idx_diff, expected)
  190. tm.assert_attr_equal("freq", idx_diff, expected)
  191. @pytest.mark.parametrize("sort", [None, False])
  192. def test_difference_sort(self, sort):
  193. index = pd.TimedeltaIndex(
  194. ["5 days", "3 days", "2 days", "4 days", "1 days", "0 days"]
  195. )
  196. other = timedelta_range("1 days", "4 days", freq="D")
  197. idx_diff = index.difference(other, sort)
  198. expected = TimedeltaIndex(["5 days", "0 days"], freq=None)
  199. if sort is None:
  200. expected = expected.sort_values()
  201. tm.assert_index_equal(idx_diff, expected)
  202. tm.assert_attr_equal("freq", idx_diff, expected)
  203. other = timedelta_range("2 days", "5 days", freq="D")
  204. idx_diff = index.difference(other, sort)
  205. expected = TimedeltaIndex(["1 days", "0 days"], freq=None)
  206. if sort is None:
  207. expected = expected.sort_values()
  208. tm.assert_index_equal(idx_diff, expected)
  209. tm.assert_attr_equal("freq", idx_diff, expected)