Problems
- If a response takes too long, the user may have already left the page while the server keeps spending resources on a result that nobody will use.
- Long-running server-side processing ties up resources, which lowers the concurrency the service can sustain and can even make the service unavailable.
Why is timeout control necessary in Go?
Go is commonly used to write backend services. A request is typically completed by multiple subtasks that run serially or in parallel, and each subtask may issue further internal requests. When the request times out, it is better to return quickly and release the resources it holds, such as goroutines and file descriptors.
Common timeout control on the server side
- In-process processing
- Serving client requests, such as HTTP or RPC requests
- Calling other services, including calling RPC or accessing DB, etc.
What if there is no timeout control?
To keep things simple, we take a request function hardWork as an example. What it does is unimportant; as the name suggests, it may be slow.
func hardWork(job interface{}) error {
    time.Sleep(time.Minute)
    return nil
}

func requestWork(ctx context.Context, job interface{}) error {
    return hardWork(job)
}
When we serve requests with this kind of code, the familiar image of a page that keeps loading shows up for a full minute. Most people will not wait that long, yet the server keeps working on the request even after the page is closed, and the resources it holds are not released to serve other requests.
This article does not go deep into other details and focuses only on implementing the timeout.
Let's look at how timeouts work and which pitfalls to watch out for.
Version 1
Before reading further, think about how you would implement a timeout for this function.
Here is our first try:
func requestWork(ctx context.Context, job interface{}) error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()

    done := make(chan error)
    go func() {
        done <- hardWork(job)
    }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}
Let's write a main function to test it.
func main() {
    const total = 1000
    var wg sync.WaitGroup
    wg.Add(total)
    now := time.Now()
    for i := 0; i < total; i++ {
        go func() {
            defer wg.Done()
            requestWork(context.Background(), "any")
        }()
    }
    wg.Wait()
    fmt.Println("elapsed:", time.Since(now))
}
Let's run it and see.
➜ go run timeout.go
elapsed: 2.005725931s
The timeout has taken effect. But is it all done?
goroutine leak
Let's add two lines of code at the end of the main function to see how many goroutines are left.
time.Sleep(time.Minute*2)
fmt.Println("number of goroutines:", runtime.NumGoroutine())
The two-minute sleep gives every task time to finish; then we print the current number of goroutines. Let's run it and see the result.
➜ go run timeout.go
elapsed: 2.005725931s
number of goroutines: 1001
Oops, the goroutines leaked. Why does this happen? The requestWork function returns after the 2-second timeout. Once it has returned, no goroutine is receiving from the done channel, so when done <- hardWork(job) finally executes, the send blocks forever. As a result, every timed-out request occupies a goroutine forever. This is a serious problem: each goroutine takes roughly 2-4 KB of memory, and when memory is exhausted the process exits unexpectedly.
So how do we fix it? It is actually very simple: all we need to do is set the buffer size to 1 when creating the channel with make, as below:
done := make(chan error, 1)
This way, done <- hardWork(job) can complete whether or not the request timed out, so the goroutine never gets stuck. You may ask whether it is a problem to write to a channel that no goroutine will ever receive from. It is not: in Go, a channel is not a resource like a file descriptor and does not have to be closed. It is just an object that is garbage-collected once it becomes unreachable; close(channel) only tells the receivers that nothing more will be written.
With that one-line change to requestWork in place, let's run the test again.
➜ go run timeout.go
elapsed: 2.005655146s
number of goroutines: 1
The goroutine leaking problem is gone. Awesome!
panic cannot be captured
Let's change the body of the hardWork function to:
panic("oops")
Then modify the main function to recover from the panic, as below:
go func() {
    defer func() {
        if p := recover(); p != nil {
            fmt.Println("oops, panic")
        }
    }()
    defer wg.Done()
    requestWork(context.Background(), "any")
}()
If you run the code, you will find that the panic is not captured and the program crashes. The reason is that recover only works in the goroutine that panicked: the goroutine in main cannot capture a panic raised in the goroutine started inside requestWork.
The solution is to add a panicChan to requestWork. As with done, the buffer size of panicChan needs to be 1, as below:
func requestWork(ctx context.Context, job interface{}) error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()

    done := make(chan error, 1)
    panicChan := make(chan interface{}, 1)
    go func() {
        defer func() {
            if p := recover(); p != nil {
                panicChan <- p
            }
        }()
        done <- hardWork(job)
    }()

    select {
    case err := <-done:
        return err
    case p := <-panicChan:
        panic(p)
    case <-ctx.Done():
        return ctx.Err()
    }
}
With this code, the panic is re-raised in the caller's goroutine, so it can be handled outside of requestWork.
Is the timeout period correct?
The above implementation of requestWork ignores the incoming ctx parameter. If ctx already carries a deadline, we must check whether the time it has left is less than the 2 seconds we set here; if it is, the deadline from ctx should win. Fortunately, context.WithTimeout already compares the two and keeps the earlier deadline, so we only need to derive the new context from ctx instead of context.Background():
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
Data race
In our example, requestWork returns only an error value. If you need to return multiple values, watch out for data races, which can be avoided with a mutex. For a concrete implementation, refer to go-zero/zrpc/internal/serverinterceptors/timeoutinterceptor.go; I won't go into the details here.
Complete example
package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"
)

// hardWork simulates a slow task.
func hardWork(job interface{}) error {
    time.Sleep(time.Second * 10)
    return nil
}

func requestWork(ctx context.Context, job interface{}) error {
    // Derive from the incoming ctx so that an earlier caller deadline wins.
    ctx, cancel := context.WithTimeout(ctx, time.Second*2)
    defer cancel()

    // Both channels are buffered so the worker never blocks after a timeout.
    done := make(chan error, 1)
    panicChan := make(chan interface{}, 1)
    go func() {
        defer func() {
            if p := recover(); p != nil {
                panicChan <- p
            }
        }()
        done <- hardWork(job)
    }()

    select {
    case err := <-done:
        return err
    case p := <-panicChan:
        // Re-raise the worker's panic in the caller's goroutine.
        panic(p)
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    const total = 10
    var wg sync.WaitGroup
    wg.Add(total)
    now := time.Now()
    for i := 0; i < total; i++ {
        go func() {
            defer func() {
                if p := recover(); p != nil {
                    fmt.Println("oops, panic")
                }
            }()
            defer wg.Done()
            requestWork(context.Background(), "any")
        }()
    }
    wg.Wait()
    fmt.Println("elapsed:", time.Since(now))
    // Wait for the workers to finish before counting goroutines.
    time.Sleep(time.Second * 20)
    fmt.Println("number of goroutines:", runtime.NumGoroutine())
}