Apache IoTDB开发系统之TsFile API
  RWAsyc7aInHM 2023年11月02日 37 0

Ts文件库安装

有两种方法可以在自己的项目中使用 TsFile。

  • Using as jars:
  • 编译源代码并构建到 jar
git clone https://github.com/apache/incubator-iotdb.git
cd tsfile/
mvn clean package -Dmaven.test.skip=true

然后,所有 jar 都可以在名为 的文件夹中获取。导入到您的项目。target/target/tsfile-0.10.0-jar-with-dependencies.jar

  • Using as a maven dependency:

编译源代码并通过三个步骤部署到本地存储库:

  • 获取源代码
git clone https://github.com/apache/incubator-iotdb.git
  • 编译源代码并部署
cd tsfile/
mvn clean install -Dmaven.test.skip=true
  • 将依赖项添加到项目中:
<dependency>
   <groupId>org.apache.iotdb</groupId>
   <artifactId>tsfile</artifactId>
   <version>0.10.0</version>
 </dependency>
Or, you can download the dependencies from official Maven repository:

-   First, find your maven `settings.xml` on path: `${username}\.m2\settings.xml` , add this `<profile>` to `<profiles>`:

    ```
      <profile>
           <id>allow-snapshots</id>
              <activation><activeByDefault>true</activeByDefault></activation>
           <repositories>
             <repository>  
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
              </repository>
           </repositories>
         </profile>
    ```

-   Then add dependencies into your project:

    ```
     <dependency>
       <groupId>org.apache.iotdb</groupId>
       <artifactId>tsfile</artifactId>
       <version>0.10.0</version>
     </dependency>
    ```

TSFile 用法

本节演示 TsFile 的详细用法。

时间序列数据

时间序列被视为四元组序列。四重定义为(设备、测量、时间、值)。

  • 测量:时间序列正在采取的物理或形式测量,例如,城市的温度、某些商品的销售数量或不同时间的火车速度。由于传统的传感器(如温度计)也进行单次测量并产生时间序列,因此我们将在下面互换使用测量和传感器。
  • 设备:设备是指进行多次测量(产生多个时间序列)的实体,例如,正在运行的火车监控其速度、油表、运行里程、当前乘客每个被传送到一个时间序列。

表 1 说明了一组时间序列数据。下表中显示的集包含一个名为“device_1”的设备,其中包含名为“sensor_1”、“sensor_2”和“sensor_3”的三个测量值。

device_1

sensor_1

sensor_2

sensor_3

时间

价值

时间

价值

时间

价值

1

1.2

1

20

2

50

3

1.4

2

20

4

51

5

1.1

3

21

6

52

7

1.8

4

20

8

53

一组时间序列数据

一行数据:在许多工业应用中,一个设备通常包含多个传感器,这些传感器的值可能位于同一时间戳,这称为一行数据

形式上,一行数据由 、 一个表示自 1 年 1970 月 00 日 00:00:<> 以来的毫秒的时间戳和由 和 对应的几个数据对组成。一行中的所有数据对都属于此行,并且具有相同的时间戳。如果其中一个在 中没有 ,请改用空格(实际上,TsFile 不存储空值)。其格式如下所示:device_idmeasurement_idvaluedevice_idmeasurementsvaluetimestamp

device_id, timestamp, <measurement_id, value>...

示例如下所示。在此示例中,两个度量的数据类型分别为 。INT32FLOAT

device_1, 1490860659000, m1, 10, m2, 12.12

正在写入 Ts文件

生成 tsfile 文件。

可以通过以下三个步骤生成 TsFile,完整的代码将在“编写 TsFile 的示例”一节中给出。

  • 首先,构造一个实例。TsFileWriter以下是可用的构造函数:
  • 没有预定义的架构
public TsFileWriter(File file) throws IOException
  • 使用预定义的架构
public TsFileWriter(File file, Schema schema) throws IOException

这个用于使用 HDFS 文件系统。 可以是类 的实例。TsFileOutputHDFSOutput

public TsFileWriter(TsFileOutput output, Schema schema) throws IOException

如果你想自己设置一些 TSFile 配置,你可以使用 param 。例如:config

TSFileConfig conf = new TSFileConfig();
conf.setTSFileStorageFs("HDFS");
TsFileWriter tsFileWriter = new TsFileWriter(file, schema, conf);

在此示例中,数据文件将存储在 HDFS 中,而不是本地文件系统中。如果要将数据文件存储在本地文件系统中,可以使用 ,这也是默认配置。conf.setTSFileStorageFs("LOCAL")

您还可以通过 和 配置 HDFS 的 IP 和端口。默认 ip 为 ,默认端口为 。config.setHdfsIp(...)config.setHdfsPort(...)localhost9000

参数:

  • 文件 : 要写入的 Ts文件
  • 架构 :文件架构将在下一部分介绍。
  • config : TsFile 的配置。
  • 第二,添加测量值
    或者你可以先创建一个类的实例,然后将其传递给类的构造函数SchemaTsFileWriter该类包含一个映射,其键是一个度量架构的名称,值是架构本身。Schema
    以下是接口:
