"""
Read a SAS XPort format file into a Pandas DataFrame.
Based on code from Jack Cushman (github.com/jcushman/xport).
The file format is defined here:
https://support.sas.com/techsup/technote/ts140.pdf
"""
from collections import abc
from datetime import datetime
from io import BytesIO
import struct
import warnings

import numpy as np

from pandas.util._decorators import Appender

import pandas as pd

from pandas.io.common import get_filepath_or_buffer
  16. _correct_line1 = (
  17. "HEADER RECORD*******LIBRARY HEADER RECORD!!!!!!!"
  18. "000000000000000000000000000000 "
  19. )
  20. _correct_header1 = (
  21. "HEADER RECORD*******MEMBER HEADER RECORD!!!!!!!000000000000000001600000000"
  22. )
  23. _correct_header2 = (
  24. "HEADER RECORD*******DSCRPTR HEADER RECORD!!!!!!!"
  25. "000000000000000000000000000000 "
  26. )
  27. _correct_obs_header = (
  28. "HEADER RECORD*******OBS HEADER RECORD!!!!!!!"
  29. "000000000000000000000000000000 "
  30. )
  31. _fieldkeys = [
  32. "ntype",
  33. "nhfun",
  34. "field_length",
  35. "nvar0",
  36. "name",
  37. "label",
  38. "nform",
  39. "nfl",
  40. "num_decimals",
  41. "nfj",
  42. "nfill",
  43. "niform",
  44. "nifl",
  45. "nifd",
  46. "npos",
  47. "_",
  48. ]
# ---------------------------------------------------------------------------
# Docstring fragments shared (via %-interpolation below) between the
# read_sas entry point and the XportReader class/method docstrings.
# ---------------------------------------------------------------------------
_base_params_doc = """\
Parameters
----------
filepath_or_buffer : string or file-like object
Path to SAS file or object implementing binary read method."""

_params2_doc = """\
index : identifier of index column
Identifier of column that should be used as index of the DataFrame.
encoding : string
Encoding for text data.
chunksize : int
Read file `chunksize` lines at a time, returns iterator."""

_format_params_doc = """\
format : string
File format, only `xport` is currently supported."""

_iterator_doc = """\
iterator : boolean, default False
Return XportReader object for reading file incrementally."""

# Assembled docstring for the top-level read_sas entry point.
_read_sas_doc = """Read a SAS file into a DataFrame.
%(_base_params_doc)s
%(_format_params_doc)s
%(_params2_doc)s
%(_iterator_doc)s
Returns
-------
DataFrame or XportReader
Examples
--------
Read a SAS Xport file:
>>> df = pd.read_sas('filename.XPT')
Read a Xport file in 10,000 line chunks:
>>> itr = pd.read_sas('filename.XPT', chunksize=10000)
>>> for chunk in itr:
>>> do_something(chunk)
""" % {
    "_base_params_doc": _base_params_doc,
    "_format_params_doc": _format_params_doc,
    "_params2_doc": _params2_doc,
    "_iterator_doc": _iterator_doc,
}

# Assembled docstring for the XportReader class (assigned to __doc__).
_xport_reader_doc = """\
Class for reading SAS Xport files.
%(_base_params_doc)s
%(_params2_doc)s
Attributes
----------
member_info : list
Contains information about the file
fields : list
Contains information about the variables in the file
""" % {
    "_base_params_doc": _base_params_doc,
    "_params2_doc": _params2_doc,
}

