Skip to content

Latest commit

 

History

History
556 lines (507 loc) · 15.4 KB

explainer_CN.md

File metadata and controls

556 lines (507 loc) · 15.4 KB

Y3Codec Explainer

Problem and Motivation

  • 市面上在长连接下缺乏正确的 codec,Protobuf 也是针对离线数据的场景,而用户面对的是未来实时的数据处理,而这类数据的特点就是高频产生,数据结构的变化频次小。
  • 在解码阶段,其他的 codec 都是 Fully Decode,即一定要拿到一个完整的数据包,才能开始反序列化过程,这在实时数据处理环节是大忌。

Goals

  • Faster than Real-time
  • 针对长连接下的实时数据流处理,提供高效的解码。用户无需等待拿到完整的数据包之后,才能开始解码
  • 用户只需 observe key 即可拿到想要的数据。

Key use-cases

  • 对延迟敏感的应用程序。
  • 长连接下的实时数据流处理。

Proposed solutions

Y3 的场景是应对长连接下的实时数据流处理,所以用户把 raw stream 交给 Y3,然后告诉 Y3 要 observe 的 key,Y3 在接管 raw stream 后开始 parsing 的操作,发现了 key 后,开始将其对应的 value 以用户指定的数据类型做反序列化,再将其作为参数,调用用户指定的回调函数(event-driven method)。

Y3 通过把对象数据描述成一组 TLV 结构,在数据包解码时,可以在解码过程中更早的了解到当前的 T 是否为所受监听的 key,从而判断是否直接跳到下一组TLV结构,而并不需要对非受监听的数据包进行多余的解码操作,从而提升了解码的效率和资源利用率。

主要接口包括:

  • Marshal 按照 Y3 的编码规则序列化用户的数据。
  • Subscribe 监听用户想 observe 的 key
  • OnObserve Y3 发现了key后,调用用户指定的回调函数进行解码。

Examples

1.数据源为一批类拟 JSON 的层级数据(其中包含了关心和不关心的数据),需要把这些数据转换成 Y3 编码通过流式传输给接收方,比如 yomo-flow。接收方监听关心的数据并进行业务处理。

编码数据

type SourceData struct {
   Name  string      `y3:"0x10"`
   Noise float32     `y3:"0x11"`
   Therm Thermometer `y3:"0x12"`
}

type Thermometer struct {
   Temperature float32 `y3:"0x13"`
   Humidity    float32 `y3:"0x14"`
}

func main() {
	input := SourceData{
		Name:  "yomo",
		Noise: float32(456),
		Therm: Thermometer{Temperature: float32(30), Humidity: float32(40)},
	}
  // 把对象编码为符合Y3-Codec格式的数据
	codec := y3.NewCodec(0x20)
	inputBuf, _ := codec.Marshal(input)
	fmt.Printf("inputBuf=%#v\n", inputBuf)
}

解码并监听一个值

func main() {
	// 定义回调函数用于处理被监听的数据
	callback := func(v []byte) (interface{}, error) {
		return y3.ToFloat32(v)
	}
	// 创建Observable接口
	source := y3.FromStream(bytes.NewReader(inputBuf))
	// 订阅被监听的Key,并设置回调函数
	consumer := source.Subscribe(0x11).OnObserve(callback)
	// 检查被处理后的数据
	for c := range consumer {
		fmt.Printf("observed value=%v, type=%v\n", c, reflect.ValueOf(c).Kind())
	}
}

Types

Y3提供High-Level的封装,用于支持像YoMo这样的框架。

统一的编码方法: y3.NewCodec(observe byte).Marshal(input interface{})

