release list paritioning
This commit is contained in:
parent
2dd9b34d28
commit
ad99c1b749
|
@ -18,11 +18,11 @@ package tiller
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"k8s.io/helm/pkg/proto/hapi/release"
|
||||
"k8s.io/helm/pkg/proto/hapi/services"
|
||||
relutil "k8s.io/helm/pkg/releaseutil"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
// ListReleases lists the releases found by the server.
|
||||
|
@ -107,21 +107,52 @@ func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream s
|
|||
rels = rels[0:req.Limit]
|
||||
l = int64(len(rels))
|
||||
}
|
||||
|
||||
for i := 0; i < min(len(rels), int(req.Limit)); i++ {
|
||||
res := &services.ListReleasesResponse{
|
||||
Next: next,
|
||||
Count: l,
|
||||
Total: total,
|
||||
Releases: []*release.Release{rels[i]},
|
||||
}
|
||||
res := &services.ListReleasesResponse{
|
||||
Next: next,
|
||||
Count: l,
|
||||
Total: total,
|
||||
}
|
||||
chunks := s.partition(rels[:min(len(rels), int(req.Limit))], maxMsgSize-proto.Size(res))
|
||||
for res.Releases = range chunks {
|
||||
if err := stream.Send(res); err != nil {
|
||||
for range chunks { // drain
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// partition packs releases into slices upto the capacity cap in bytes.
|
||||
func (s *ReleaseServer) partition(rels []*release.Release, cap int) <-chan []*release.Release {
|
||||
chunks := make(chan []*release.Release, 1)
|
||||
go func() {
|
||||
var (
|
||||
fill = 0 // fill is space available to fill
|
||||
size int // size is size of a release
|
||||
)
|
||||
var chunk []*release.Release
|
||||
for _, rls := range rels {
|
||||
if size = proto.Size(rls); size+fill > cap {
|
||||
// Over-cap, push chunk onto channel to send over gRPC stream
|
||||
s.Log("partitioned at %d with %d releases (cap=%d)", fill, len(chunk), cap)
|
||||
chunks <- chunk
|
||||
// reset paritioning state
|
||||
chunk = chunk[:0]
|
||||
fill = 0
|
||||
}
|
||||
chunk = append(chunk, rls)
|
||||
fill += size
|
||||
}
|
||||
if len(chunk) > 0 {
|
||||
// send remaining if any
|
||||
chunks <- chunk
|
||||
}
|
||||
close(chunks)
|
||||
}()
|
||||
return chunks
|
||||
}
|
||||
|
||||
func filterByNamespace(namespace string, rels []*release.Release) ([]*release.Release, error) {
|
||||
matches := []*release.Release{}
|
||||
for _, r := range rels {
|
||||
|
|
Loading…
Reference in New Issue