Hunter0x7c7
2022-08-11 a82f9cb69f63aaeba40c024960deda7d75b9fece
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package commander
 
import (
    "context"
    "sync"
 
    "github.com/v2fly/v2ray-core/v5/common"
    "github.com/v2fly/v2ray-core/v5/common/net"
    "github.com/v2fly/v2ray-core/v5/common/signal/done"
    "github.com/v2fly/v2ray-core/v5/transport"
)
 
// OutboundListener is a net.Listener for listening gRPC connections.
type OutboundListener struct {
    buffer chan net.Conn
    done   *done.Instance
}
 
func (l *OutboundListener) add(conn net.Conn) {
    select {
    case l.buffer <- conn:
    case <-l.done.Wait():
        conn.Close()
    default:
        conn.Close()
    }
}
 
// Accept implements net.Listener.
func (l *OutboundListener) Accept() (net.Conn, error) {
    select {
    case <-l.done.Wait():
        return nil, newError("listen closed")
    case c := <-l.buffer:
        return c, nil
    }
}
 
// Close implement net.Listener.
func (l *OutboundListener) Close() error {
    common.Must(l.done.Close())
L:
    for {
        select {
        case c := <-l.buffer:
            c.Close()
        default:
            break L
        }
    }
    return nil
}
 
// Addr implements net.Listener.
func (l *OutboundListener) Addr() net.Addr {
    return &net.TCPAddr{
        IP:   net.IP{0, 0, 0, 0},
        Port: 0,
    }
}
 
// Outbound is a outbound.Handler that handles gRPC connections.
type Outbound struct {
    tag      string
    listener *OutboundListener
    access   sync.RWMutex
    closed   bool
}
 
// Dispatch implements outbound.Handler.
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
    co.access.RLock()
 
    if co.closed {
        common.Interrupt(link.Reader)
        common.Interrupt(link.Writer)
        co.access.RUnlock()
        return
    }
 
    closeSignal := done.New()
    c := net.NewConnection(net.ConnectionInputMulti(link.Writer), net.ConnectionOutputMulti(link.Reader), net.ConnectionOnClose(closeSignal))
    co.listener.add(c)
    co.access.RUnlock()
    <-closeSignal.Wait()
}
 
// Tag implements outbound.Handler.
func (co *Outbound) Tag() string {
    return co.tag
}
 
// Start implements common.Runnable.
func (co *Outbound) Start() error {
    co.access.Lock()
    co.closed = false
    co.access.Unlock()
    return nil
}
 
// Close implements common.Closable.
func (co *Outbound) Close() error {
    co.access.Lock()
    defer co.access.Unlock()
 
    co.closed = true
    return co.listener.Close()
}