# Docstring attached (via Appender) to XportReader.read.
_read_method_doc = """\
Read observations from SAS Xport file, returning as data frame.
Parameters
----------
nrows : int
Number of rows to read from data file; if None, read whole
file.
Returns
-------
A DataFrame.
"""
  114. def _parse_date(datestr: str) -> datetime:
  115. """ Given a date in xport format, return Python date. """
  116. try:
  117. # e.g. "16FEB11:10:07:55"
  118. return datetime.strptime(datestr, "%d%b%y:%H:%M:%S")
  119. except ValueError:
  120. return pd.NaT
  121. def _split_line(s: str, parts):
  122. """
  123. Parameters
  124. ----------
  125. s: str
  126. Fixed-length string to split
  127. parts: list of (name, length) pairs
  128. Used to break up string, name '_' will be filtered from output.
  129. Returns
  130. -------
  131. Dict of name:contents of string at given location.
  132. """
  133. out = {}
  134. start = 0
  135. for name, length in parts:
  136. out[name] = s[start : start + length].strip()
  137. start += length
  138. del out["_"]
  139. return out
  140. def _handle_truncated_float_vec(vec, nbytes):
  141. # This feature is not well documented, but some SAS XPORT files
  142. # have 2-7 byte "truncated" floats. To read these truncated
  143. # floats, pad them with zeros on the right to make 8 byte floats.
  144. #
  145. # References:
  146. # https://github.com/jcushman/xport/pull/3
  147. # The R "foreign" library
  148. if nbytes != 8:
  149. vec1 = np.zeros(len(vec), np.dtype("S8"))
  150. dtype = np.dtype("S%d,S%d" % (nbytes, 8 - nbytes))
  151. vec2 = vec1.view(dtype=dtype)
  152. vec2["f0"] = vec
  153. return vec2
  154. return vec
