fn.go 7.5 KB


  1. package fx
  2. import (
  3. "sort"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/collection"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. )
  // Worker-count settings shared by the Walk-based operators.
  const (
  	// minWorkers is the smallest worker count WithWorkers will store.
  	minWorkers = 1
  	// defaultWorkers is the worker count used when no Option overrides it;
  	// it is also the output buffer size of the unlimited walk.
  	defaultWorkers = 16
  )
  type (
  	// Stream wraps a receive-only channel of items; every operator reads
  	// from source.
  	Stream struct {
  		source <-chan interface{}
  	}

  	// rxOptions holds the concurrency settings consumed by Walk.
  	rxOptions struct {
  		// unlimitedWorkers selects the walk that starts one worker per item.
  		unlimitedWorkers bool
  		// workers bounds the concurrent workers of the limited walk.
  		workers int
  	}

  	// Option customizes the rxOptions used by Walk-based operators.
  	Option func(opts *rxOptions)

  	// FilterFunc reports whether item should be kept by Filter.
  	FilterFunc func(item interface{}) bool
  	// ForAllFunc receives the whole underlying channel in ForAll.
  	ForAllFunc func(pipe <-chan interface{})
  	// ForEachFunc is called once for every item in ForEach.
  	ForEachFunc func(item interface{})
  	// GenerateFunc writes the items of a new Stream into source (see From).
  	GenerateFunc func(source chan<- interface{})
  	// KeyFunc returns the key of item; Distinct and Group use it as a map key.
  	KeyFunc func(item interface{}) interface{}
  	// LessFunc reports whether a must be ordered before b in Sort.
  	LessFunc func(a, b interface{}) bool
  	// MapFunc converts item into the value emitted by Map.
  	MapFunc func(item interface{}) interface{}
  	// ParallelFunc is called for every item in Parallel.
  	ParallelFunc func(item interface{})
  	// ReduceFunc consumes the underlying channel and returns a result and
  	// an error (see Reduce).
  	ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  	// WalkFunc handles item and may write any number of items to pipe.
  	WalkFunc func(item interface{}, pipe chan<- interface{})
  )
  33. // From constructs a Stream from the given GenerateFunc.
  34. func From(generate GenerateFunc) Stream {
  35. source := make(chan interface{})
  36. threading.GoSafe(func() {
  37. defer close(source)
  38. generate(source)
  39. })
  40. return Range(source)
  41. }
  42. // Just converts the given arbitrary items to a Stream.
  43. func Just(items ...interface{}) Stream {
  44. source := make(chan interface{}, len(items))
  45. for _, item := range items {
  46. source <- item
  47. }
  48. close(source)
  49. return Range(source)
  50. }
  51. // Range converts the given channel to a Stream.
  52. func Range(source <-chan interface{}) Stream {
  53. return Stream{
  54. source: source,
  55. }
  56. }
  57. // Buffer buffers the items into a queue with size n.
  58. func (p Stream) Buffer(n int) Stream {
  59. if n < 0 {
  60. n = 0
  61. }
  62. source := make(chan interface{}, n)
  63. go func() {
  64. for item := range p.source {
  65. source <- item
  66. }
  67. close(source)
  68. }()
  69. return Range(source)
  70. }
  71. // Count counts the number of elements in the result.
  72. func (p Stream) Count() (count int) {
  73. for range p.source {
  74. count++
  75. }
  76. return
  77. }
  78. // Distinct removes the duplicated items base on the given KeyFunc.
  79. func (p Stream) Distinct(fn KeyFunc) Stream {
  80. source := make(chan interface{})
  81. threading.GoSafe(func() {
  82. defer close(source)
  83. keys := make(map[interface{}]lang.PlaceholderType)
  84. for item := range p.source {
  85. key := fn(item)
  86. if _, ok := keys[key]; !ok {
  87. source <- item
  88. keys[key] = lang.Placeholder
  89. }
  90. }
  91. })
  92. return Range(source)
  93. }
  94. // Done waits all upstreaming operations to be done.
  95. func (p Stream) Done() {
  96. for range p.source {
  97. }
  98. }
  99. // Filter filters the items by the given FilterFunc.
  100. func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
  101. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  102. if fn(item) {
  103. pipe <- item
  104. }
  105. }, opts...)
  106. }
  107. // ForAll handles the streaming elements from the source and no later streams.
  108. func (p Stream) ForAll(fn ForAllFunc) {
  109. fn(p.source)
  110. }
  111. // ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
  112. func (p Stream) ForEach(fn ForEachFunc) {
  113. for item := range p.source {
  114. fn(item)
  115. }
  116. }
  117. // Group groups the elements into different groups based on their keys.
  118. func (p Stream) Group(fn KeyFunc) Stream {
  119. groups := make(map[interface{}][]interface{})
  120. for item := range p.source {
  121. key := fn(item)
  122. groups[key] = append(groups[key], item)
  123. }
  124. source := make(chan interface{})
  125. go func() {
  126. for _, group := range groups {
  127. source <- group
  128. }
  129. close(source)
  130. }()
  131. return Range(source)
  132. }
  133. func (p Stream) Head(n int64) Stream {
  134. source := make(chan interface{})
  135. go func() {
  136. for item := range p.source {
  137. n--
  138. if n >= 0 {
  139. source <- item
  140. }
  141. if n == 0 {
  142. // let successive method go ASAP even we have more items to skip
  143. // why we don't just break the loop, because if break,
  144. // this former goroutine will block forever, which will cause goroutine leak.
  145. close(source)
  146. }
  147. }
  148. if n > 0 {
  149. close(source)
  150. }
  151. }()
  152. return Range(source)
  153. }
  154. // Maps converts each item to another corresponding item, which means it's a 1:1 model.
  155. func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
  156. return p.Walk(func(item interface{}, pipe chan<- interface{}) {
  157. pipe <- fn(item)
  158. }, opts...)
  159. }
  160. // Merge merges all the items into a slice and generates a new stream.
  161. func (p Stream) Merge() Stream {
  162. var items []interface{}
  163. for item := range p.source {
  164. items = append(items, item)
  165. }
  166. source := make(chan interface{}, 1)
  167. source <- items
  168. close(source)
  169. return Range(source)
  170. }
  171. // Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
  172. func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
  173. p.Walk(func(item interface{}, pipe chan<- interface{}) {
  174. fn(item)
  175. }, opts...).Done()
  176. }
  177. // Reduce is a utility method to let the caller deal with the underlying channel.
  178. func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
  179. return fn(p.source)
  180. }
  181. // Reverse reverses the elements in the stream.
  182. func (p Stream) Reverse() Stream {
  183. var items []interface{}
  184. for item := range p.source {
  185. items = append(items, item)
  186. }
  187. // reverse, official method
  188. for i := len(items)/2 - 1; i >= 0; i-- {
  189. opp := len(items) - 1 - i
  190. items[i], items[opp] = items[opp], items[i]
  191. }
  192. return Just(items...)
  193. }
  194. // Sort sorts the items from the underlying source.
  195. func (p Stream) Sort(less LessFunc) Stream {
  196. var items []interface{}
  197. for item := range p.source {
  198. items = append(items, item)
  199. }
  200. sort.Slice(items, func(i, j int) bool {
  201. return less(items[i], items[j])
  202. })
  203. return Just(items...)
  204. }
  205. func (p Stream) Tail(n int64) Stream {
  206. source := make(chan interface{})
  207. go func() {
  208. ring := collection.NewRing(int(n))
  209. for item := range p.source {
  210. ring.Add(item)
  211. }
  212. for _, item := range ring.Take() {
  213. source <- item
  214. }
  215. close(source)
  216. }()
  217. return Range(source)
  218. }
  219. // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
  220. func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
  221. option := buildOptions(opts...)
  222. if option.unlimitedWorkers {
  223. return p.walkUnlimited(fn, option)
  224. } else {
  225. return p.walkLimited(fn, option)
  226. }
  227. }
  228. func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  229. pipe := make(chan interface{}, option.workers)
  230. go func() {
  231. var wg sync.WaitGroup
  232. pool := make(chan lang.PlaceholderType, option.workers)
  233. for {
  234. pool <- lang.Placeholder
  235. item, ok := <-p.source
  236. if !ok {
  237. <-pool
  238. break
  239. }
  240. wg.Add(1)
  241. // better to safely run caller defined method
  242. threading.GoSafe(func() {
  243. defer func() {
  244. wg.Done()
  245. <-pool
  246. }()
  247. fn(item, pipe)
  248. })
  249. }
  250. wg.Wait()
  251. close(pipe)
  252. }()
  253. return Range(pipe)
  254. }
  255. func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
  256. pipe := make(chan interface{}, defaultWorkers)
  257. go func() {
  258. var wg sync.WaitGroup
  259. for {
  260. item, ok := <-p.source
  261. if !ok {
  262. break
  263. }
  264. wg.Add(1)
  265. // better to safely run caller defined method
  266. threading.GoSafe(func() {
  267. defer wg.Done()
  268. fn(item, pipe)
  269. })
  270. }
  271. wg.Wait()
  272. close(pipe)
  273. }()
  274. return Range(pipe)
  275. }
  276. // UnlimitedWorkers lets the caller to use as many workers as the tasks.
  277. func UnlimitedWorkers() Option {
  278. return func(opts *rxOptions) {
  279. opts.unlimitedWorkers = true
  280. }
  281. }
  282. // WithWorkers lets the caller to customize the concurrent workers.
  283. func WithWorkers(workers int) Option {
  284. return func(opts *rxOptions) {
  285. if workers < minWorkers {
  286. opts.workers = minWorkers
  287. } else {
  288. opts.workers = workers
  289. }
  290. }
  291. }
  292. func buildOptions(opts ...Option) *rxOptions {
  293. options := newOptions()
  294. for _, opt := range opts {
  295. opt(options)
  296. }
  297. return options
  298. }
  299. func newOptions() *rxOptions {
  300. return &rxOptions{
  301. workers: defaultWorkers,
  302. }
  303. }