Kevin Wan
Kevin Wan's Blog

Reduce Response Time with MapReduce

Kevin Wan

Published on Oct 4, 2021

In microservice development, the API gateway exposes RESTful APIs to the outside world. The data behind an API often comes from other services, and a complex API may depend on several or even dozens of them. Although each dependent call is usually fast, calling the services serially makes the latency of the whole API add up.

So how can we optimize this? The first idea that comes to mind is to call the dependencies concurrently so that the overall time drops. The Go standard library provides sync.WaitGroup for coordinating goroutines, but it falls short in real business scenarios: if one of the dependencies fails, we want to return immediately instead of waiting for all of them to finish. In addition, assigning results to shared variables from several goroutines often requires locking, and every dependency function needs matching Add and Done calls, which is error-prone for newcomers.
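
To make that bookkeeping concrete, here is a minimal sketch of the hand-rolled WaitGroup approach. The detail struct and the fetchUser and fetchStock functions are hypothetical stand-ins for real RPC calls and are not part of go-zero.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// detail, fetchUser and fetchStock are hypothetical stand-ins
// for real RPC results and calls.
type detail struct {
	User  string
	Stock int
}

func fetchUser(uid int64) (string, error) {
	if uid <= 0 {
		return "", errors.New("invalid uid")
	}
	return fmt.Sprintf("user-%d", uid), nil
}

func fetchStock(pid int64) (int, error) {
	return int(pid) * 10, nil
}

// loadDetail calls both dependencies concurrently with sync.WaitGroup.
// Note the manual bookkeeping: Add, Done, and a mutex around the shared
// error. It also waits for every call, even after one has already failed.
func loadDetail(uid, pid int64) (*detail, error) {
	var (
		d        detail
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	setErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		user, err := fetchUser(uid)
		if err != nil {
			setErr(err)
			return
		}
		d.User = user
	}()
	go func() {
		defer wg.Done()
		stock, err := fetchStock(pid)
		if err != nil {
			setErr(err)
			return
		}
		d.Stock = stock
	}()
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	return &d, nil
}

func main() {
	d, err := loadDetail(1, 7)
	fmt.Println(d, err)
}
```

Every additional dependency adds another goroutine, another Done, and another chance to forget one of them.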

Against this background, the go-zero framework provides a concurrency tool, MapReduce, which works out of the box and requires no initialization. The following figure compares the time consumed with and without MapReduce:

[Figure: dependency time consumption comparison]

For the same dependencies, serial processing takes 200ms, while with MapReduce the total time is 100ms, roughly the duration of the slowest dependency. MapReduce can therefore cut the response time considerably, and the effect becomes more obvious as the number of dependencies grows, all without increasing the pressure on the server.
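
The arithmetic behind those numbers can be reproduced with the standard library alone. The sketch below assumes two dependencies of about 100ms each (an assumption that matches the 200ms and 100ms totals); callDependency is a hypothetical stand-in for an RPC, and plain goroutines are used instead of go-zero.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// callDependency is a hypothetical stand-in for an RPC that needs ms
// milliseconds to answer.
func callDependency(ms int) {
	time.Sleep(time.Duration(ms) * time.Millisecond)
}

// serial calls the dependencies one after another, so latencies add up.
func serial(latenciesMs []int) time.Duration {
	start := time.Now()
	for _, ms := range latenciesMs {
		callDependency(ms)
	}
	return time.Since(start)
}

// concurrent calls all dependencies at once, so the total time is
// roughly that of the slowest dependency.
func concurrent(latenciesMs []int) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	for _, ms := range latenciesMs {
		wg.Add(1)
		go func(ms int) {
			defer wg.Done()
			callDependency(ms)
		}(ms)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	latencies := []int{100, 100}
	fmt.Println("serial:    ", serial(latencies).Round(10*time.Millisecond))
	fmt.Println("concurrent:", concurrent(latencies).Round(10*time.Millisecond))
}
```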

MapReduce, a Concurrent Processing Tool

MapReduce is a programming model proposed by Google for parallel computation over large-scale datasets, and the MapReduce tool in go-zero borrows this idea.

The MapReduce tool in the go-zero framework is mainly used to process batches of data concurrently in order to improve service performance.

[Figure: MapReduce schematic]

Let's demonstrate the usage of MapReduce with a few examples.

MapReduce takes three main parameters: generate produces the data, mapper processes each item, and reducer aggregates the mapper output and returns the result. The number of concurrent mapper goroutines can be set through the opts parameter.
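
The data flow through the three stages can be sketched with channels from the standard library. This is a simplified illustration, not go-zero's implementation; sumOfSquares and its parameters are names chosen for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares is a simplified, stdlib-only sketch of the three stages.
// It only mirrors the data flow: generate -> mapper -> reducer.
func sumOfSquares(nums []int, workers int) int {
	if workers < 1 {
		workers = 1
	}

	// generate: produce the items.
	source := make(chan int)
	go func() {
		defer close(source)
		for _, n := range nums {
			source <- n
		}
	}()

	// mapper: several goroutines process items concurrently.
	collector := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range source {
				collector <- n * n
			}
		}()
	}
	go func() {
		wg.Wait() // close collector once every mapper has finished
		close(collector)
	}()

	// reducer: a single goroutine (here, the caller) aggregates serially.
	sum := 0
	for sq := range collector {
		sum += sq
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4}, 2)) // prints 30
}
```

Because only one goroutine runs the reducer loop, the aggregation needs no lock.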

Scenario 1: The result of a function often depends on multiple services. For example, product details often depend on the user service, the inventory service, the order service, and so on. These dependencies are generally exposed over RPC, so to reduce the total time we call them in parallel.

func productDetail(uid, pid int64) (*ProductDetail, error) {
    var pd ProductDetail
    err := mr.Finish(func() (err error) {
        pd.User, err = userRpc.User(uid)
        return
    }, func() (err error) {
        pd.Store, err = storeRpc.Store(pid)
        return
    }, func() (err error) {
        pd.Order, err = orderRpc.Order(pid)
        return
    })

    if err != nil {
        log.Printf("product detail error: %v", err)
        return nil, err
    }

    return &pd, nil
}

Because the product detail in this example gets its data from several services, handling the dependencies concurrently noticeably improves the performance of the interface.
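
To illustrate the "return on the first error" behavior that a plain WaitGroup lacks, here is a minimal stdlib-only sketch. The lowercase finish helper and errFailed are hypothetical names for this example; this is not the code of go-zero's mr.Finish.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailed is a hypothetical error used only in this example.
var errFailed = errors.New("dependency failed")

// finish runs all fns concurrently and returns the first error it sees,
// without waiting for the remaining functions.
func finish(fns ...func() error) error {
	// Buffered so that goroutines finishing late never block on send.
	errs := make(chan error, len(fns))
	for _, fn := range fns {
		go func(fn func() error) {
			errs <- fn()
		}(fn)
	}
	for range fns {
		if err := <-errs; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	release := make(chan struct{})
	err := finish(
		func() error { <-release; return nil }, // a slow dependency
		func() error { return errFailed },      // a dependency that fails at once
	)
	close(release) // let the slow dependency finish
	fmt.Println(err)
}
```

In main, finish returns while the slow function is still blocked, which is exactly the early return the article asks for.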

Scenario 2: We often need to process a batch of data. For example, given a batch of user ids, we validate each user. If any validation call returns an error, the whole check fails; otherwise the result is the list of user ids that passed validation.

func checkLegal(uids []int64) ([]int64, error) {
    r, err := mr.MapReduce(func(source chan<- interface{}) {
        for _, uid := range uids {
            source <- uid
        }
    }, func(item interface{}, writer mr.Writer, cancel func(error)) {
        uid := item.(int64)
        ok, err := check(uid)
        if err != nil {
            cancel(err)
            return
        }
        if ok {
            writer.Write(uid)
        }
    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
        var uids []int64
        for p := range pipe {
            uids = append(uids, p.(int64))
        }
        writer.Write(uids)
    })
    if err != nil {
        log.Printf("check error: %v", err)
        return nil, err
    }

    return r.([]int64), nil
}

func check(uid int64) (bool, error) {
    // check whether the user is legitimate
    return true, nil
}

In this example, if check returns an error, the whole process is terminated through cancel and the error is returned. If check returns false for a uid, that uid is simply left out of the final result.

MapReduce usage notes

  • Both mapper and reducer can call cancel with an error. After the call, MapReduce returns immediately with the result nil, error.
  • If the mapper does not call Write for an item, that item is not aggregated by the reducer.
  • If the reducer does not call Write, the result is nil, ErrReduceNoOutput.
  • The reducer runs in a single goroutine, and all results from the mapper are aggregated serially there.

Implementation principle analysis:

MapReduce first calls buildSource, which runs generate in a new goroutine, passing it an unbuffered channel, and returns that channel. The mapper stage then reads its data from this channel.

func buildSource(generate GenerateFunc) chan interface{} {
    source := make(chan interface{})
    go func() {
        defer close(source)
        generate(source)
    }()

    return source
}
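
The following sketch shows how a consumer reads from the channel that buildSource returns. The GenerateFunc type is inferred from the generate argument used in the examples above, and collect is a hypothetical helper written for this illustration.

```go
package main

import "fmt"

// GenerateFunc is inferred from the generate argument in the examples.
type GenerateFunc func(source chan<- interface{})

// buildSource is reproduced from the snippet above.
func buildSource(generate GenerateFunc) chan interface{} {
	source := make(chan interface{})
	go func() {
		defer close(source)
		generate(source)
	}()

	return source
}

// collect drains the source channel. The range loop ends when generate
// returns and the deferred close(source) runs.
func collect(uids []int64) []int64 {
	source := buildSource(func(source chan<- interface{}) {
		for _, uid := range uids {
			source <- uid
		}
	})

	var got []int64
	for item := range source {
		got = append(got, item.(int64))
	}
	return got
}

func main() {
	fmt.Println(collect([]int64{3, 1, 2})) // prints [3 1 2]
}
```

Because the channel is unbuffered, generate only advances as fast as the consumer reads, so a large input is never materialized in memory all at once.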

The cancel function is defined in the MapReduceWithSource method and can be called from both mapper and reducer. Once it has been called, the goroutine that invoked MapReduce receives the close signal and returns immediately.

cancel := once(func(err error) {
    if err != nil {
        retErr.Set(err)
    } else {
        // default error
        retErr.Set(ErrCancelWithNil)
    }

    drain(source)
    // calls close(output); the caller receives the done signal and returns immediately
    finish()
})
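
The once wrapper is what makes it safe for several mappers to call cancel at the same time: only the first call takes effect. One plausible way to write such a helper with sync.Once is sketched below; firstError and the three error values are hypothetical names used to exercise it, and go-zero's actual helper may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical errors reported by three concurrent mappers.
var (
	errA = errors.New("mapper A failed")
	errB = errors.New("mapper B failed")
	errC = errors.New("mapper C failed")
)

// once wraps fn so that only the first call has any effect.
func once(fn func(error)) func(error) {
	var o sync.Once
	return func(err error) {
		o.Do(func() { fn(err) })
	}
}

// firstError calls cancel concurrently with every error in errs. It
// returns how many times the wrapped function ran and which error won.
func firstError(errs []error) (calls int, recorded error) {
	cancel := once(func(err error) {
		recorded = err
		calls++
	})

	var wg sync.WaitGroup
	for _, err := range errs {
		wg.Add(1)
		go func(err error) {
			defer wg.Done()
			cancel(err)
		}(err)
	}
	wg.Wait()
	return calls, recorded
}

func main() {
	calls, err := firstError([]error{errA, errB, errC})
	fmt.Println(calls, err) // calls is always 1; which error wins varies
}
```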

The mapperDispatcher method calls executeMappers, which consumes the data produced by buildSource. Each item is processed in its own goroutine. The maximum concurrency defaults to 16 and can be changed with WithWorkers.

var wg sync.WaitGroup
defer func() {
    wg.Wait() // make sure all items are processed
    close(collector)
}()

pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // write mapper's finished data to collector
for {
    select {
    case <-done: // When cancel is called it will trigger an immediate return
        return
    case pool <- lang.Placeholder: // limit the number of concurrently running goroutines
        item, ok := <-input
        if !ok {
            <-pool
            return
        }

        wg.Add(1)
        go func() {
            defer func() {
                wg.Done()
                <-pool
            }()

            mapper(item, writer) // process the item and call writer.Write to write the result to the corresponding channel in the collector
        }()
    }
}
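
The buffered pool channel in this snippet acts as a counting semaphore. The stdlib-only sketch below isolates that technique and measures the peak number of goroutines running at once; processAll and its parameters are hypothetical names, and the cancellation path through done is left out for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// processAll runs handle for every item with at most `workers`
// goroutines in flight, and returns the peak concurrency it observed.
func processAll(items []int, workers int, handle func(int)) int32 {
	if workers < 1 {
		workers = 1
	}
	var (
		wg       sync.WaitGroup
		inFlight int32
		peak     int32
	)
	pool := make(chan struct{}, workers) // buffered channel used as a semaphore

	for _, item := range items {
		pool <- struct{}{} // blocks while `workers` goroutines are running
		wg.Add(1)
		go func(item int) {
			defer func() {
				atomic.AddInt32(&inFlight, -1)
				wg.Done()
				<-pool // free the slot for the next item
			}()

			n := atomic.AddInt32(&inFlight, 1)
			for { // record the highest concurrency seen so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			handle(item)
		}(item)
	}
	wg.Wait() // make sure all items are processed
	return atomic.LoadInt32(&peak)
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7, 8}
	peak := processAll(items, 3, func(int) {
		time.Sleep(10 * time.Millisecond) // simulate work
	})
	fmt.Println("peak concurrency:", peak)
}
```

The peak never exceeds workers, because a goroutine is only started after a slot has been acquired and the slot is only released after the in-flight counter has been decremented.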

The reducer processes, in a single goroutine, the data that the mappers wrote to collector. If the reducer never calls writer.Write, the finish method still runs at the end and closes output, which avoids a deadlock.

go func() {
    defer func() {
        if r := recover(); r != nil {
            cancel(fmt.Errorf("%v", r))
        } else {
            finish()
        }
    }()
    reducer(collector, writer, cancel)
}()
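
The deferred recover in this snippet turns a panic inside the reducer into an ordinary error. The same pattern in isolation looks like the sketch below, where safeRun is a hypothetical helper name.

```go
package main

import "fmt"

// safeRun runs fn and converts a panic into an error, so that a failing
// callback cannot crash the whole process.
func safeRun(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	fn()
	return nil
}

func main() {
	err := safeRun(func() {
		panic("reducer blew up")
	})
	fmt.Println(err) // prints: reducer blew up
}
```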

The toolkit also provides several other functions for different business scenarios. They work on the same principle as MapReduce, and interested readers can study the source code for details.

  • MapReduceVoid is similar to MapReduce but returns no result, only an error.
  • Finish runs a fixed number of dependencies concurrently and returns immediately with the error if any of them fails.
  • FinishVoid is similar to Finish but has no return value.
  • Map only runs generate and mapper, and returns a channel.
  • MapVoid is similar to Map but has no return value.

This article introduced the MapReduce tool in the go-zero framework, which is very practical in real projects. Used well, it can do a lot to improve both service performance and development efficiency. I hope you found this article useful.

Project address

https://github.com/zeromicro/go-zero

You are welcome to use go-zero and give it a star!

 