123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- import sys
- import re
- import weakref
- import gc
- import ctypes
- import unittest
- import pygame
- from pygame.bufferproxy import BufferProxy
- from pygame.compat import as_bytes
- try:
- BufferError
- except NameError:
- from pygame import BufferError
- class BufferProxyTest(unittest.TestCase):
- view_keywords = {'shape': (5, 4, 3),
- 'typestr': '|u1',
- 'data': (0, True),
- 'strides': (4, 20, 1)}
- def test_module_name(self):
- self.assertEqual(pygame.bufferproxy.__name__,
- "pygame.bufferproxy")
- def test_class_name(self):
- self.assertEqual(BufferProxy.__name__, "BufferProxy")
- def test___array_struct___property(self):
- kwds = self.view_keywords
- v = BufferProxy(kwds)
- d = pygame.get_array_interface(v)
- self.assertEqual(len(d), 5)
- self.assertEqual(d['version'], 3)
- self.assertEqual(d['shape'], kwds['shape'])
- self.assertEqual(d['typestr'], kwds['typestr'])
- self.assertEqual(d['data'], kwds['data'])
- self.assertEqual(d['strides'], kwds['strides'])
- def test___array_interface___property(self):
- kwds = self.view_keywords
- v = BufferProxy(kwds)
- d = v.__array_interface__
- self.assertEqual(len(d), 5)
- self.assertEqual(d['version'], 3)
- self.assertEqual(d['shape'], kwds['shape'])
- self.assertEqual(d['typestr'], kwds['typestr'])
- self.assertEqual(d['data'], kwds['data'])
- self.assertEqual(d['strides'], kwds['strides'])
- def test_parent_property(self):
- kwds = dict(self.view_keywords)
- p = []
- kwds['parent'] = p
- v = BufferProxy(kwds)
- self.assertIs(v.parent, p)
- def test_before(self):
- def callback(parent):
- success.append(parent is p)
- class MyException(Exception):
- pass
- def raise_exception(parent):
- raise MyException("Just a test.")
- kwds = dict(self.view_keywords)
- p = []
- kwds['parent'] = p
- # For array interface
- success = []
- kwds['before'] = callback
- v = BufferProxy(kwds)
- self.assertEqual(len(success), 0)
- d = v.__array_interface__
- self.assertEqual(len(success), 1)
- self.assertTrue(success[0])
- d = v.__array_interface__
- self.assertEqual(len(success), 1)
- d = v = None
- gc.collect()
- self.assertEqual(len(success), 1)
- # For array struct
- success = []
- kwds['before'] = callback
- v = BufferProxy(kwds)
- self.assertEqual(len(success), 0)
- c = v.__array_struct__
- self.assertEqual(len(success), 1)
- self.assertTrue(success[0])
- c = v.__array_struct__
- self.assertEqual(len(success), 1)
- c = v = None
- gc.collect()
- self.assertEqual(len(success), 1)
- # Callback raises an exception
- kwds['before'] = raise_exception
- v = BufferProxy(kwds)
- self.assertRaises(MyException, lambda : v.__array_struct__)
- def test_after(self):
- def callback(parent):
- success.append(parent is p)
- kwds = dict(self.view_keywords)
- p = []
- kwds['parent'] = p
- # For array interface
- success = []
- kwds['after'] = callback
- v = BufferProxy(kwds)
- self.assertEqual(len(success), 0)
- d = v.__array_interface__
- self.assertEqual(len(success), 0)
- d = v.__array_interface__
- self.assertEqual(len(success), 0)
- d = v = None
- gc.collect()
- self.assertEqual(len(success), 1)
- self.assertTrue(success[0])
- # For array struct
- success = []
- kwds['after'] = callback
- v = BufferProxy(kwds)
- self.assertEqual(len(success), 0)
- c = v.__array_struct__
- self.assertEqual(len(success), 0)
- c = v.__array_struct__
- self.assertEqual(len(success), 0)
- c = v = None
- gc.collect()
- self.assertEqual(len(success), 1)
- self.assertTrue(success[0])
- def test_attribute(self):
- v = BufferProxy(self.view_keywords)
- self.assertRaises(AttributeError, getattr, v, 'undefined')
- v.undefined = 12;
- self.assertEqual(v.undefined, 12)
- del v.undefined
- self.assertRaises(AttributeError, getattr, v, 'undefined')
- def test_weakref(self):
- v = BufferProxy(self.view_keywords)
- weak_v = weakref.ref(v)
- self.assertIs(weak_v(), v)
- v = None
- gc.collect()
- self.assertIsNone(weak_v())
- def test_gc(self):
- """refcount agnostic check that contained objects are freed"""
- def before_callback(parent):
- return r[0]
- def after_callback(parent):
- return r[1]
- class Obj(object):
- pass
- p = Obj()
- a = Obj()
- r = [Obj(), Obj()]
- weak_p = weakref.ref(p)
- weak_a = weakref.ref(a)
- weak_r0 = weakref.ref(r[0])
- weak_r1 = weakref.ref(r[1])
- weak_before = weakref.ref(before_callback)
- weak_after = weakref.ref(after_callback)
- kwds = dict(self.view_keywords)
- kwds['parent'] = p
- kwds['before'] = before_callback
- kwds['after'] = after_callback
- v = BufferProxy(kwds)
- v.some_attribute = a
- weak_v = weakref.ref(v)
- kwds = p = a = before_callback = after_callback = None
- gc.collect()
- self.assertTrue(weak_p() is not None)
- self.assertTrue(weak_a() is not None)
- self.assertTrue(weak_before() is not None)
- self.assertTrue(weak_after() is not None)
- v = None
- [gc.collect() for x in range(4)]
- self.assertTrue(weak_v() is None)
- self.assertTrue(weak_p() is None)
- self.assertTrue(weak_a() is None)
- self.assertTrue(weak_before() is None)
- self.assertTrue(weak_after() is None)
- self.assertTrue(weak_r0() is not None)
- self.assertTrue(weak_r1() is not None)
- r = None
- gc.collect()
- self.assertTrue(weak_r0() is None)
- self.assertTrue(weak_r1() is None)
- # Cycle removal
- kwds = dict(self.view_keywords)
- kwds['parent'] = []
- v = BufferProxy(kwds)
- v.some_attribute = v
- tracked = True
- for o in gc.get_objects():
- if o is v:
- break
- else:
- tracked = False
- self.assertTrue(tracked)
- kwds['parent'].append(v)
- kwds = None
- gc.collect()
- n1 = len(gc.garbage)
- v = None
- gc.collect()
- n2 = len(gc.garbage)
- self.assertEqual(n2, n1)
- def test_c_api(self):
- api = pygame.bufferproxy._PYGAME_C_API
- api_type = type(pygame.base._PYGAME_C_API)
- self.assertIsInstance(api, api_type)
- def test_repr(self):
- v = BufferProxy(self.view_keywords)
- cname = BufferProxy.__name__
- oname, ovalue = re.findall(r"<([^)]+)\(([^)]+)\)>", repr(v))[0]
- self.assertEqual(oname, cname)
- self.assertEqual(v.length, int(ovalue))
- def test_subclassing(self):
- class MyBufferProxy(BufferProxy):
- def __repr__(self):
- return "*%s*" % (BufferProxy.__repr__(self),)
- kwds = dict(self.view_keywords)
- kwds['parent'] = 0
- v = MyBufferProxy(kwds)
- self.assertEqual(v.parent, 0)
- r = repr(v)
- self.assertEqual(r[:2], '*<')
- self.assertEqual(r[-2:], '>*')
- @unittest.skipIf(not pygame.HAVE_NEWBUF, 'newbuf not implemented')
- def NEWBUF_test_newbuf(self):
- from ctypes import string_at
- from pygame.tests.test_utils import buftools
- Exporter = buftools.Exporter
- Importer = buftools.Importer
- exp = Exporter((10,), 'B', readonly=True)
- b = BufferProxy(exp)
- self.assertEqual(b.length, exp.len)
- self.assertEqual(b.raw, string_at(exp.buf, exp.len))
- d = b.__array_interface__
- try:
- self.assertEqual(d['typestr'], '|u1')
- self.assertEqual(d['shape'], exp.shape)
- self.assertEqual(d['strides'], exp.strides)
- self.assertEqual(d['data'], (exp.buf, True))
- finally:
- d = None
- exp = Exporter((3,), '=h')
- b = BufferProxy(exp)
- self.assertEqual(b.length, exp.len)
- self.assertEqual(b.raw, string_at(exp.buf, exp.len))
- d = b.__array_interface__
- try:
- lil_endian = pygame.get_sdl_byteorder() == pygame.LIL_ENDIAN
- f = '{}i{}'.format('<' if lil_endian else '>', exp.itemsize)
- self.assertEqual(d['typestr'], f)
- self.assertEqual(d['shape'], exp.shape)
- self.assertEqual(d['strides'], exp.strides)
- self.assertEqual(d['data'], (exp.buf, False))
- finally:
- d = None
- exp = Exporter((10, 2), '=i')
- b = BufferProxy(exp)
- imp = Importer(b, buftools.PyBUF_RECORDS)
- self.assertTrue(imp.obj is b)
- self.assertEqual(imp.buf, exp.buf)
- self.assertEqual(imp.ndim, exp.ndim)
- self.assertEqual(imp.format, exp.format)
- self.assertEqual(imp.readonly, exp.readonly)
- self.assertEqual(imp.itemsize, exp.itemsize)
- self.assertEqual(imp.len, exp.len)
- self.assertEqual(imp.shape, exp.shape)
- self.assertEqual(imp.strides, exp.strides)
- self.assertTrue(imp.suboffsets is None)
- d = {'typestr': '|u1',
- 'shape': (10,),
- 'strides': (1,),
- 'data': (9, True)} # 9? Will not reading the data anyway.
- b = BufferProxy(d)
- imp = Importer(b, buftools.PyBUF_SIMPLE)
- self.assertTrue(imp.obj is b)
- self.assertEqual(imp.buf, 9)
- self.assertEqual(imp.len, 10)
- self.assertEqual(imp.format, None)
- self.assertEqual(imp.itemsize, 1)
- self.assertEqual(imp.ndim, 0)
- self.assertTrue(imp.readonly)
- self.assertTrue(imp.shape is None)
- self.assertTrue(imp.strides is None)
- self.assertTrue(imp.suboffsets is None)
- try:
- pygame.bufferproxy.get_segcount
- except AttributeError:
- pass
- else:
- def test_oldbuf_arg(self):
- self.OLDBUF_test_oldbuf_arg()
- def OLDBUF_test_oldbuf_arg(self):
- from pygame.bufferproxy import (get_segcount, get_read_buffer,
- get_write_buffer)
- content = as_bytes('\x01\x00\x00\x02') * 12
- memory = ctypes.create_string_buffer(content)
- memaddr = ctypes.addressof(memory)
- def raise_exception(o):
- raise ValueError("An exception")
- bf = BufferProxy({'shape': (len(content),),
- 'typestr': '|u1',
- 'data': (memaddr, False),
- 'strides': (1,)})
- seglen, segaddr = get_read_buffer(bf, 0)
- self.assertEqual(segaddr, 0)
- self.assertEqual(seglen, 0)
- seglen, segaddr = get_write_buffer(bf, 0)
- self.assertEqual(segaddr, 0)
- self.assertEqual(seglen, 0)
- segcount, buflen = get_segcount(bf)
- self.assertEqual(segcount, 1)
- self.assertEqual(buflen, len(content))
- seglen, segaddr = get_read_buffer(bf, 0)
- self.assertEqual(segaddr, memaddr)
- self.assertEqual(seglen, len(content))
- seglen, segaddr = get_write_buffer(bf, 0)
- self.assertEqual(segaddr, memaddr)
- self.assertEqual(seglen, len(content))
- bf = BufferProxy({'shape': (len(content),),
- 'typestr': '|u1',
- 'data': (memaddr, True),
- 'strides': (1,)})
- segcount, buflen = get_segcount(bf)
- self.assertEqual(segcount, 1)
- self.assertEqual(buflen, len(content))
- seglen, segaddr = get_read_buffer(bf, 0)
- self.assertEqual(segaddr, memaddr)
- self.assertEqual(seglen, len(content))
- self.assertRaises(ValueError, get_write_buffer, bf, 0)
- bf = BufferProxy({'shape': (len(content),),
- 'typestr': '|u1',
- 'data': (memaddr, True),
- 'strides': (1,),
- 'before': raise_exception})
- segcount, buflen = get_segcount(bf)
- self.assertEqual(segcount, 0)
- self.assertEqual(buflen, 0)
- bf = BufferProxy({'shape': (3, 4),
- 'typestr': '|u4',
- 'data': (memaddr, True),
- 'strides': (12, 4)})
- segcount, buflen = get_segcount(bf)
- self.assertEqual(segcount, 3 * 4)
- self.assertEqual(buflen, 3 * 4 * 4)
- for i in range(0, 4):
- seglen, segaddr = get_read_buffer(bf, i)
- self.assertEqual(segaddr, memaddr + i * 4)
- self.assertEqual(seglen, 4)
- class BufferProxyLegacyTest(unittest.TestCase):
- content = as_bytes('\x01\x00\x00\x02') * 12
- buffer = ctypes.create_string_buffer(content)
- data = (ctypes.addressof(buffer), True)
- def test_length(self):
- # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.length:
- # The size of the buffer data in bytes.
- bf = BufferProxy({'shape': (3, 4),
- 'typestr': '|u4',
- 'data': self.data,
- 'strides': (12, 4)})
- self.assertEqual(bf.length, len(self.content))
- bf = BufferProxy({'shape': (3, 3),
- 'typestr': '|u4',
- 'data': self.data,
- 'strides': (12, 4)})
- self.assertEqual(bf.length, 3*3*4)
- def test_raw(self):
- # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.raw:
- # The raw buffer data as string. The string may contain NUL bytes.
- bf = BufferProxy({'shape': (len(self.content),),
- 'typestr': '|u1',
- 'data': self.data})
- self.assertEqual(bf.raw, self.content)
- bf = BufferProxy({'shape': (3, 4),
- 'typestr': '|u4',
- 'data': self.data,
- 'strides': (4, 12)})
- self.assertEqual(bf.raw, self.content)
- bf = BufferProxy({'shape': (3, 4),
- 'typestr': '|u1',
- 'data': self.data,
- 'strides': (16, 4)})
- self.assertRaises(ValueError, getattr, bf, 'raw')
- def test_write(self):
- # __doc__ (as of 2008-08-02) for pygame.bufferproxy.BufferProxy.write:
- # B.write (bufferproxy, buffer, offset) -> None
- #
- # Writes raw data to the bufferproxy.
- #
- # Writes the raw data from buffer to the BufferProxy object, starting
- # at the specified offset within the BufferProxy.
- # If the length of the passed buffer exceeds the length of the
- # BufferProxy (reduced by the offset), an IndexError will be raised.
- from ctypes import c_byte, sizeof, addressof, string_at, memset
- nullbyte = '\x00'.encode('latin_1')
- Buf = c_byte * 10
- data_buf = Buf(*range(1, 3 * sizeof(Buf) + 1, 3))
- data = string_at(data_buf, sizeof(data_buf))
- buf = Buf()
- bp = BufferProxy({'typestr': '|u1',
- 'shape': (sizeof(buf),),
- 'data': (addressof(buf), False)})
- try:
- self.assertEqual(bp.raw, nullbyte * sizeof(Buf))
- bp.write(data)
- self.assertEqual(bp.raw, data)
- memset(buf, 0, sizeof(buf))
- bp.write(data[:3], 2)
- raw = bp.raw
- self.assertEqual(raw[:2], nullbyte * 2)
- self.assertEqual(raw[2:5], data[:3])
- self.assertEqual(raw[5:], nullbyte * (sizeof(Buf) - 5))
- bp.write(data[:3], bp.length - 3)
- raw = bp.raw
- self.assertEqual(raw[-3:], data[:3])
- self.assertRaises(IndexError, bp.write, data, 1)
- self.assertRaises(IndexError, bp.write, data[:5], -1)
- self.assertRaises(IndexError, bp.write, data[:5], bp.length)
- self.assertRaises(TypeError, bp.write, 12)
- bp = BufferProxy({'typestr': '|u1',
- 'shape': (sizeof(buf),),
- 'data': (addressof(buf), True)})
- self.assertRaises(pygame.BufferError, bp.write, '123'.encode('latin_1'))
- finally:
- # Make sure bp is garbage collected before buf
- bp = None
- gc.collect()
- if __name__ == '__main__':
- unittest.main()
|