_odfreader.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from typing import List
  2. from pandas._typing import FilePathOrBuffer, Scalar
  3. from pandas.compat._optional import import_optional_dependency
  4. import pandas as pd
  5. from pandas.io.excel._base import _BaseExcelReader
  6. class _ODFReader(_BaseExcelReader):
  7. """
  8. Read tables out of OpenDocument formatted files.
  9. Parameters
  10. ----------
  11. filepath_or_buffer: string, path to be parsed or
  12. an open readable stream.
  13. """
  14. def __init__(self, filepath_or_buffer: FilePathOrBuffer):
  15. import_optional_dependency("odf")
  16. super().__init__(filepath_or_buffer)
  17. @property
  18. def _workbook_class(self):
  19. from odf.opendocument import OpenDocument
  20. return OpenDocument
  21. def load_workbook(self, filepath_or_buffer: FilePathOrBuffer):
  22. from odf.opendocument import load
  23. return load(filepath_or_buffer)
  24. @property
  25. def empty_value(self) -> str:
  26. """Property for compat with other readers."""
  27. return ""
  28. @property
  29. def sheet_names(self) -> List[str]:
  30. """Return a list of sheet names present in the document"""
  31. from odf.table import Table
  32. tables = self.book.getElementsByType(Table)
  33. return [t.getAttribute("name") for t in tables]
  34. def get_sheet_by_index(self, index: int):
  35. from odf.table import Table
  36. tables = self.book.getElementsByType(Table)
  37. return tables[index]
  38. def get_sheet_by_name(self, name: str):
  39. from odf.table import Table
  40. tables = self.book.getElementsByType(Table)
  41. for table in tables:
  42. if table.getAttribute("name") == name:
  43. return table
  44. raise ValueError(f"sheet {name} not found")
  45. def get_sheet_data(self, sheet, convert_float: bool) -> List[List[Scalar]]:
  46. """Parse an ODF Table into a list of lists
  47. """
  48. from odf.table import CoveredTableCell, TableCell, TableRow
  49. covered_cell_name = CoveredTableCell().qname
  50. table_cell_name = TableCell().qname
  51. cell_names = {covered_cell_name, table_cell_name}
  52. sheet_rows = sheet.getElementsByType(TableRow)
  53. empty_rows = 0
  54. max_row_len = 0
  55. table: List[List[Scalar]] = []
  56. for i, sheet_row in enumerate(sheet_rows):
  57. sheet_cells = [x for x in sheet_row.childNodes if x.qname in cell_names]
  58. empty_cells = 0
  59. table_row: List[Scalar] = []
  60. for j, sheet_cell in enumerate(sheet_cells):
  61. if sheet_cell.qname == table_cell_name:
  62. value = self._get_cell_value(sheet_cell, convert_float)
  63. else:
  64. value = self.empty_value
  65. column_repeat = self._get_column_repeat(sheet_cell)
  66. # Queue up empty values, writing only if content succeeds them
  67. if value == self.empty_value:
  68. empty_cells += column_repeat
  69. else:
  70. table_row.extend([self.empty_value] * empty_cells)
  71. empty_cells = 0
  72. table_row.extend([value] * column_repeat)
  73. if max_row_len < len(table_row):
  74. max_row_len = len(table_row)
  75. row_repeat = self._get_row_repeat(sheet_row)
  76. if self._is_empty_row(sheet_row):
  77. empty_rows += row_repeat
  78. else:
  79. # add blank rows to our table
  80. table.extend([[self.empty_value]] * empty_rows)
  81. empty_rows = 0
  82. for _ in range(row_repeat):
  83. table.append(table_row)
  84. # Make our table square
  85. for row in table:
  86. if len(row) < max_row_len:
  87. row.extend([self.empty_value] * (max_row_len - len(row)))
  88. return table
  89. def _get_row_repeat(self, row) -> int:
  90. """Return number of times this row was repeated
  91. Repeating an empty row appeared to be a common way
  92. of representing sparse rows in the table.
  93. """
  94. from odf.namespaces import TABLENS
  95. return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))
  96. def _get_column_repeat(self, cell) -> int:
  97. from odf.namespaces import TABLENS
  98. return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))
  99. def _is_empty_row(self, row) -> bool:
  100. """Helper function to find empty rows
  101. """
  102. for column in row.childNodes:
  103. if len(column.childNodes) > 0:
  104. return False
  105. return True
  106. def _get_cell_value(self, cell, convert_float: bool) -> Scalar:
  107. from odf.namespaces import OFFICENS
  108. cell_type = cell.attributes.get((OFFICENS, "value-type"))
  109. if cell_type == "boolean":
  110. if str(cell) == "TRUE":
  111. return True
  112. return False
  113. if cell_type is None:
  114. return self.empty_value
  115. elif cell_type == "float":
  116. # GH5394
  117. cell_value = float(cell.attributes.get((OFFICENS, "value")))
  118. if cell_value == 0.0: # NA handling
  119. return str(cell)
  120. if convert_float:
  121. val = int(cell_value)
  122. if val == cell_value:
  123. return val
  124. return cell_value
  125. elif cell_type == "percentage":
  126. cell_value = cell.attributes.get((OFFICENS, "value"))
  127. return float(cell_value)
  128. elif cell_type == "string":
  129. return str(cell)
  130. elif cell_type == "currency":
  131. cell_value = cell.attributes.get((OFFICENS, "value"))
  132. return float(cell_value)
  133. elif cell_type == "date":
  134. cell_value = cell.attributes.get((OFFICENS, "date-value"))
  135. return pd.to_datetime(cell_value)
  136. elif cell_type == "time":
  137. return pd.to_datetime(str(cell)).time()
  138. else:
  139. raise ValueError(f"Unrecognized type {cell_type}")