Hunter0x7c7
2022-08-11 b8230139fb40edea387617b6accd8371e37eda58
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package mux_test
 
import (
    "context"
    "testing"
    "time"
 
    "github.com/golang/mock/gomock"
 
    "github.com/v2fly/v2ray-core/v5/common"
    "github.com/v2fly/v2ray-core/v5/common/errors"
    "github.com/v2fly/v2ray-core/v5/common/mux"
    "github.com/v2fly/v2ray-core/v5/common/net"
    "github.com/v2fly/v2ray-core/v5/common/session"
    "github.com/v2fly/v2ray-core/v5/testing/mocks"
    "github.com/v2fly/v2ray-core/v5/transport"
    "github.com/v2fly/v2ray-core/v5/transport/pipe"
)
 
func TestIncrementalPickerFailure(t *testing.T) {
    mockCtl := gomock.NewController(t)
    defer mockCtl.Finish()
 
    mockWorkerFactory := mocks.NewMuxClientWorkerFactory(mockCtl)
    mockWorkerFactory.EXPECT().Create().Return(nil, errors.New("test"))
 
    picker := mux.IncrementalWorkerPicker{
        Factory: mockWorkerFactory,
    }
 
    _, err := picker.PickAvailable()
    if err == nil {
        t.Error("expected error, but nil")
    }
}
 
func TestClientWorkerEOF(t *testing.T) {
    reader, writer := pipe.New(pipe.WithoutSizeLimit())
    common.Must(writer.Close())
 
    worker, err := mux.NewClientWorker(transport.Link{Reader: reader, Writer: writer}, mux.ClientStrategy{})
    common.Must(err)
 
    time.Sleep(time.Millisecond * 500)
 
    f := worker.Dispatch(context.Background(), nil)
    if f {
        t.Error("expected failed dispatching, but actually not")
    }
}
 
func TestClientWorkerClose(t *testing.T) {
    mockCtl := gomock.NewController(t)
    defer mockCtl.Finish()
 
    r1, w1 := pipe.New(pipe.WithoutSizeLimit())
    worker1, err := mux.NewClientWorker(transport.Link{
        Reader: r1,
        Writer: w1,
    }, mux.ClientStrategy{
        MaxConcurrency: 4,
        MaxConnection:  4,
    })
    common.Must(err)
 
    r2, w2 := pipe.New(pipe.WithoutSizeLimit())
    worker2, err := mux.NewClientWorker(transport.Link{
        Reader: r2,
        Writer: w2,
    }, mux.ClientStrategy{
        MaxConcurrency: 4,
        MaxConnection:  4,
    })
    common.Must(err)
 
    factory := mocks.NewMuxClientWorkerFactory(mockCtl)
    gomock.InOrder(
        factory.EXPECT().Create().Return(worker1, nil),
        factory.EXPECT().Create().Return(worker2, nil),
    )
 
    picker := &mux.IncrementalWorkerPicker{
        Factory: factory,
    }
    manager := &mux.ClientManager{
        Picker: picker,
    }
 
    tr1, tw1 := pipe.New(pipe.WithoutSizeLimit())
    ctx1 := session.ContextWithOutbound(context.Background(), &session.Outbound{
        Target: net.TCPDestination(net.DomainAddress("www.v2fly.org"), 80),
    })
    common.Must(manager.Dispatch(ctx1, &transport.Link{
        Reader: tr1,
        Writer: tw1,
    }))
    defer tw1.Close()
 
    common.Must(w1.Close())
 
    time.Sleep(time.Millisecond * 500)
    if !worker1.Closed() {
        t.Error("worker1 is not finished")
    }
 
    tr2, tw2 := pipe.New(pipe.WithoutSizeLimit())
    ctx2 := session.ContextWithOutbound(context.Background(), &session.Outbound{
        Target: net.TCPDestination(net.DomainAddress("www.v2fly.org"), 80),
    })
    common.Must(manager.Dispatch(ctx2, &transport.Link{
        Reader: tr2,
        Writer: tw2,
    }))
    defer tw2.Close()
 
    common.Must(w2.Close())
}