# threads_test.py -- unit tests for pygame.threads
  1. import unittest
  2. from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP
  3. from pygame import threads
  4. from pygame.compat import xrange_
  5. import time
  6. class WorkerQueueTypeTest(unittest.TestCase):
  7. def test_usage_with_different_functions(self):
  8. def f(x):
  9. return x+1
  10. def f2(x):
  11. return x+2
  12. wq = WorkerQueue()
  13. fr = FuncResult(f)
  14. fr2 = FuncResult(f2)
  15. wq.do(fr, 1)
  16. wq.do(fr2, 1)
  17. wq.wait()
  18. wq.stop()
  19. self.assertEqual(fr.result, 2)
  20. self.assertEqual(fr2.result, 3)
  21. def todo_test_do(self):
  22. # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do:
  23. # puts a function on a queue for running later.
  24. #
  25. self.fail()
  26. def test_stop(self):
  27. """Ensure stop() stops the worker queue"""
  28. wq = WorkerQueue()
  29. self.assertGreater(len(wq.pool), 0)
  30. for t in wq.pool:
  31. self.assertTrue(t.isAlive())
  32. for i in xrange_(200):
  33. wq.do(lambda x: x+1, i)
  34. wq.stop()
  35. for t in wq.pool:
  36. self.assertFalse(t.isAlive())
  37. self.assertIs(wq.queue.get(), STOP)
  38. def todo_test_threadloop(self):
  39. # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop:
  40. # Loops until all of the tasks are finished.
  41. self.fail()
  42. def test_wait(self):
  43. # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait:
  44. # waits until all tasks are complete.
  45. wq = WorkerQueue()
  46. for i in xrange_(2000): wq.do(lambda x: x+1, i)
  47. wq.wait()
  48. self.assertRaises(Empty, wq.queue.get_nowait)
  49. wq.stop()
  50. class ThreadsModuleTest(unittest.TestCase):
  51. def todo_test_benchmark_workers(self):
  52. "tags:long_running"
  53. # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers:
  54. # does a little test to see if workers are at all faster.
  55. # Returns the number of workers which works best.
  56. # Takes a little bit of time to run, so you should only really call
  57. # it once.
  58. # You can pass in benchmark data, and functions if you want.
  59. # a_bench_func - f(data)
  60. # the_data - data to work on.
  61. self.fail()
  62. def test_init(self):
  63. """Ensure init() sets up the worker queue"""
  64. threads.init(8)
  65. self.assertIsInstance(threads._wq, WorkerQueue)
  66. threads.quit()
  67. def test_quit(self):
  68. """Ensure quit() cleans up the worker queue"""
  69. threads.init(8)
  70. threads.quit()
  71. self.assertIsNone(threads._wq)
  72. def test_tmap(self):
  73. # __doc__ (as of 2008-06-28) for pygame.threads.tmap:
  74. # like map, but uses a thread pool to execute.
  75. # num_workers - the number of worker threads that will be used. If pool
  76. # is passed in, then the num_workers arg is ignored.
  77. # worker_queue - you can optionally pass in an existing WorkerQueue.
  78. # wait - True means that the results are returned when everything is finished.
  79. # False means that we return the [worker_queue, results] right away instead.
  80. # results, is returned as a list of FuncResult instances.
  81. # stop_on_error -
  82. func, data = lambda x:x+1, xrange_(100)
  83. tmapped = list(tmap(func, data))
  84. mapped = list(map(func, data))
  85. self.assertEqual(tmapped, mapped)
  86. def todo_test_tmap__None_func_and_multiple_sequences(self):
  87. """Using a None as func and multiple sequences"""
  88. self.fail()
  89. res = tmap(None, [1,2,3,4])
  90. res2 = tmap(None, [1,2,3,4], [22, 33, 44, 55])
  91. res3 = tmap(None, [1,2,3,4], [22, 33, 44, 55, 66])
  92. res4 = tmap(None, [1,2,3,4,5], [22, 33, 44, 55])
  93. self.assertEqual([1, 2, 3, 4], res)
  94. self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2)
  95. self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3)
  96. self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5,None)], res4)
  97. def test_tmap__wait(self):
  98. r = range(1000)
  99. wq, results = tmap(lambda x:x, r, num_workers = 5, wait=False)
  100. wq.wait()
  101. r2 = map(lambda x:x.result, results)
  102. self.assertEqual(list(r), list(r2))
  103. def test_FuncResult(self):
  104. """Ensure FuncResult sets its result and exception attributes"""
  105. # Results are stored in result attribute
  106. fr = FuncResult(lambda x:x+1)
  107. fr(2)
  108. self.assertEqual(fr.result, 3)
  109. # Exceptions are store in exception attribute
  110. self.assertIsNone(fr.exception, "no exception should be raised")
  111. exception = ValueError('rast')
  112. def x(sdf):
  113. raise exception
  114. fr = FuncResult(x)
  115. fr(None)
  116. self.assertIs(fr.exception, exception)
  117. ################################################################################
  118. if __name__ == '__main__':
  119. unittest.main()