Hunter0x7c7
2022-08-11 a82f9cb69f63aaeba40c024960deda7d75b9fece
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package udp
 
import (
    "context"
 
    "github.com/v2fly/v2ray-core/v5/common/buf"
    "github.com/v2fly/v2ray-core/v5/common/net"
    "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
    "github.com/v2fly/v2ray-core/v5/transport/internet"
)
 
type HubOption func(h *Hub)
 
func HubCapacity(capacity int) HubOption {
    return func(h *Hub) {
        h.capacity = capacity
    }
}
 
func HubReceiveOriginalDestination(r bool) HubOption {
    return func(h *Hub) {
        h.recvOrigDest = r
    }
}
 
type Hub struct {
    conn         *net.UDPConn
    cache        chan *udp.Packet
    capacity     int
    recvOrigDest bool
}
 
func ListenUDP(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, options ...HubOption) (*Hub, error) {
    hub := &Hub{
        capacity:     256,
        recvOrigDest: false,
    }
    for _, opt := range options {
        opt(hub)
    }
 
    var sockopt *internet.SocketConfig
    if streamSettings != nil {
        sockopt = streamSettings.SocketSettings
    }
    if sockopt != nil && sockopt.ReceiveOriginalDestAddress {
        hub.recvOrigDest = true
    }
 
    udpConn, err := internet.ListenSystemPacket(ctx, &net.UDPAddr{
        IP:   address.IP(),
        Port: int(port),
    }, sockopt)
    if err != nil {
        return nil, err
    }
    newError("listening UDP on ", address, ":", port).WriteToLog()
    hub.conn = udpConn.(*net.UDPConn)
    hub.cache = make(chan *udp.Packet, hub.capacity)
 
    go hub.start()
    return hub, nil
}
 
// Close implements net.Listener.
func (h *Hub) Close() error {
    h.conn.Close()
    return nil
}
 
func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) {
    return h.conn.WriteToUDP(payload, &net.UDPAddr{
        IP:   dest.Address.IP(),
        Port: int(dest.Port),
    })
}
 
func (h *Hub) start() {
    c := h.cache
    defer close(c)
 
    oobBytes := make([]byte, 256)
 
    for {
        buffer := buf.New()
        var noob int
        var addr *net.UDPAddr
        rawBytes := buffer.Extend(buf.Size)
 
        n, noob, _, addr, err := ReadUDPMsg(h.conn, rawBytes, oobBytes)
        if err != nil {
            newError("failed to read UDP msg").Base(err).WriteToLog()
            buffer.Release()
            break
        }
        buffer.Resize(0, int32(n))
 
        if buffer.IsEmpty() {
            buffer.Release()
            continue
        }
 
        payload := &udp.Packet{
            Payload: buffer,
            Source:  net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)),
        }
        if h.recvOrigDest && noob > 0 {
            payload.Target = RetrieveOriginalDest(oobBytes[:noob])
            if payload.Target.IsValid() {
                newError("UDP original destination: ", payload.Target).AtDebug().WriteToLog()
            } else {
                newError("failed to read UDP original destination").WriteToLog()
            }
        }
 
        select {
        case c <- payload:
        default:
            buffer.Release()
            payload.Payload = nil
        }
    }
}
 
// Addr implements net.Listener.
func (h *Hub) Addr() net.Addr {
    return h.conn.LocalAddr()
}
 
func (h *Hub) Receive() <-chan *udp.Packet {
    return h.cache
}