bufferproxy_test.py 16 KB


  1. import sys
  2. import re
  3. import weakref
  4. import gc
  5. import ctypes
  6. import unittest
  7. import pygame
  8. from pygame.bufferproxy import BufferProxy
  9. from pygame.compat import as_bytes
  10. try:
  11. BufferError
  12. except NameError:
  13. from pygame import BufferError
  14. class BufferProxyTest(unittest.TestCase):
  15. view_keywords = {'shape': (5, 4, 3),
  16. 'typestr': '|u1',
  17. 'data': (0, True),
  18. 'strides': (4, 20, 1)}
  19. def test_module_name(self):
  20. self.assertEqual(pygame.bufferproxy.__name__,
  21. "pygame.bufferproxy")
  22. def test_class_name(self):
  23. self.assertEqual(BufferProxy.__name__, "BufferProxy")
  24. def test___array_struct___property(self):
  25. kwds = self.view_keywords
  26. v = BufferProxy(kwds)
  27. d = pygame.get_array_interface(v)
  28. self.assertEqual(len(d), 5)
  29. self.assertEqual(d['version'], 3)
  30. self.assertEqual(d['shape'], kwds['shape'])
  31. self.assertEqual(d['typestr'], kwds['typestr'])
  32. self.assertEqual(d['data'], kwds['data'])
  33. self.assertEqual(d['strides'], kwds['strides'])
  34. def test___array_interface___property(self):
  35. kwds = self.view_keywords
  36. v = BufferProxy(kwds)
  37. d = v.__array_interface__
  38. self.assertEqual(len(d), 5)
  39. self.assertEqual(d['version'], 3)
  40. self.assertEqual(d['shape'], kwds['shape'])
  41. self.assertEqual(d['typestr'], kwds['typestr'])
  42. self.assertEqual(d['data'], kwds['data'])
  43. self.assertEqual(d['strides'], kwds['strides'])
  44. def test_parent_property(self):
  45. kwds = dict(self.view_keywords)
  46. p = []
  47. kwds['parent'] = p
  48. v = BufferProxy(kwds)
  49. self.assertIs(v.parent, p)
  50. def test_before(self):
  51. def callback(parent):
  52. success.append(parent is p)
  53. class MyException(Exception):
  54. pass
  55. def raise_exception(parent):
  56. raise MyException("Just a test.")
  57. kwds = dict(self.view_keywords)
  58. p = []
  59. kwds['parent'] = p
  60. # For array interface
  61. success = []
  62. kwds['before'] = callback
  63. v = BufferProxy(kwds)
  64. self.assertEqual(len(success), 0)
  65. d = v.__array_interface__
  66. self.assertEqual(len(success), 1)
  67. self.assertTrue(success[0])
  68. d = v.__array_interface__
  69. self.assertEqual(len(success), 1)
  70. d = v = None
  71. gc.collect()
  72. self.assertEqual(len(success), 1)
  73. # For array struct
  74. success = []
  75. kwds['before'] = callback
  76. v = BufferProxy(kwds)
  77. self.assertEqual(len(success), 0)
  78. c = v.__array_struct__
  79. self.assertEqual(len(success), 1)
  80. self.assertTrue(success[0])
  81. c = v.__array_struct__
  82. self.assertEqual(len(success), 1)
  83. c = v = None
  84. gc.collect()
  85. self.assertEqual(len(success), 1)
  86. # Callback raises an exception
  87. kwds['before'] = raise_exception
  88. v = BufferProxy(kwds)
  89. self.assertRaises(MyException, lambda : v.__array_struct__)
  90. def test_after(self):
  91. def callback(parent):
  92. success.append(parent is p)
  93. kwds = dict(self.view_keywords)
  94. p = []
  95. kwds['parent'] = p
  96. # For array interface
  97. success = []
  98. kwds['after'] = callback
  99. v = BufferProxy(kwds)
  100. self.assertEqual(len(success), 0)
  101. d = v.__array_interface__
  102. self.assertEqual(len(success), 0)
  103. d = v.__array_interface__
  104. self.assertEqual(len(success), 0)
  105. d = v = None
  106. gc.collect()
  107. self.assertEqual(len(success), 1)
  108. self.assertTrue(success[0])
  109. # For array struct
  110. success = []
  111. kwds['after'] = callback
  112. v = BufferProxy(kwds)
  113. self.assertEqual(len(success), 0)
  114. c = v.__array_struct__
  115. self.assertEqual(len(success), 0)
  116. c = v.__array_struct__
  117. self.assertEqual(len(success), 0)
  118. c = v = None
  119. gc.collect()
  120. self.assertEqual(len(success), 1)
  121. self.assertTrue(success[0])
  122. def test_attribute(self):
  123. v = BufferProxy(self.view_keywords)
  124. self.assertRaises(AttributeError, getattr, v, 'undefined')
  125. v.undefined = 12;
  126. self.assertEqual(v.undefined, 12)
  127. del v.undefined
  128. self.assertRaises(AttributeError, getattr, v, 'undefined')
  129. def test_weakref(self):
  130. v = BufferProxy(self.view_keywords)
  131. weak_v = weakref.ref(v)
  132. self.assertIs(weak_v(), v)
  133. v = None
  134. gc.collect()
  135. self.assertIsNone(weak_v())
  136. def test_gc(self):
  137. """refcount agnostic check that contained objects are freed"""
  138. def before_callback(parent):
  139. return r[0]
  140. def after_callback(parent):
  141. return r[1]
  142. class Obj(object):
  143. pass
  144. p = Obj()
  145. a = Obj()
  146. r = [Obj(), Obj()]
  147. weak_p = weakref.ref(p)
  148. weak_a = weakref.ref(a)
  149. weak_r0 = weakref.ref(r[0])
  150. weak_r1 = weakref.ref(r[1])
  151. weak_before = weakref.ref(before_callback)
  152. weak_after = weakref.ref(after_callback)
  153. kwds = dict(self.view_keywords)
  154. kwds['parent'] = p
  155. kwds['before'] = before_callback
  156. kwds['after'] = after_callback
  157. v = BufferProxy(kwds)
  158. v.some_attribute = a
  159. weak_v = weakref.ref(v)
  160. kwds = p = a = before_callback = after_callback = None
  161. gc.collect()
  162. self.assertTrue(weak_p() is not None)
  163. self.assertTrue(weak_a() is not None)
  164. self.assertTrue(weak_before() is not None)
  165. self.assertTrue(weak_after() is not None)
  166. v = None
  167. [gc.collect() for x in range(4)]
  168. self.assertTrue(weak_v() is None)
  169. self.assertTrue(weak_p() is None)
  170. self.assertTrue(weak_a() is None)
  171. self.assertTrue(weak_before() is None)
  172. self.assertTrue(weak_after() is None)
  173. self.assertTrue(weak_r0() is not None)
  174. self.assertTrue(weak_r1() is not None)
  175. r = None
  176. gc.collect()
  177. self.assertTrue(weak_r0() is None)
  178. self.assertTrue(weak_r1() is None)
  179. # Cycle removal
  180. kwds = dict(self.view_keywords)
  181. kwds['parent'] = []
  182. v = BufferProxy(kwds)
  183. v.some_attribute = v
  184. tracked = True
  185. for o in gc.get_objects():
  186. if o is v:
  187. break
  188. else:
  189. tracked = False
  190. self.assertTrue(tracked)
  191. kwds['parent'].append(v)
  192. kwds = None
  193. gc.collect()
  194. n1 = len(gc.garbage)
  195. v = None
  196. gc.collect()
  197. n2 = len(gc.garbage)
  198. self.assertEqual(n2, n1)
  199. def test_c_api(self):
  200. api = pygame.bufferproxy._PYGAME_C_API
  201. api_type = type(pygame.base._PYGAME_C_API)
  202. self.assertIsInstance(api, api_type)
  203. def test_repr(self):
  204. v = BufferProxy(self.view_keywords)
  205. cname = BufferProxy.__name__
  206. oname, ovalue = re.findall(r"<([^)]+)\(([^)]+)\)>", repr(v))[0]
  207. self.assertEqual(oname, cname)
  208. self.assertEqual(v.length, int(ovalue))
  209. def test_subclassing(self):
  210. class MyBufferProxy(BufferProxy):
  211. def __repr__(self):
  212. return "*%s*" % (BufferProxy.__repr__(self),)
  213. kwds = dict(self.view_keywords)
  214. kwds['parent'] = 0
  215. v = MyBufferProxy(kwds)
  216. self.assertEqual(v.parent, 0)
  217. r = repr(v)
  218. self.assertEqual(r[:2], '*<')
  219. self.assertEqual(r[-2:], '>*')
  220. @unittest.skipIf(not pygame.HAVE_NEWBUF, 'newbuf not implemented')
  221. def NEWBUF_test_newbuf(self):
  222. from ctypes import string_at
  223. from pygame.tests.test_utils import buftools
  224. Exporter = buftools.Exporter
  225. Importer = buftools.Importer
  226. exp = Exporter((10,), 'B', readonly=True)
  227. b = BufferProxy(exp)
  228. self.assertEqual(b.length, exp.len)
  229. self.assertEqual(b.raw, string_at(exp.buf, exp.len))
  230. d = b.__array_interface__
  231. try:
  232. self.assertEqual(d['typestr'], '|u1')
  233. self.assertEqual(d['shape'], exp.shape)
  234. self.assertEqual(d['strides'], exp.strides)
  235. self.assertEqual(d['data'], (exp.buf, True))
  236. finally:
  237. d = None
  238. exp = Exporter((3,), '=h')
  239. b = BufferProxy(exp)
  240. self.assertEqual(b.length, exp.len)
  241. self.assertEqual(b.raw, string_at(exp.buf, exp.len))
  242. d = b.__array_interface__
  243. try:
  244. lil_endian = pygame.get_sdl_byteorder() == pygame.LIL_ENDIAN
  245. f = '{}i{}'.format('<' if lil_endian else '>', exp.itemsize)
  246. self.assertEqual(d['typestr'], f)
  247. self.assertEqual(d['shape'], exp.shape)
  248. self.assertEqual(d['strides'], exp.strides)
  249. self.assertEqual(d['data'], (exp.buf, False))
  250. finally:
  251. d = None
  252. exp = Exporter((10, 2), '=i')
  253. b = BufferProxy(exp)
  254. imp = Importer(b, buftools.PyBUF_RECORDS)
  255. self.assertTrue(imp.obj is b)
  256. self.assertEqual(imp.buf, exp.buf)
  257. self.assertEqual(imp.ndim, exp.ndim)
  258. self.assertEqual(imp.format, exp.format)
  259. self.assertEqual(imp.readonly, exp.readonly)
  260. self.assertEqual(imp.itemsize, exp.itemsize)
  261. self.assertEqual(imp.len, exp.len)
  262. self.assertEqual(imp.shape, exp.shape)
  263. self.assertEqual(imp.strides, exp.strides)
  264. self.assertTrue(imp.suboffsets is None)
  265. d = {'typestr': '|u1',
  266. 'shape': (10,),
  267. 'strides': (1,),
  268. 'data': (9, True)} # 9? Will not reading the data anyway.
  269. b = BufferProxy(d)
  270. imp = Importer(b, buftools.PyBUF_SIMPLE)
  271. self.assertTrue(imp.obj is b)
  272. self.assertEqual(imp.buf, 9)
  273. self.assertEqual(imp.len, 10)
  274. self.assertEqual(imp.format, None)
  275. self.assertEqual(imp.itemsize, 1)
  276. self.assertEqual(imp.ndim, 0)
  277. self.assertTrue(imp.readonly)
  278. self.assertTrue(imp.shape is None)
  279. self.assertTrue(imp.strides is None)
  280. self.assertTrue(imp.suboffsets is None)
  281. try:
  282. pygame.bufferproxy.get_segcount
  283. except AttributeError:
  284. pass
  285. else:
  286. def test_oldbuf_arg(self):
  287. self.OLDBUF_test_oldbuf_arg()
  288. def OLDBUF_test_oldbuf_arg(self):
  289. from pygame.bufferproxy import (get_segcount, get_read_buffer,
  290. get_write_buffer)
  291. content = as_bytes('\x01\x00\x00\x02') * 12
  292. memory = ctypes.create_string_buffer(content)
  293. memaddr = ctypes.addressof(memory)
  294. def raise_exception(o):
  295. raise ValueError("An exception")
  296. bf = BufferProxy({'shape': (len(content),),
  297. 'typestr': '|u1',
  298. 'data': (memaddr, False),
  299. 'strides': (1,)})
  300. seglen, segaddr = get_read_buffer(bf, 0)
  301. self.assertEqual(segaddr, 0)
  302. self.assertEqual(seglen, 0)
  303. seglen, segaddr = get_write_buffer(bf, 0)
  304. self.assertEqual(segaddr, 0)
  305. self.assertEqual(seglen, 0)
  306. segcount, buflen = get_segcount(bf)
  307. self.assertEqual(segcount, 1)
  308. self.assertEqual(buflen, len(content))
  309. seglen, segaddr = get_read_buffer(bf, 0)
  310. self.assertEqual(segaddr, memaddr)
  311. self.assertEqual(seglen, len(content))
  312. seglen, segaddr = get_write_buffer(bf, 0)
  313. self.assertEqual(segaddr, memaddr)
  314. self.assertEqual(seglen, len(content))
  315. bf = BufferProxy({'shape': (len(content),),
  316. 'typestr': '|u1',
  317. 'data': (memaddr, True),
  318. 'strides': (1,)})
  319. segcount, buflen = get_segcount(bf)
  320. self.assertEqual(segcount, 1)
  321. self.assertEqual(buflen, len(content))
  322. seglen, segaddr = get_read_buffer(bf, 0)
  323. self.assertEqual(segaddr, memaddr)
  324. self.assertEqual(seglen, len(content))
  325. self.assertRaises(ValueError, get_write_buffer, bf, 0)
  326. bf = BufferProxy({'shape': (len(content),),
  327. 'typestr': '|u1',
  328. 'data': (memaddr, True),
  329. 'strides': (1,),
  330. 'before': raise_exception})
  331. segcount, buflen = get_segcount(bf)
  332. self.assertEqual(segcount, 0)
  333. self.assertEqual(buflen, 0)
  334. bf = BufferProxy({'shape': (3, 4),
  335. 'typestr': '|u4',
  336. 'data': (memaddr, True),
  337. 'strides': (12, 4)})
  338. segcount, buflen = get_segcount(bf)
  339. self.assertEqual(segcount, 3 * 4)
  340. self.assertEqual(buflen, 3 * 4 * 4)
  341. for i in range(0, 4):
  342. seglen, segaddr = get_read_buffer(bf, i)
  343. self.assertEqual(segaddr, memaddr + i * 4)
  344. self.assertEqual(seglen, 4)
  345. class BufferProxyLegacyTest(unittest.TestCase):
  346. content = as_bytes('\x01\x00\x00\x02') * 12
  347. buffer = ctypes.create_string_buffer(content)
  348. data = (ctypes.addressof(buffer), True)
  349. def test_length(self):
  350. # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.length:
  351. # The size of the buffer data in bytes.
  352. bf = BufferProxy({'shape': (3, 4),
  353. 'typestr': '|u4',
  354. 'data': self.data,
  355. 'strides': (12, 4)})
  356. self.assertEqual(bf.length, len(self.content))
  357. bf = BufferProxy({'shape': (3, 3),
  358. 'typestr': '|u4',
  359. 'data': self.data,
  360. 'strides': (12, 4)})
  361. self.assertEqual(bf.length, 3*3*4)
  362. def test_raw(self):
  363. # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.raw:
  364. # The raw buffer data as string. The string may contain NUL bytes.
  365. bf = BufferProxy({'shape': (len(self.content),),
  366. 'typestr': '|u1',
  367. 'data': self.data})
  368. self.assertEqual(bf.raw, self.content)
  369. bf = BufferProxy({'shape': (3, 4),
  370. 'typestr': '|u4',
  371. 'data': self.data,
  372. 'strides': (4, 12)})
  373. self.assertEqual(bf.raw, self.content)
  374. bf = BufferProxy({'shape': (3, 4),
  375. 'typestr': '|u1',
  376. 'data': self.data,
  377. 'strides': (16, 4)})
  378. self.assertRaises(ValueError, getattr, bf, 'raw')
  379. def test_write(self):
  380. # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.write:
  381. # B.write (bufferproxy, buffer, offset) -> None
  382. #
  383. # Writes raw data to the bufferproxy.
  384. #
  385. # Writes the raw data from buffer to the BufferProxy object, starting
  386. # at the specified offset within the BufferProxy.
  387. # If the length of the passed buffer exceeds the length of the
  388. # BufferProxy (reduced by the offset), an IndexError will be raised.
  389. from ctypes import c_byte, sizeof, addressof, string_at, memset
  390. nullbyte = '\x00'.encode('latin_1')
  391. Buf = c_byte * 10
  392. data_buf = Buf(*range(1, 3 * sizeof(Buf) + 1, 3))
  393. data = string_at(data_buf, sizeof(data_buf))
  394. buf = Buf()
  395. bp = BufferProxy({'typestr': '|u1',
  396. 'shape': (sizeof(buf),),
  397. 'data': (addressof(buf), False)})
  398. try:
  399. self.assertEqual(bp.raw, nullbyte * sizeof(Buf))
  400. bp.write(data)
  401. self.assertEqual(bp.raw, data)
  402. memset(buf, 0, sizeof(buf))
  403. bp.write(data[:3], 2)
  404. raw = bp.raw
  405. self.assertEqual(raw[:2], nullbyte * 2)
  406. self.assertEqual(raw[2:5], data[:3])
  407. self.assertEqual(raw[5:], nullbyte * (sizeof(Buf) - 5))
  408. bp.write(data[:3], bp.length - 3)
  409. raw = bp.raw
  410. self.assertEqual(raw[-3:], data[:3])
  411. self.assertRaises(IndexError, bp.write, data, 1)
  412. self.assertRaises(IndexError, bp.write, data[:5], -1)
  413. self.assertRaises(IndexError, bp.write, data[:5], bp.length)
  414. self.assertRaises(TypeError, bp.write, 12)
  415. bp = BufferProxy({'typestr': '|u1',
  416. 'shape': (sizeof(buf),),
  417. 'data': (addressof(buf), True)})
  418. self.assertRaises(pygame.BufferError, bp.write, '123'.encode('latin_1'))
  419. finally:
  420. # Make sure bp is garbage collected before buf
  421. bp = None
  422. gc.collect()
  423. if __name__ == '__main__':
  424. unittest.main()