将C Sharp中的IEnumerable<T> 序列化为Avro格式的最佳方法。

我正在开发一个C Sharp sdk,它接收IEnumerable数据,然后将其发送到restful API.restful API将这些记录推送到Kafka。

我已经有了schemaString(this.schemaString),下面是我对SDK序列化部分的实现。

        public string ValidateAvroSchema<T>(IEnumerable<T> value) {
            using(var ms = new MemoryStream()){
                try{
                    Avro.IO.Encoder e = new BinaryEncoder(ms);
                    var schema = Schema.Parse(this.schemaString) as RecordSchema;
                    var writer = new GenericDatumWriter<GenericRecord>(schema);

                    foreach(T item in value) {
                        GenericRecord record = new GenericRecord(schema);
                        FieldInfo[] fieldsInfo;
                        Type typeParameterType = typeof(T);
                        var type = item.GetType();
                        fieldsInfo = typeParameterType.GetFields();
                        for (int i = 0; i < fieldsInfo.Length; i++)
                        {
                            record.Add(fieldsInfo[i].Name, GetFieldValue(item, fieldsInfo[i].Name));
                        }
                        writer.Write(record, e);
                    }

                    // I am passing this string to Restful API so the Java side can parse it
                    return Convert.ToBase64String(ms.ToArray());;
                } catch (AvroException e) {
                    // handle exception
                }
            }
        }

在API上,我做了一些类似于:

    byte[] input = Base64.decodeBase64(payloadInJson.toString());
    List<GenericRecord> listOfRecords = new ArrayList<>();
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    InputStream inputStream = new ByteArrayInputStream(input);
    BinaryDecoder decoder = new DecoderFactory().get().binaryDecoder(inputStream, null);
    while(true){
        try {
            GenericRecord record = reader.read(null, decoder);
            listOfRecords.add(record);
        } catch (EOFException eof) {
            break;
        }
    }

现在可以用了。谢谢你们。

只剩下一个问题。

问题1 :使用反射获取所有属性,然后将其添加到GenericRecord中,是否合适?

谢谢你们了。

解决方案:

在我看来,最方便的方式是使用。

        public string ValidateAvroSchema<T>(IEnumerable<T> value) {

          byte[] result = AvroConvert.SerializeHeadless(value.ToList(), this.schemaString);
          return Convert.ToBase64String(result);;
        }  

只要记住,在这种情况下,schema是一个T的数组模式。 要生成它,你可以使用。

AvroConvert.GenerateSchema(typeof(List<T>));

而在API方面

var deserialized = AvroConvert.DeserializeHeadless<List<T>>(result, schema);

从… https:/github.comAdrianStrugalaAvroConvert。

给TA打赏
共{{data.count}}人
人已打赏
未分类

如何使Elastic Search API (Query)具有字段计数和子字段的功能。

2022-9-8 18:12:07

未分类

如何根据一定的条件删除列?

2022-9-8 18:12:09

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索