Hunter0x7c7
2022-08-11 a82f9cb69f63aaeba40c024960deda7d75b9fece
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package command
 
//go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
 
import (
    "context"
    "time"
 
    "google.golang.org/grpc"
 
    core "github.com/v2fly/v2ray-core/v5"
    "github.com/v2fly/v2ray-core/v5/common"
    "github.com/v2fly/v2ray-core/v5/features/routing"
    "github.com/v2fly/v2ray-core/v5/features/stats"
)
 
// routingServer is an implementation of RoutingService.
type routingServer struct {
    router       routing.Router
    routingStats stats.Channel
}
 
func (s *routingServer) GetBalancerInfo(ctx context.Context, request *GetBalancerInfoRequest) (*GetBalancerInfoResponse, error) {
    var ret GetBalancerInfoResponse
    ret.Balancer = &BalancerMsg{}
    if bo, ok := s.router.(routing.BalancerOverrider); ok {
        {
            res, err := bo.GetOverrideTarget(request.GetTag())
            if err != nil {
                return nil, err
            }
            ret.Balancer.Override = &OverrideInfo{
                Target: res,
            }
        }
    }
 
    if pt, ok := s.router.(routing.BalancerPrincipleTarget); ok {
        {
            res, err := pt.GetPrincipleTarget(request.GetTag())
            if err != nil {
                newError("unable to obtain principle target").Base(err).AtInfo().WriteToLog()
            } else {
                ret.Balancer.PrincipleTarget = &PrincipleTargetInfo{Tag: res}
            }
        }
    }
    return &ret, nil
}
 
func (s *routingServer) OverrideBalancerTarget(ctx context.Context, request *OverrideBalancerTargetRequest) (*OverrideBalancerTargetResponse, error) {
    if bo, ok := s.router.(routing.BalancerOverrider); ok {
        return &OverrideBalancerTargetResponse{}, bo.SetOverrideTarget(request.BalancerTag, request.Target)
    }
    return nil, newError("unsupported router implementation")
}
 
// NewRoutingServer creates a statistics service with statistics manager.
func NewRoutingServer(router routing.Router, routingStats stats.Channel) RoutingServiceServer {
    return &routingServer{
        router:       router,
        routingStats: routingStats,
    }
}
 
func (s *routingServer) TestRoute(ctx context.Context, request *TestRouteRequest) (*RoutingContext, error) {
    if request.RoutingContext == nil {
        return nil, newError("Invalid routing request.")
    }
    route, err := s.router.PickRoute(AsRoutingContext(request.RoutingContext))
    if err != nil {
        return nil, err
    }
    if request.PublishResult && s.routingStats != nil {
        ctx, _ := context.WithTimeout(context.Background(), 4*time.Second) // nolint: govet
        s.routingStats.Publish(ctx, route)
    }
    return AsProtobufMessage(request.FieldSelectors)(route), nil
}
 
func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequest, stream RoutingService_SubscribeRoutingStatsServer) error {
    if s.routingStats == nil {
        return newError("Routing statistics not enabled.")
    }
    genMessage := AsProtobufMessage(request.FieldSelectors)
    subscriber, err := stats.SubscribeRunnableChannel(s.routingStats)
    if err != nil {
        return err
    }
    defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber)
    for {
        select {
        case value, ok := <-subscriber:
            if !ok {
                return newError("Upstream closed the subscriber channel.")
            }
            route, ok := value.(routing.Route)
            if !ok {
                return newError("Upstream sent malformed statistics.")
            }
            err := stream.Send(genMessage(route))
            if err != nil {
                return err
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}
 
func (s *routingServer) mustEmbedUnimplementedRoutingServiceServer() {}
 
type service struct {
    v *core.Instance
}
 
func (s *service) Register(server *grpc.Server) {
    common.Must(s.v.RequireFeatures(func(router routing.Router, stats stats.Manager) {
        RegisterRoutingServiceServer(server, NewRoutingServer(router, nil))
    }))
}
 
func init() {
    common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
        s := core.MustFromContext(ctx)
        return &service{v: s}, nil
    }))
}