diff --git a/pkg/channel/channel.go b/pkg/channel/channel.go index 4efbc1b..e7777f4 100644 --- a/pkg/channel/channel.go +++ b/pkg/channel/channel.go @@ -218,6 +218,59 @@ func (c *Channel) updateRateThresholds() { } } +func (c *Channel) HandleInbound(data []byte) error { + if len(data) < 6 { + return errors.New("channel packet too short") + } + + msgType := uint16(data[0])<<8 | uint16(data[1]) + sequence := uint16(data[2])<<8 | uint16(data[3]) + length := uint16(data[4])<<8 | uint16(data[5]) + + if len(data) < 6+int(length) { + return errors.New("channel packet incomplete") + } + + msgData := data[6 : 6+length] + + c.mutex.Lock() + defer c.mutex.Unlock() + + for _, handler := range c.messageHandlers { + if handler != nil { + msg := &GenericMessage{ + Type: msgType, + Data: msgData, + Seq: sequence, + } + if handler(msg) { + break + } + } + } + + return nil +} + +type GenericMessage struct { + Type uint16 + Data []byte + Seq uint16 +} + +func (g *GenericMessage) Pack() ([]byte, error) { + return g.Data, nil +} + +func (g *GenericMessage) Unpack(data []byte) error { + g.Data = data + return nil +} + +func (g *GenericMessage) GetType() uint16 { + return g.Type +} + func (c *Channel) Close() error { c.mutex.Lock() defer c.mutex.Unlock()