def _parse_float_vec(vec):
    """
    Parse a vector of float values representing IBM 8 byte floats into
    native 8 byte floats.
    """
    # View each 8-byte IBM double as two big-endian 4-byte unsigned ints
    # (high half f0 holds sign/exponent/top fraction bits, low half f1
    # the remaining fraction bits).
    dtype = np.dtype(">u4,>u4")
    vec1 = vec.view(dtype=dtype)
    xport1 = vec1["f0"]
    xport2 = vec1["f1"]

    # Start by setting first half of ieee number to first half of IBM
    # number sans exponent
    ieee1 = xport1 & 0x00FFFFFF

    # The fraction bit to the left of the binary point in the ieee
    # format was set and the number was shifted 0, 1, 2, or 3
    # places. This will tell us how to adjust the ibm exponent to be a
    # power of 2 ieee exponent and how to shift the fraction bits to
    # restore the correct magnitude.
    shift = np.zeros(len(vec), dtype=np.uint8)
    shift[np.where(xport1 & 0x00200000)] = 1
    shift[np.where(xport1 & 0x00400000)] = 2
    shift[np.where(xport1 & 0x00800000)] = 3

    # shift the ieee number down the correct number of places then
    # set the second half of the ieee number to be the second half
    # of the ibm number shifted appropriately, ored with the bits
    # from the first half that would have been shifted in if we
    # could shift a double. All we are worried about are the low
    # order 3 bits of the first half since we're only shifting by
    # 1, 2, or 3.
    ieee1 >>= shift
    ieee2 = (xport2 >> shift) | ((xport1 & 0x00000007) << (29 + (3 - shift)))

    # clear the 1 bit to the left of the binary point
    ieee1 &= 0xFFEFFFFF

    # set the exponent of the ieee number to be the actual exponent
    # plus the shift count + 1023. Or this into the first half of the
    # ieee number. The ibm exponent is excess 64 but is adjusted by 65
    # since during conversion to ibm format the exponent is
    # incremented by 1 and the fraction bits left 4 positions to the
    # right of the radix point. (had to add >> 24 because C treats &
    # 0x7f as 0x7f000000 and Python doesn't)
    ieee1 |= ((((((xport1 >> 24) & 0x7F) - 65) << 2) + shift + 1023) << 20) | (
        xport1 & 0x80000000
    )

    # Reassemble the two 4-byte halves and reinterpret the buffer as
    # big-endian IEEE doubles, then convert to native byte order.
    ieee = np.empty((len(ieee1),), dtype=">u4,>u4")
    ieee["f0"] = ieee1
    ieee["f1"] = ieee2
    ieee = ieee.view(dtype=">f8")
    ieee = ieee.astype("f8")
    return ieee
  203. class XportReader(abc.Iterator):
  204. __doc__ = _xport_reader_doc
  205. def __init__(
  206. self, filepath_or_buffer, index=None, encoding="ISO-8859-1", chunksize=None
  207. ):
  208. self._encoding = encoding
  209. self._lines_read = 0
  210. self._index = index
  211. self._chunksize = chunksize
  212. if isinstance(filepath_or_buffer, str):
  213. (
  214. filepath_or_buffer,
  215. encoding,
  216. compression,
  217. should_close,
  218. ) = get_filepath_or_buffer(filepath_or_buffer, encoding=encoding)
  219. if isinstance(filepath_or_buffer, (str, bytes)):
  220. self.filepath_or_buffer = open(filepath_or_buffer, "rb")
  221. else:
  222. # Copy to BytesIO, and ensure no encoding
  223. contents = filepath_or_buffer.read()
  224. try:
  225. contents = contents.encode(self._encoding)
  226. except UnicodeEncodeError:
  227. pass
  228. self.filepath_or_buffer = BytesIO(contents)
  229. self._read_header()
  230. def close(self):
  231. self.filepath_or_buffer.close()
  232. def _get_row(self):
  233. return self.filepath_or_buffer.read(80).decode()
  234. def _read_header(self):
  235. self.filepath_or_buffer.seek(0)
  236. # read file header
  237. line1 = self._get_row()
  238. if line1 != _correct_line1:
  239. self.close()
  240. raise ValueError("Header record is not an XPORT file.")
  241. line2 = self._get_row()
  242. fif = [["prefix", 24], ["version", 8], ["OS", 8], ["_", 24], ["created", 16]]
  243. file_info = _split_line(line2, fif)
  244. if file_info["prefix"] != "SAS SAS SASLIB":
  245. self.close()
  246. raise ValueError("Header record has invalid prefix.")
  247. file_info["created"] = _parse_date(file_info["created"])
  248. self.file_info = file_info
  249. line3 = self._get_row()
  250. file_info["modified"] = _parse_date(line3[:16])
  251. # read member header
  252. header1 = self._get_row()
  253. header2 = self._get_row()
  254. headflag1 = header1.startswith(_correct_header1)
  255. headflag2 = header2 == _correct_header2
  256. if not (headflag1 and headflag2):
  257. self.close()
  258. raise ValueError("Member header not found")
  259. # usually 140, could be 135
  260. fieldnamelength = int(header1[-5:-2])
  261. # member info
  262. mem = [
  263. ["prefix", 8],
  264. ["set_name", 8],
  265. ["sasdata", 8],
  266. ["version", 8],
  267. ["OS", 8],
  268. ["_", 24],
  269. ["created", 16],
  270. ]
  271. member_info = _split_line(self._get_row(), mem)
  272. mem = [["modified", 16], ["_", 16], ["label", 40], ["type", 8]]
  273. member_info.update(_split_line(self._get_row(), mem))
  274. member_info["modified"] = _parse_date(member_info["modified"])
  275. member_info["created"] = _parse_date(member_info["created"])
  276. self.member_info = member_info
  277. # read field names
  278. types = {1: "numeric", 2: "char"}
  279. fieldcount = int(self._get_row()[54:58])
  280. datalength = fieldnamelength * fieldcount
  281. # round up to nearest 80
  282. if datalength % 80:
  283. datalength += 80 - datalength % 80
  284. fielddata = self.filepath_or_buffer.read(datalength)
  285. fields = []
  286. obs_length = 0
  287. while len(fielddata) >= fieldnamelength:
  288. # pull data for one field
  289. field, fielddata = (
  290. fielddata[:fieldnamelength],
  291. fielddata[fieldnamelength:],
  292. )
  293. # rest at end gets ignored, so if field is short, pad out
  294. # to match struct pattern below
  295. field = field.ljust(140)
  296. fieldstruct = struct.unpack(">hhhh8s40s8shhh2s8shhl52s", field)
  297. field = dict(zip(_fieldkeys, fieldstruct))
  298. del field["_"]
  299. field["ntype"] = types[field["ntype"]]
  300. fl = field["field_length"]
  301. if field["ntype"] == "numeric" and ((fl < 2) or (fl > 8)):
  302. self.close()
  303. msg = f"Floating field width {fl} is not between 2 and 8."
  304. raise TypeError(msg)
  305. for k, v in field.items():
  306. try:
  307. field[k] = v.strip()
  308. except AttributeError:
  309. pass
  310. obs_length += field["field_length"]
  311. fields += [field]
  312. header = self._get_row()
  313. if not header == _correct_obs_header:
  314. self.close()
  315. raise ValueError("Observation header not found.")
  316. self.fields = fields
  317. self.record_length = obs_length
  318. self.record_start = self.filepath_or_buffer.tell()
  319. self.nobs = self._record_count()
  320. self.columns = [x["name"].decode() for x in self.fields]
  321. # Setup the dtype.
  322. dtypel = [
  323. ("s" + str(i), "S" + str(field["field_length"]))
  324. for i, field in enumerate(self.fields)
  325. ]
  326. dtype = np.dtype(dtypel)
  327. self._dtype = dtype
  328. def __next__(self):
  329. return self.read(nrows=self._chunksize or 1)
  330. def _record_count(self) -> int:
  331. """
  332. Get number of records in file.
  333. This is maybe suboptimal because we have to seek to the end of
  334. the file.
  335. Side effect: returns file position to record_start.
  336. """
  337. self.filepath_or_buffer.seek(0, 2)
  338. total_records_length = self.filepath_or_buffer.tell() - self.record_start
  339. if total_records_length % 80 != 0:
  340. warnings.warn("xport file may be corrupted")
  341. if self.record_length > 80:
  342. self.filepath_or_buffer.seek(self.record_start)
  343. return total_records_length // self.record_length
  344. self.filepath_or_buffer.seek(-80, 2)
  345. last_card = self.filepath_or_buffer.read(80)
  346. last_card = np.frombuffer(last_card, dtype=np.uint64)
  347. # 8 byte blank
  348. ix = np.flatnonzero(last_card == 2314885530818453536)
  349. if len(ix) == 0:
  350. tail_pad = 0
  351. else:
  352. tail_pad = 8 * len(ix)
  353. self.filepath_or_buffer.seek(self.record_start)
  354. return (total_records_length - tail_pad) // self.record_length
  355. def get_chunk(self, size=None):
  356. """
  357. Reads lines from Xport file and returns as dataframe
  358. Parameters
  359. ----------
  360. size : int, defaults to None
  361. Number of lines to read. If None, reads whole file.
  362. Returns
  363. -------
  364. DataFrame
  365. """
  366. if size is None:
  367. size = self._chunksize
  368. return self.read(nrows=size)
  369. def _missing_double(self, vec):
  370. v = vec.view(dtype="u1,u1,u2,u4")
  371. miss = (v["f1"] == 0) & (v["f2"] == 0) & (v["f3"] == 0)
  372. miss1 = (
  373. ((v["f0"] >= 0x41) & (v["f0"] <= 0x5A))
  374. | (v["f0"] == 0x5F)
  375. | (v["f0"] == 0x2E)
  376. )
  377. miss &= miss1
  378. return miss
  379. @Appender(_read_method_doc)
  380. def read(self, nrows=None):
  381. if nrows is None:
  382. nrows = self.nobs
  383. read_lines = min(nrows, self.nobs - self._lines_read)
  384. read_len = read_lines * self.record_length
  385. if read_len <= 0:
  386. self.close()
  387. raise StopIteration
  388. raw = self.filepath_or_buffer.read(read_len)
  389. data = np.frombuffer(raw, dtype=self._dtype, count=read_lines)
  390. df = pd.DataFrame(index=range(read_lines))
  391. for j, x in enumerate(self.columns):
  392. vec = data["s" + str(j)]
  393. ntype = self.fields[j]["ntype"]
  394. if ntype == "numeric":
  395. vec = _handle_truncated_float_vec(vec, self.fields[j]["field_length"])
  396. miss = self._missing_double(vec)
  397. v = _parse_float_vec(vec)
  398. v[miss] = np.nan
  399. elif self.fields[j]["ntype"] == "char":
  400. v = [y.rstrip() for y in vec]
  401. if self._encoding is not None:
  402. v = [y.decode(self._encoding) for y in v]
  403. df[x] = v
  404. if self._index is None:
  405. df.index = range(self._lines_read, self._lines_read + read_lines)
  406. else:
  407. df = df.set_index(self._index)
  408. self._lines_read += read_lines
  409. return df