// Create an empty Schema or from an existing map
public Schema()
public Schema(Map<String, MeasurementSchema> measurements)
// Use this two interfaces to add measurements
public void registerMeasurement(MeasurementSchema descriptor)
public void registerMeasurements(Map<String, MeasurementSchema> measurements)
// Some useful getter and checker
public TSDataType getMeasurementDataType(String measurementId)
public MeasurementSchema getMeasurementSchema(String measurementId)
public Map<String, MeasurementSchema> getAllMeasurementSchema()
public boolean hasMeasurement(String measurementId)

您始终可以在类中使用以下接口来添加其他测量:TsFileWriter

public void addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException

该类包含一个度量的信息,有几个构造函数:MeasurementSchema

public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding)
public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType)
public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType, 
Map<String, String> props)

参数:

  • 测量 ID:此测量的名称,通常是传感器的名称。
  • 类型:数据类型,现在支持六种类型:BOOLEANINT32INT64FLOATDOUBLETEXT;
  • 编码:数据编码。看第2-3章.
  • 压缩:数据压缩。现在支持和 .UNCOMPRESSEDSNAPPY
  • props:特殊数据类型的属性。如 for 和 , for 。用作映射中的字符串对,例如 (“max_point_number”、“3”)。max_point_numberFLOATDOUBLEmax_string_lengthTEXT
> **Notice:** Although one measurement name can be used in multiple deltaObjects, the properties cannot be changed. I.e. it's not allowed to add one measurement name for multiple times with different type or encoding. Here is a bad example:

```
  // The measurement "sensor_1" is float type
  addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));

  // This call will throw a WriteProcessException exception
  addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
```
  • Third, insert and write data continually.
    Use this interface to create a new (a timestamp and device pair).TSRecord
public TSRecord(long timestamp, String deviceId)

Then create a (a measurement and value pair), and use the addTuple method to add the DataPoint to the correct TsRecord.DataPoint

Use this method to write

public void write(TSRecord record) throws IOException, WriteProcessException
  • Finally, call to finish this writing process.close
public void close() throws IOException

我们还能够将数据写入封闭的 TsFile。

  • 用于打开已关闭的文件。ForceAppendTsFileWriter
public ForceAppendTsFileWriter(File file) throws IOException
  • 调用截断元数据的一部分doTruncate
  • 然后用于构造一个新的ForceAppendTsFileWriterTsFileWriter
public TsFileWriter(TsFileIOWriter fileWriter) throws IOException

请注意,在将新数据写入 TsFile 之前,我们应该重做添加测量值的步骤。

写入 TsFile 的示例

您应该将 TsFile 安装到本地 maven 存储库中。

mvn clean install -pl tsfile -am -DskipTests

如果您有未对齐(例如,并非所有传感器都包含值)时间序列数据,则可以通过构造 TSRecord 来编写 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTSRecord.java

如果您有对齐的时间序列数据,则可以通过构造平板电脑来编写 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java

您可以使用 ForceAppendTsFileWriter 将数据写入关闭的 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileForceAppendWrite.java

用于读取 TsFile 的接口

开始之前

此处使用“时间序列数据”部分中的时间序列数据集进行本节的具体介绍。下表中显示的集合包含一个名为“device_1”的 deltaObject,其中包含名为“sensor_1”、“sensor_2”和“sensor_3”的三个测量值。测量已简化为一个简单的说明,每个图仅包含 4 个时间值对。

device_1

sensor_1

sensor_2

sensor_3

时间

价值

时间

价值

时间

价值

1

1.2

1

20

2

50

3

1.4

2

20

4

51

5

1.1

3

21

6

52

7

1.8

4

20

8

53

一组时间序列数据

路径的定义

路径是一个点分隔的字符串,它唯一标识 TsFile 中的时间序列,例如“root.area_1.device_1.sensor_1”。最后一部分“sensor_1”称为“measurementId”,而其余部分“root.area_1.device_1”称为deviceId。如上所述,不同设备中的相同测量具有相同的数据类型和编码,并且设备也是唯一的。

在读取接口中,该参数指示要选择的测量值。paths

路径实例可以通过类轻松构造。例如:Path

Path p = new Path("device_1.sensor_1");

我们将为最终查询调用传递路径的 ArrayList 以支持多个路径。

List<Path> paths = new ArrayList<Path>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_3"));

Notice: When constructing a Path, the format of the parameter should be a dot-separated string, the last part will be recognized as measurementId while the remaining parts will be recognized as deviceId.

Definition of Filter

使用场景

过滤器在 TsFile 读取过程中用于选择满足一个或多个给定条件的数据。

IExpression

这是一个过滤器表达式接口,它将传递给我们的最终查询调用。我们创建一个或多个过滤器表达式,并可能使用二进制过滤器运算符将它们链接到我们的最终表达式。IExpression

  • 创建筛选表达式
    有两种类型的过滤器。
  • 时间筛选器:用于时间序列数据的筛选器。time
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter);

使用以下关系获取对象(值是长整型变量)。TimeFilter

关系

描述

