// stream.go (12 KB)

  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/wuntsong-org/go-zero-plus/core/collection"
  6. "github.com/wuntsong-org/go-zero-plus/core/lang"
  7. "github.com/wuntsong-org/go-zero-plus/core/threading"
  8. )
  // Worker-count settings consumed by newOptions and WithWorkers.
  const (
  	// minWorkers is the lower bound WithWorkers clamps its argument to.
  	minWorkers = 1
  	// defaultWorkers is the worker count newOptions assigns before any Option runs.
  	defaultWorkers = 16
  )
  // A Stream is a stream that can be used to do stream processing.
  // It wraps a receive-only channel that its methods read items from.
  type Stream struct {
  	source <-chan any
  }

  // rxOptions holds the settings that Walk uses to pick and size its workers.
  type rxOptions struct {
  	// unlimitedWorkers makes Walk choose walkUnlimited instead of walkLimited.
  	unlimitedWorkers bool
  	// workers sizes the output buffer and, in walkLimited, the concurrency bound.
  	workers int
  }

  // Option defines the method to customize a Stream by mutating an rxOptions.
  type Option func(opts *rxOptions)

  // Callback signatures accepted by the Stream constructors and methods.
  type (
  	// FilterFunc defines the predicate used by Filter; true keeps the item.
  	FilterFunc func(item any) bool

  	// ForAllFunc defines the handler that ForAll gives the whole source channel to.
  	ForAllFunc func(pipe <-chan any)

  	// ForEachFunc defines the handler that ForEach invokes once per item.
  	ForEachFunc func(item any)

  	// GenerateFunc defines the producer that From runs to send items into a Stream.
  	GenerateFunc func(source chan<- any)

  	// KeyFunc defines the method to derive a key from an item (used by Distinct and Group).
  	KeyFunc func(item any) any

  	// LessFunc defines the comparison used by Max, Min and Sort.
  	LessFunc func(a, b any) bool

  	// MapFunc defines the method to map each item to another value in Map.
  	MapFunc func(item any) any

  	// ParallelFunc defines the handler that Parallel applies to each item.
  	ParallelFunc func(item any)

  	// ReduceFunc defines the method to reduce the whole source channel to one result or an error.
  	ReduceFunc func(pipe <-chan any) (any, error)

  	// WalkFunc defines the handler used by Walk; it may write any number of items to pipe.
  	WalkFunc func(item any, pipe chan<- any)
  )
  45. // Concat returns a concatenated Stream.
  46. func Concat(s Stream, others ...Stream) Stream {
  47. return s.Concat(others...)
  48. }
  49. // From constructs a Stream from the given GenerateFunc.
  50. func From(generate GenerateFunc) Stream {
  51. source := make(chan any)
  52. threading.GoSafe(func() {
  53. defer close(source)
  54. generate(source)
  55. })
  56. return Range(source)
  57. }
  58. // Just converts the given arbitrary items to a Stream.
  59. func Just(items ...any) Stream {
  60. source := make(chan any, len(items))
  61. for _, item := range items {
  62. source <- item
  63. }
  64. close(source)
  65. return Range(source)
  66. }
  67. // Range converts the given channel to a Stream.
  68. func Range(source <-chan any) Stream {
  69. return Stream{
  70. source: source,
  71. }
  72. }
  73. // AllMach returns whether all elements of this stream match the provided predicate.
  74. // May not evaluate the predicate on all elements if not necessary for determining the result.
  75. // If the stream is empty then true is returned and the predicate is not evaluated.
  76. func (s Stream) AllMach(predicate func(item any) bool) bool {
  77. for item := range s.source {
  78. if !predicate(item) {
  79. // make sure the former goroutine not block, and current func returns fast.
  80. go drain(s.source)
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. // AnyMach returns whether any elements of this stream match the provided predicate.
  87. // May not evaluate the predicate on all elements if not necessary for determining the result.
  88. // If the stream is empty then false is returned and the predicate is not evaluated.
  89. func (s Stream) AnyMach(predicate func(item any) bool) bool {
  90. for item := range s.source {
  91. if predicate(item) {
  92. // make sure the former goroutine not block, and current func returns fast.
  93. go drain(s.source)
  94. return true
  95. }
  96. }
  97. return false
  98. }
  99. // Buffer buffers the items into a queue with size n.
  100. // It can balance the producer and the consumer if their processing throughput don't match.
  101. func (s Stream) Buffer(n int) Stream {
  102. if n < 0 {
  103. n = 0
  104. }
  105. source := make(chan any, n)
  106. go func() {
  107. for item := range s.source {
  108. source <- item
  109. }
  110. close(source)
  111. }()
  112. return Range(source)
  113. }
  114. // Concat returns a Stream that concatenated other streams
  115. func (s Stream) Concat(others ...Stream) Stream {
  116. source := make(chan any)
  117. go func() {
  118. group := threading.NewRoutineGroup()
  119. group.Run(func() {
  120. for item := range s.source {
  121. source <- item
  122. }
  123. })
  124. for _, each := range others {
  125. each := each
  126. group.Run(func() {
  127. for item := range each.source {
  128. source <- item
  129. }
  130. })
  131. }
  132. group.Wait()
  133. close(source)
  134. }()
  135. return Range(source)
  136. }
  137. // Count counts the number of elements in the result.
  138. func (s Stream) Count() (count int) {
  139. for range s.source {
  140. count++
  141. }
  142. return
  143. }
  144. // Distinct removes the duplicated items base on the given KeyFunc.
  145. func (s Stream) Distinct(fn KeyFunc) Stream {
  146. source := make(chan any)
  147. threading.GoSafe(func() {
  148. defer close(source)
  149. keys := make(map[any]lang.PlaceholderType)
  150. for item := range s.source {
  151. key := fn(item)
  152. if _, ok := keys[key]; !ok {
  153. source <- item
  154. keys[key] = lang.Placeholder
  155. }
  156. }
  157. })
  158. return Range(source)
  159. }
  160. // Done waits all upstreaming operations to be done.
  161. func (s Stream) Done() {
  162. drain(s.source)
  163. }
  164. // Filter filters the items by the given FilterFunc.
  165. func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  166. return s.Walk(func(item any, pipe chan<- any) {
  167. if fn(item) {
  168. pipe <- item
  169. }
  170. }, opts...)
  171. }
  172. // First returns the first item, nil if no items.
  173. func (s Stream) First() any {
  174. for item := range s.source {
  175. // make sure the former goroutine not block, and current func returns fast.
  176. go drain(s.source)
  177. return item
  178. }
  179. return nil
  180. }
  181. // ForAll handles the streaming elements from the source and no later streams.
  182. func (s Stream) ForAll(fn ForAllFunc) {
  183. fn(s.source)
  184. // avoid goroutine leak on fn not consuming all items.
  185. go drain(s.source)
  186. }
  187. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  188. func (s Stream) ForEach(fn ForEachFunc) {
  189. for item := range s.source {
  190. fn(item)
  191. }
  192. }
  193. // Group groups the elements into different groups based on their keys.
  194. func (s Stream) Group(fn KeyFunc) Stream {
  195. groups := make(map[any][]any)
  196. for item := range s.source {
  197. key := fn(item)
  198. groups[key] = append(groups[key], item)
  199. }
  200. source := make(chan any)
  201. go func() {
  202. for _, group := range groups {
  203. source <- group
  204. }
  205. close(source)
  206. }()
  207. return Range(source)
  208. }
  209. // Head returns the first n elements in p.
  210. func (s Stream) Head(n int64) Stream {
  211. if n < 1 {
  212. panic("n must be greater than 0")
  213. }
  214. source := make(chan any)
  215. go func() {
  216. for item := range s.source {
  217. n--
  218. if n >= 0 {
  219. source <- item
  220. }
  221. if n == 0 {
  222. // let successive method go ASAP even we have more items to skip
  223. close(source)
  224. // why we don't just break the loop, and drain to consume all items.
  225. // because if breaks, this former goroutine will block forever,
  226. // which will cause goroutine leak.
  227. drain(s.source)
  228. }
  229. }
  230. // not enough items in s.source, but we need to let successive method to go ASAP.
  231. if n > 0 {
  232. close(source)
  233. }
  234. }()
  235. return Range(source)
  236. }
  237. // Last returns the last item, or nil if no items.
  238. func (s Stream) Last() (item any) {
  239. for item = range s.source {
  240. }
  241. return
  242. }
  243. // Map converts each item to another corresponding item, which means it's a 1:1 model.
  244. func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
  245. return s.Walk(func(item any, pipe chan<- any) {
  246. pipe <- fn(item)
  247. }, opts...)
  248. }
  249. // Max returns the maximum item from the underlying source.
  250. func (s Stream) Max(less LessFunc) any {
  251. var max any
  252. for item := range s.source {
  253. if max == nil || less(max, item) {
  254. max = item
  255. }
  256. }
  257. return max
  258. }
  259. // Merge merges all the items into a slice and generates a new stream.
  260. func (s Stream) Merge() Stream {
  261. var items []any
  262. for item := range s.source {
  263. items = append(items, item)
  264. }
  265. source := make(chan any, 1)
  266. source <- items
  267. close(source)
  268. return Range(source)
  269. }
  270. // Min returns the minimum item from the underlying source.
  271. func (s Stream) Min(less LessFunc) any {
  272. var min any
  273. for item := range s.source {
  274. if min == nil || less(item, min) {
  275. min = item
  276. }
  277. }
  278. return min
  279. }
  280. // NoneMatch returns whether all elements of this stream don't match the provided predicate.
  281. // May not evaluate the predicate on all elements if not necessary for determining the result.
  282. // If the stream is empty then true is returned and the predicate is not evaluated.
  283. func (s Stream) NoneMatch(predicate func(item any) bool) bool {
  284. for item := range s.source {
  285. if predicate(item) {
  286. // make sure the former goroutine not block, and current func returns fast.
  287. go drain(s.source)
  288. return false
  289. }
  290. }
  291. return true
  292. }
  293. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  294. func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
  295. s.Walk(func(item any, pipe chan<- any) {
  296. fn(item)
  297. }, opts...).Done()
  298. }
  299. // Reduce is an utility method to let the caller deal with the underlying channel.
  300. func (s Stream) Reduce(fn ReduceFunc) (any, error) {
  301. return fn(s.source)
  302. }
  303. // Reverse reverses the elements in the stream.
  304. func (s Stream) Reverse() Stream {
  305. var items []any
  306. for item := range s.source {
  307. items = append(items, item)
  308. }
  309. // reverse, official method
  310. for i := len(items)/2 - 1; i >= 0; i-- {
  311. opp := len(items) - 1 - i
  312. items[i], items[opp] = items[opp], items[i]
  313. }
  314. return Just(items...)
  315. }
  316. // Skip returns a Stream that skips size elements.
  317. func (s Stream) Skip(n int64) Stream {
  318. if n < 0 {
  319. panic("n must not be negative")
  320. }
  321. if n == 0 {
  322. return s
  323. }
  324. source := make(chan any)
  325. go func() {
  326. for item := range s.source {
  327. n--
  328. if n >= 0 {
  329. continue
  330. } else {
  331. source <- item
  332. }
  333. }
  334. close(source)
  335. }()
  336. return Range(source)
  337. }
  338. // Sort sorts the items from the underlying source.
  339. func (s Stream) Sort(less LessFunc) Stream {
  340. var items []any
  341. for item := range s.source {
  342. items = append(items, item)
  343. }
  344. sort.Slice(items, func(i, j int) bool {
  345. return less(items[i], items[j])
  346. })
  347. return Just(items...)
  348. }
  349. // Split splits the elements into chunk with size up to n,
  350. // might be less than n on tailing elements.
  351. func (s Stream) Split(n int) Stream {
  352. if n < 1 {
  353. panic("n should be greater than 0")
  354. }
  355. source := make(chan any)
  356. go func() {
  357. var chunk []any
  358. for item := range s.source {
  359. chunk = append(chunk, item)
  360. if len(chunk) == n {
  361. source <- chunk
  362. chunk = nil
  363. }
  364. }
  365. if chunk != nil {
  366. source <- chunk
  367. }
  368. close(source)
  369. }()
  370. return Range(source)
  371. }
  372. // Tail returns the last n elements in p.
  373. func (s Stream) Tail(n int64) Stream {
  374. if n < 1 {
  375. panic("n should be greater than 0")
  376. }
  377. source := make(chan any)
  378. go func() {
  379. ring := collection.NewRing(int(n))
  380. for item := range s.source {
  381. ring.Add(item)
  382. }
  383. for _, item := range ring.Take() {
  384. source <- item
  385. }
  386. close(source)
  387. }()
  388. return Range(source)
  389. }
  390. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  391. func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  392. option := buildOptions(opts...)
  393. if option.unlimitedWorkers {
  394. return s.walkUnlimited(fn, option)
  395. }
  396. return s.walkLimited(fn, option)
  397. }
  398. func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  399. pipe := make(chan any, option.workers)
  400. go func() {
  401. var wg sync.WaitGroup
  402. pool := make(chan lang.PlaceholderType, option.workers)
  403. for item := range s.source {
  404. // important, used in another goroutine
  405. val := item
  406. pool <- lang.Placeholder
  407. wg.Add(1)
  408. // better to safely run caller defined method
  409. threading.GoSafe(func() {
  410. defer func() {
  411. wg.Done()
  412. <-pool
  413. }()
  414. fn(val, pipe)
  415. })
  416. }
  417. wg.Wait()
  418. close(pipe)
  419. }()
  420. return Range(pipe)
  421. }
  422. func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  423. pipe := make(chan any, option.workers)
  424. go func() {
  425. var wg sync.WaitGroup
  426. for item := range s.source {
  427. // important, used in another goroutine
  428. val := item
  429. wg.Add(1)
  430. // better to safely run caller defined method
  431. threading.GoSafe(func() {
  432. defer wg.Done()
  433. fn(val, pipe)
  434. })
  435. }
  436. wg.Wait()
  437. close(pipe)
  438. }()
  439. return Range(pipe)
  440. }
  441. // UnlimitedWorkers lets the caller use as many workers as the tasks.
  442. func UnlimitedWorkers() Option {
  443. return func(opts *rxOptions) {
  444. opts.unlimitedWorkers = true
  445. }
  446. }
  447. // WithWorkers lets the caller customize the concurrent workers.
  448. func WithWorkers(workers int) Option {
  449. return func(opts *rxOptions) {
  450. if workers < minWorkers {
  451. opts.workers = minWorkers
  452. } else {
  453. opts.workers = workers
  454. }
  455. }
  456. }
  457. // buildOptions returns a rxOptions with given customizations.
  458. func buildOptions(opts ...Option) *rxOptions {
  459. options := newOptions()
  460. for _, opt := range opts {
  461. opt(options)
  462. }
  463. return options
  464. }
  // drain receives from the given channel, discarding every value,
  // and returns once the channel has been closed.
  func drain(channel <-chan any) {
  	for {
  		if _, open := <-channel; !open {
  			return
  		}
  	}
  }
  470. // newOptions returns a default rxOptions.
  471. func newOptions() *rxOptions {
  472. return &rxOptions{
  473. workers: defaultWorkers,
  474. }
  475. }