Apache IoTDB开发系统整合之MapReduce TsFile
  RWAsyc7aInHM 2023年11月02日 36 0

TsFile-Hadoop-Connector User Guide

关于 TsFile-Hadoop-Connector

TsFile-Hadoop-Connector 实现了 Hadoop 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Hadoop读取,写入和查询Tsfile。

使用此连接器,咱们就可以:

  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Hadoop 中
  • 将特定目录中的所有文件(从本地文件系统或HDFS加载到Hadoop中)
  • 将数据从 Hadoop 写入 TsFile

系统要求

Hadoop Version

Java Version

TsFile Version

2.7.3

1.8

0.10.0

数据类型对应

TsFile data type

Hadoop writable

BOOLEAN

BooleanWritable

INT32

IntWritable

INT64

LongWritable

FLOAT

FloatWritable

DOUBLE

DoubleWritable

TEXT

Text

TSFInput格式说明

TSFInputFormat 从 tsfile 中提取数据,并将其格式化为 .MapWritable

假设我们要提取名为该设备的数据,该设备具有三个名为 、、 的传感器。d1s1s2s3

s1的类型为 ,的类型为 ,的类型为 。BOOLEANs2DOUBLEs3TEXT

结构将如下所示:MapWritable

  1. {
  2. "time_stamp": 10000000,
  3. "device_id": d1,
  4. "s1": true,
  5. "s2": 3.14,
  6. "s3": "middle"
  7. }

在 Hadoop 的 Map 作业中,你可以按键获取任何你想要的值,如下所示:

mapwritable.get(new Text("s1"))

注意:中的所有键的类型均为 。MapWritableText

阅读示例:计算总和

首先,我们应该告诉 InputFormat 我们想要从 tsfile 获得什么样的数据。

  1. // configure reading time enable
  2. TSFInputFormat.setReadTime(job, true);
  3. // configure reading deviceId enable
  4. TSFInputFormat.setReadDeviceId(job, true);
  5. // configure reading which deltaObjectIds
  6. String[] deviceIds = {"device_1"};
  7. TSFInputFormat.setReadDeviceIds(job, deltaObjectIds);
  8. // configure reading which measurementIds
  9. String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
  10. TSFInputFormat.setReadMeasurementIds(job, measurementIds);

然后,应指定映射器和化简器的输出键和值

  1. // set inputformat and outputformat
  2. job.setInputFormatClass(TSFInputFormat.class);
  3. // set mapper output key and value
  4. job.setMapOutputKeyClass(Text.class);
  5. job.setMapOutputValueClass(DoubleWritable.class);
  6. // set reducer output key and value
  7. job.setOutputKeyClass(Text.class);
  8. job.setOutputValueClass(DoubleWritable.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
  8. }
  9. }

  10. public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

  11. @Override
  12. protected void reduce(Text key, Iterable<DoubleWritable> values,
  13. Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
  14. throws IOException, InterruptedException {

  15. double sum = 0;
  16. for (DoubleWritable value : values) {
  17. sum = sum + value.get();
  18. }
  19. context.write(key, new DoubleWritable(sum));
  20. }
  21. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSFMRReadExample.java

写入示例:将平均值写入 Tsfile

除了 ,Hadoop-map-reduce 作业的其余配置代码几乎与上面相同。OutputFormatClass

  1. job.setOutputFormatClass(TSFOutputFormat.class);
  2. // set reducer output key and value
  3. job.setOutputKeyClass(NullWritable.class);
  4. job.setOutputValueClass(HDFSTSRecord.class);

然后,和类是你如何处理类产生的。mapperreducerMapWritableTSFInputFormat

  1. public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {
  2. @Override
  3. protected void map(NullWritable key, MapWritable value,
  4. Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
  5. throws IOException, InterruptedException {

  6. Text deltaObjectId = (Text) value.get(new Text("device_id"));
  7. long timestamp = ((LongWritable)value.get(new Text("timestamp"))).get();
  8. if (timestamp % 100000 == 0) {
  9. context.write(deltaObjectId, new MapWritable(value));
  10. }
  11. }
  12. }

  13. /**
  14. * This reducer calculate the average value.
  15. */
  16. public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

  17. @Override
  18. protected void reduce(Text key, Iterable<MapWritable> values,
  19. Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context) throws IOException, InterruptedException {
  20. long sensor1_value_sum = 0;
  21. long sensor2_value_sum = 0;
  22. double sensor3_value_sum = 0;
  23. long num = 0;
  24. for (MapWritable value : values) {
  25. num++;
  26. sensor1_value_sum += ((LongWritable)value.get(new Text("sensor_1"))).get();
  27. sensor2_value_sum += ((LongWritable)value.get(new Text("sensor_2"))).get();
  28. sensor3_value_sum += ((DoubleWritable)value.get(new Text("sensor_3"))).get();
  29. }
  30. HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
  31. DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
  32. DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
  33. DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
  34. tsRecord.addTuple(dPoint1);
  35. tsRecord.addTuple(dPoint2);
  36. tsRecord.addTuple(dPoint3);
  37. context.write(NullWritable.get(), tsRecord);
  38. }
  39. }

注意:有关完整的代码,请参阅以下链接:https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb//hadoop/tsfile/TSMRWriteExample.java



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  6554CEvzIu1g   2023年11月12日   35   0   0 SystemText数据
  iyuah6QlwXb6   2023年11月12日   31   0   0 Systemjava
  c587woZguOp7   2023年11月12日   23   0   0 java
  c587woZguOp7   2023年11月12日   20   0   0 java
  4ozAyWrX6Sw9   2023年11月12日   30   0   0 javajar
RWAsyc7aInHM