fn.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  // Worker-count settings used when building rxOptions.
  const (
  	// minWorkers is the lowest worker count WithWorkers will accept; smaller
  	// requests are raised to this value.
  	minWorkers = 1
  	// defaultWorkers is the worker count newOptions starts from. It is also
  	// the buffer size of the pipe created by walkUnlimited.
  	defaultWorkers = 16
  )
  type (
  	// Stream wraps a receive-only channel of items; every operation in this
  	// package reads from that channel.
  	Stream struct {
  		source <-chan interface{}
  	}

  	// rxOptions holds the concurrency settings used by Walk.
  	rxOptions struct {
  		// unlimitedWorkers selects walkUnlimited instead of walkLimited.
  		unlimitedWorkers bool
  		// workers bounds the concurrent workers in walkLimited.
  		workers int
  	}

  	// Option customizes an rxOptions value.
  	Option func(opts *rxOptions)

  	// FilterFunc reports whether item should be kept.
  	FilterFunc func(item interface{}) bool
  	// ForAllFunc consumes the whole underlying channel of a Stream.
  	ForAllFunc func(pipe <-chan interface{})
  	// ForEachFunc is invoked once for each item of a Stream.
  	ForEachFunc func(item interface{})
  	// GenerateFunc produces items by writing them to source.
  	GenerateFunc func(source chan<- interface{})
  	// KeyFunc derives the key of item, used for de-duplication and grouping.
  	KeyFunc func(item interface{}) interface{}
  	// LessFunc reports whether a should be ordered before b.
  	LessFunc func(a, b interface{}) bool
  	// MapFunc converts item into another value.
  	MapFunc func(item interface{}) interface{}
  	// ParallelFunc is invoked for each item by Parallel.
  	ParallelFunc func(item interface{})
  	// ReduceFunc consumes the underlying channel and returns a result or an error.
  	ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  	// WalkFunc handles item and may write any number of results to pipe.
  	WalkFunc func(item interface{}, pipe chan<- interface{})
  )
  33. // From constructs a Stream from the given GenerateFunc.
  34. func From(generate GenerateFunc) Stream {
  35. source := make(chan interface{})
  36. threading.GoSafe(func() {
  37. defer close(source)
  38. generate(source)
  39. })
  40. return Range(source)
  41. }
  42. // Just converts the given arbitrary items to a Stream.
  43. func Just(items ...interface{}) Stream {
  44. source := make(chan interface{}, len(items))
  45. for _, item := range items {
  46. source <- item
  47. }
  48. close(source)
  49. return Range(source)
  50. }
  51. // Range converts the given channel to a Stream.
  52. func Range(source <-chan interface{}) Stream {
  53. return Stream{
  54. source: source,
  55. }
  56. }
  57. // Buffer buffers the items into a queue with size n.
  58. func (p Stream) Buffer(n int) Stream {
  59. if n < 0 {
  60. n = 0
  61. }
  62. source := make(chan interface{}, n)
  63. go func() {
  64. for item := range p.source {
  65. source <- item
  66. }
  67. close(source)
  68. }()
  69. return Range(source)
  70. }
  71. // Count counts the number of elements in the result.
  72. func (p Stream) Count() (count int) {
  73. for range p.source {
  74. count++
  75. }
  76. return
  77. }
  78. // Distinct removes the duplicated items base on the given KeyFunc.
  79. func (p Stream) Distinct(fn KeyFunc) Stream {
  80. source := make(chan interface{})
  81. threading.GoSafe(func() {
  82. defer close(source)
  83. keys := make(map[interface{}]lang.PlaceholderType)
  84. for item := range p.source {
  85. key := fn(item)
  86. if _, ok := keys[key]; !ok {
  87. source <- item
  88. keys[key] = lang.Placeholder
  89. }
  90. }
  91. })
  92. return Range(source)
  93. }
  94. // Done waits all upstreaming operations to be done.
  95. func (p Stream) Done() {
  96. for range p.source {
  97. }
  98. }
  99. // Filter filters the items by the given FilterFunc.
  100. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  101. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  102. if fn(item) {
  103. pipe <- item
  104. }
  105. }, opts...)
  106. }
  107. // ForAll handles the streaming elements from the source and no later streams.
  108. func (p Stream) ForAll(fn ForAllFunc) {
  109. fn(p.source)
  110. }
  111. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  112. func (p Stream) ForEach(fn ForEachFunc) {
  113. for item := range p.source {
  114. fn(item)
  115. }
  116. }
  117. // Group groups the elements into different groups based on their keys.
  118. func (p Stream) Group(fn KeyFunc) Stream {
  119. groups := make(map[interface{}][]interface{})
  120. for item := range p.source {
  121. key := fn(item)
  122. groups[key] = append(groups[key], item)
  123. }
  124. source := make(chan interface{})
  125. go func() {
  126. for _, group := range groups {
  127. source <- group
  128. }
  129. close(source)
  130. }()
  131. return Range(source)
  132. }
  133. func (p Stream) Head(n int64) Stream {
  134. if n < 1 {
  135. panic("n must be greater than 0")
  136. }
  137. source := make(chan interface{})
  138. go func() {
  139. for item := range p.source {
  140. n--
  141. if n >= 0 {
  142. source <- item
  143. }
  144. if n == 0 {
  145. // let successive method go ASAP even we have more items to skip
  146. // why we don't just break the loop, because if break,
  147. // this former goroutine will block forever, which will cause goroutine leak.
  148. close(source)
  149. }
  150. }
  151. if n > 0 {
  152. close(source)
  153. }
  154. }()
  155. return Range(source)
  156. }
  157. // Maps converts each item to another corresponding item, which means it's a 1:1 model.
  158. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  159. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  160. pipe <- fn(item)
  161. }, opts...)
  162. }
  163. // Merge merges all the items into a slice and generates a new stream.
  164. func (p Stream) Merge() Stream {
  165. var items []interface{}
  166. for item := range p.source {
  167. items = append(items, item)
  168. }
  169. source := make(chan interface{}, 1)
  170. source <- items
  171. close(source)
  172. return Range(source)
  173. }
  174. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  175. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  176. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  177. fn(item)
  178. }, opts...).Done()
  179. }
  180. // Reduce is a utility method to let the caller deal with the underlying channel.
  181. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  182. return fn(p.source)
  183. }
  184. // Reverse reverses the elements in the stream.
  185. func (p Stream) Reverse() Stream {
  186. var items []interface{}
  187. for item := range p.source {
  188. items = append(items, item)
  189. }
  190. // reverse, official method
  191. for i := len(items)/2 - 1; i >= 0; i-- {
  192. opp := len(items) - 1 - i
  193. items[i], items[opp] = items[opp], items[i]
  194. }
  195. return Just(items...)
  196. }
  197. // Sort sorts the items from the underlying source.
  198. func (p Stream) Sort(less LessFunc) Stream {
  199. var items []interface{}
  200. for item := range p.source {
  201. items = append(items, item)
  202. }
  203. sort.Slice(items, func(i, j int) bool {
  204. return less(items[i], items[j])
  205. })
  206. return Just(items...)
  207. }
  208. func (p Stream) Tail(n int64) Stream {
  209. if n < 1 {
  210. panic("n should be greater than 0")
  211. }
  212. source := make(chan interface{})
  213. go func() {
  214. ring := collection.NewRing(int(n))
  215. for item := range p.source {
  216. ring.Add(item)
  217. }
  218. for _, item := range ring.Take() {
  219. source <- item
  220. }
  221. close(source)
  222. }()
  223. return Range(source)
  224. }
  225. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  226. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  227. option := buildOptions(opts...)
  228. if option.unlimitedWorkers {
  229. return p.walkUnlimited(fn, option)
  230. } else {
  231. return p.walkLimited(fn, option)
  232. }
  233. }
  234. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  235. pipe := make(chan interface{}, option.workers)
  236. go func() {
  237. var wg sync.WaitGroup
  238. pool := make(chan lang.PlaceholderType, option.workers)
  239. for {
  240. pool <- lang.Placeholder
  241. item, ok := <-p.source
  242. if !ok {
  243. <-pool
  244. break
  245. }
  246. wg.Add(1)
  247. // better to safely run caller defined method
  248. threading.GoSafe(func() {
  249. defer func() {
  250. wg.Done()
  251. <-pool
  252. }()
  253. fn(item, pipe)
  254. })
  255. }
  256. wg.Wait()
  257. close(pipe)
  258. }()
  259. return Range(pipe)
  260. }
  261. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  262. pipe := make(chan interface{}, defaultWorkers)
  263. go func() {
  264. var wg sync.WaitGroup
  265. for {
  266. item, ok := <-p.source
  267. if !ok {
  268. break
  269. }
  270. wg.Add(1)
  271. // better to safely run caller defined method
  272. threading.GoSafe(func() {
  273. defer wg.Done()
  274. fn(item, pipe)
  275. })
  276. }
  277. wg.Wait()
  278. close(pipe)
  279. }()
  280. return Range(pipe)
  281. }
  282. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  283. func UnlimitedWorkers() Option {
  284. return func(opts *rxOptions) {
  285. opts.unlimitedWorkers = true
  286. }
  287. }
  288. // WithWorkers lets the caller to customize the concurrent workers.
  289. func WithWorkers(workers int) Option {
  290. return func(opts *rxOptions) {
  291. if workers < minWorkers {
  292. opts.workers = minWorkers
  293. } else {
  294. opts.workers = workers
  295. }
  296. }
  297. }
  298. func buildOptions(opts ...Option) *rxOptions {
  299. options := newOptions()
  300. for _, opt := range opts {
  301. opt(options)
  302. }
  303. return options
  304. }
  305. func newOptions() *rxOptions {
  306. return &rxOptions{
  307. workers: defaultWorkers,
  308. }
  309. }