TimeFilter.eq(value)

选择等于值的时间

TimeFilter.lt(值)

选择小于值的时间

TimeFilter.gt(值)

选择大于值的时间

TimeFilter.ltEq(value)

选择小于或等于值的时间

TimeFilter.gtEq(value)

选择大于或等于该值的时间

TimeFilter.notEq(value)

选择不等于值的时间

TimeFilter.not(TimeFilter)

选择不满足另一个时间过滤器的时间

  • 值筛选器:用于时序数据的筛选器。value
IExpression valueFilterExpr = new SingleSeriesExpression(Path, ValueFilter);

的用法与使用 相同,只是为了确保值的类型等于测量值的类型(在路径中定义)。ValueFilterTimeFilter

  • 二进制筛选器运算符
    二进制筛选器运算符可用于链接两个单个表达式。
  • BinaryExpression.and(Expression, Expression):为两个表达式选择满足值。
  • BinaryExpression.or(Expression, Expression):选择至少一个表达式的值 ati。
筛选器表达式示例
  • 时间过滤器表达式示例
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.eq(15)); // series time = 15
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.ltEq(15)); // series time <= 15
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.lt(15)); // series time < 15
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.gtEq(15)); // series time >= 15
IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.notEq(15)); // series time != 15
IExpression timeFilterExpr = BinaryExpression.and(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
                                         new GlobalTimeExpression(TimeFilter.lt(25L))); // 15 <= series time < 25
IExpression timeFilterExpr = BinaryExpression.or(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
                                         new GlobalTimeExpression(TimeFilter.lt(25L))); // series time >= 15 or series time < 25

读取接口

首先,我们打开 TsFile 并从 文件路径字符串 .ReadOnlyTsFilepath

TsFileSequenceReader reader = new TsFileSequenceReader(path);

ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);

接下来,我们准备路径数组和查询表达式,然后通过此接口获取最终对象:QueryExpression

QueryExpression queryExpression = QueryExpression.create(paths, statement);

类有两种方法来执行查询。query

  • 方法 1
public QueryDataSet query(QueryExpression queryExpression) throws IOException
  • 方法 2
public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException

此方法专为高级应用程序(如 TsFile-Spark 连接器)而设计。

  • params :对于方法 2,添加了两个附加参数以支持部分查询:
  • partitionStartOffset:TsFile 的起始偏移量
  • partitionEndOffset:TsFile 的结束偏移量

什么是部分查询?

在一些分布式文件系统(例如HDFS)中,文件被分成几个部分,称为“块”并存储在不同的节点中。在所涉及的每个节点中并行执行查询可以提高效率。因此需要部分查询。Paritial Query 仅选择存储在由 TsFile 分割的零件中的结果。QueryConstant.PARTITION_START_OFFSETQueryConstant.PARTITION_END_OFFSET

查询数据集接口

上面执行的查询将返回一个对象。QueryDataset

这是对用户有用的界面。

  • bool hasNext();如果此数据集仍有元素,则返回 true。
  • List<Path> getPaths()获取此数据集中的路径。
  • List<TSDataType> getDataTypes();获取数据类型。类 TSDataType 是一个枚举类,其值将是以下值之一:
BOOLEAN,
 INT32,
 INT64,
 FLOAT,
 DOUBLE,
 TEXT;
  • RowRecord next() throws IOException;获取下一条记录。
    该类由时间戳和不同传感器中的数据组成,我们可以使用两种 getter 方法来获取它们。RowRecordlongList<Field>
long getTimestamp();
List<Field> getFields();

要从一个字段获取数据,请使用以下方法:

TSDataType getDataType();
Object getObjectValue();

读取现有 TsFile 的示例

您应该将 TsFile 安装到本地 maven 存储库中。

有关查询语句的更全面示例,请参见/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileRead.java

package org.apache.iotdb.tsfile;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

/**
 * The class is to show how to read TsFile file named "test.tsfile".
 * The TsFile file "test.tsfile" is generated from class TsFileWrite.
 * Run TsFileWrite to generate the test.tsfile first
 */
public class TsFileRead {
  private static void queryAndPrint(ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement)
          throws IOException {
    QueryExpression queryExpression = QueryExpression.create(paths, statement);
    QueryDataSet queryDataSet = readTsFile.query(queryExpression);
    while (queryDataSet.hasNext()) {
      System.out.println(queryDataSet.next());
    }
    System.out.println("------------");
  }

  public static void main(String[] args) throws IOException {

    // file path
    String path = "test.tsfile";

    // create reader and get the readTsFile interface
    TsFileSequenceReader reader = new TsFileSequenceReader(path);
    ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
    // use these paths(all sensors) for all the queries
    ArrayList<Path> paths = new ArrayList<>();
    paths.add(new Path("device_1.sensor_1"));
    paths.add(new Path("device_1.sensor_2"));
    paths.add(new Path("device_1.sensor_3"));

    // no query statement
    queryAndPrint(paths, readTsFile, null);

    //close the reader when you left
    reader.close();
  }
}

更改 Ts文件配置

TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
config.setXXX();
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
RWAsyc7aInHM