类型 解码方法
struct y3.ToObject
struct slice y3.ToObject
int32 y3.ToInt32
int32 slice y3.ToInt32Slice
uint32 y3.ToUInt32
uint32 slice y3.ToUInt32Slice
int64 y3.ToInt64
int64 slice y3.ToInt64Slice
uint64 y3.ToUInt64
uint64 slice y3.ToUInt64Slice
float32 y3.ToFloat32
float32 slice y3.ToFloat32Slice
float64 y3.ToFloat64
float64 slice y3.ToFloat64Slice
bool y3.ToBool
bool slice y3.ToBoolSlice
string y3.ToUTF8String
string slice y3.ToUTF8StringSlice
[]byte y3.ToBytes
struct
func main() {
  // Simulate source to generate and send data
  data := NoiseData{Noise: 40, Time: time.Now().UnixNano() / 1e6, From: "127.0.0.1"}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
      var obj NoiseData
      err := y3.ToObject(v, &obj)
      if err != nil {
          return nil, err
      }
      fmt.Printf("encoded data: %v\n", obj)
      return obj, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
}
type NoiseData struct {
    Noise float32 `y3:"0x11"`
    Time  int64   `y3:"0x12"`
    From  string  `y3:"0x13"`
}
struct slice
  func main() {
    // Simulate source to generate and send data
    data := []NoiseData{
        {Noise: 40, Time: time.Now().UnixNano() / 1e6, From: "127.0.0.1"},
        {Noise: 50, Time: time.Now().UnixNano() / 1e6, From: "127.0.0.1"},
    }
    sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
    source := y3.FromStream(bytes.NewReader(sendingBuf))
    // Simulate flow listening and decoding data
    var decode = func(v []byte) (interface{}, error) {
        var sl []NoiseData
        err := y3.ToObject(v, &sl)
        if err != nil {
            return nil, err
        }
        fmt.Printf("encoded data: %v\n", sl)
        return sl, nil
    }
    consumer := source.Subscribe(0x10).OnObserve(decode)
    for range consumer {
    }
  }
  type NoiseData struct {
      Noise float32 `y3:"0x11"`
      Time  int64   `y3:"0x12"`
      From  string  `y3:"0x13"`
  }
int32
  // Simulate source to generate and send data
  var data int32 = 123
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToInt32(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
int32 slice
  // Simulate source to generate and send data
  data := []int32{123, 456}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
      sl, err := y3.ToInt32Slice(v)
      if err != nil {
          return nil, err
      }
      fmt.Printf("encoded data: %v\n", sl)
      return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
uint32
  // Simulate source to generate and send data
  var data uint32 = 123
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToUInt32(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
uint32 slice
// Simulate source to generate and send data
data := []uint32{123, 456}
sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
source := y3.FromStream(bytes.NewReader(sendingBuf))
// Simulate flow listening and decoding data
var decode = func(v []byte) (interface{}, error) {
    sl, err := y3.ToUInt32Slice(v)
    if err != nil {
        return nil, err
    }
    fmt.Printf("encoded data: %v\n", sl)
    return sl, nil
}
consumer := source.Subscribe(0x10).OnObserve(decode)
for range consumer {
}
int64
  // Simulate source to generate and send data
  var data int64 = 123
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToInt64(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
int64 slice
// Simulate source to generate and send data
data := []int64{123, 456}
sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
source := y3.FromStream(bytes.NewReader(sendingBuf))
// Simulate flow listening and decoding data
var decode = func(v []byte) (interface{}, error) {
    sl, err := y3.ToInt64Slice(v)
    if err != nil {
        return nil, err
    }
    fmt.Printf("encoded data: %v\n", sl)
    return sl, nil
}
consumer := source.Subscribe(0x10).OnObserve(decode)
for range consumer {
}
uint64
  // Simulate source to generate and send data
  var data uint64 = 123
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToUInt64(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
uint64 slice
  // Simulate source to generate and send data
  data := []uint64{123, 456}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToUInt64Slice(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
float32
  // Simulate source to generate and send data
  var data float32 = 1.23
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToFloat32(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
float32 slice
// Simulate source to generate and send data
  data := []float32{1.23, 4.56}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToFloat32Slice(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
float64
  // Simulate source to generate and send data
  var data float64 = 1.23
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToFloat64(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
float64 slice
  // Simulate source to generate and send data
  data := []float64{1.23, 4.56}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToFloat64Slice(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
bool
  // Simulate source to generate and send data
  data := true
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToBool(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
bool slice
  // Simulate source to generate and send data
  data := []bool{true, false}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToBoolSlice(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
string
  // Simulate source to generate and send data
  data := "abc"
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToUTF8String(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
string slice
  // Simulate source to generate and send data
  data := []string{"a", "b"}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToUTF8StringSlice(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }
[]byte
  // Simulate source to generate and send data
  data := []byte{0x20, 0x21, 0x22}
  sendingBuf, _ := y3.NewCodec(0x10).Marshal(data)
  source := y3.FromStream(bytes.NewReader(sendingBuf))
  // Simulate flow listening and decoding data
  var decode = func(v []byte) (interface{}, error) {
  	sl, err := y3.ToBytes(v)
  	if err != nil {
  		return nil, err
  	}
  	fmt.Printf("encoded data: %#v\n", sl)
  	return sl, nil
  }
  consumer := source.Subscribe(0x10).OnObserve(decode)
  for range consumer {
  }

更多例子请看: /examples/

Attention

被监听的key有规定的使用范围:

  • 用户自定义的被监听key范围:0x10 ~ 0x3f
  • 系统保留: 0x01 ~ 0x0f