Integrating InfluxDB with Spring Boot: Batch Inserts Using BatchPoints

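I recently worked on an IoT project that needed InfluxDB integrated into a Spring Boot application, including writing data into InfluxDB. This post records the basic code for doing so.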

POM dependency

		<dependency>
			<groupId>org.</groupId>
			<artifactId>-</artifactId>
			<version>2.15</version>
		</dependency>

application.yml configuration file

spring:
  influx:
    url: http://192.168.10.59:8086
    user:
    password:
    database: iotbigdata

InfluxDbConfig class

import com.tk.wisdombigdatamiddleware.utils.InfluxDbUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxDbConfig {
    // An empty default is written as "${key:}"; "${key:''}" would inject the literal two-character string ''.
    @Value("${spring.influx.url:}")
    private String influxDBUrl;

    @Value("${spring.influx.user:}")
    private String userName = "";

    @Value("${spring.influx.password:}")
    private String password = "";

    @Value("${spring.influx.database:}")
    private String database;

    @Bean
    public InfluxDbUtils influxDbUtils() {
        return new InfluxDbUtils(userName, password, influxDBUrl, database, "2_years_iot");
    }

    public String getDatabase() {
        return database;
    }
}

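In the code above, 2_years_iot is the name of an InfluxDB retention policy. It is created with the statement below, which keeps data for two years (17520 hours) with a single replica and makes the policy the default for the database: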

CREATE RETENTION POLICY "2_years_iot" ON "iotbigdata" DURATION 17520h REPLICATION 1 DEFAULT
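
The 17520h duration is simply two 365-day years expressed in hours (2 × 365 × 24). As a minimal sketch, the statement could be assembled from a year count as shown here. The class and method names are hypothetical and are not part of influxdb-java; the sketch also assumes one replica and the DEFAULT flag, as in the statement above.

```java
import java.time.Duration;

// Hypothetical helper (not part of influxdb-java): builds the InfluxQL statement
// shown above from a retention period expressed in whole 365-day years.
final class RetentionPolicyStatements {

    private RetentionPolicyStatements() {
    }

    // Converts a number of 365-day years into hours, e.g. 2 years -> 17520 hours.
    static long yearsToHours(int years) {
        return Duration.ofDays(365L * years).toHours();
    }

    // Assembles a CREATE RETENTION POLICY statement with one replica, marked as DEFAULT.
    static String createDefaultPolicy(String policyName, String database, int years) {
        return "CREATE RETENTION POLICY \"" + policyName + "\" ON \"" + database + "\""
                + " DURATION " + yearsToHours(years) + "h REPLICATION 1 DEFAULT";
    }
}
```

Calling createDefaultPolicy("2_years_iot", "iotbigdata", 2) reproduces the statement used in this post.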

To see which retention policies exist on the database, run the following query:

SHOW RETENTION POLICIES ON "iotbigdata";

InfluxDbUtils class

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;

@Data
@Slf4j
public class InfluxDbUtils {
    private String userName;
    private String password;
    private String url;
    public String database;
    private String retentionPolicy;
    // InfluxDB client instance
    private InfluxDB influxDB;

    // Name of the retention policy
    public static String policyNamePix = "2_years_iot";

    public InfluxDbUtils(String userName, String password, String url, String database,
                         String retentionPolicy) {
        this.userName = userName;
        this.password = password;
        this.url = url;
        this.database = database;
        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
        this.influxDB = influxDbBuild();
    }

    /**
     * Connects to InfluxDB and creates the database if it does not exist.
     *
     * @return the InfluxDB instance
     */
    private InfluxDB influxDbBuild() {
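        if (influxDB == null) {
            // Connect without credentials when neither a user name nor a password is configured.
            boolean noUser = userName == null || userName.isEmpty();
            boolean noPassword = password == null || password.isEmpty();
            if (noUser && noPassword) {
                influxDB = InfluxDBFactory.connect(url);
            } else {
                influxDB = InfluxDBFactory.connect(url, userName, password);
            }
        }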
        try {
            createDB(database);
            influxDB.setDatabase(database);
        } catch (Exception e) {
            log.error("create influx db failed, error: {}", e.getMessage());
        } finally {
            influxDB.setRetentionPolicy(retentionPolicy);
        }
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
        return influxDB;
    }

    /**
     * Creates the database.
     *
     * @param database name of the database to create
     */
    private void createDB(String database) {
        influxDB.query(new Query("CREATE DATABASE " + database));
    }
}
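
The createDB method above concatenates the database name into the statement unquoted, which works for a simple name such as iotbigdata. For names containing special characters, a quoting step along the following lines may be worth considering. This is only a sketch: the class is hypothetical, not part of influxdb-java, and it assumes the InfluxQL convention that identifiers are wrapped in double quotes with embedded double quotes escaped by a backslash.

```java
// Hypothetical helper (not part of influxdb-java): double-quotes a database name
// before it is concatenated into an InfluxQL statement such as CREATE DATABASE.
final class InfluxQlIdentifiers {

    private InfluxQlIdentifiers() {
    }

    // Wraps the identifier in double quotes and escapes embedded double quotes.
    // Assumption: empty names and names containing a newline are rejected outright.
    static String quote(String identifier) {
        if (identifier == null || identifier.isEmpty()) {
            throw new IllegalArgumentException("identifier must not be empty");
        }
        if (identifier.indexOf('\n') >= 0) {
            throw new IllegalArgumentException("identifier must not contain a newline");
        }
        return "\"" + identifier.replace("\"", "\\\"") + "\"";
    }

    // Builds the statement that createDB would pass to new Query(...).
    static String createDatabase(String database) {
        return "CREATE DATABASE " + quote(database);
    }
}
```

With this helper, createDB could pass InfluxQlIdentifiers.createDatabase(database) to new Query(...) instead of the raw concatenation.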

Data wrapper class

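The IotElectricity class below is mapped with the @Measurement annotation provided by the InfluxDB Java client. The convenient part is that each field can declare, right where it is defined, whether it is a tag. The code is as follows: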

import lombok.Builder;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

@Builder
@Data
@Measurement(name = "iotelectricity")
public class IotElectricity {
    // The name in @Column is the column name in the measurement.
    // Note that InfluxDB stores all timestamps in UTC, so convert time zones when writing and reading data.
    @Column(name = "time")
    private String time;
    // tag = true in the annotation marks the field as a tag
    @Column(name = "type", tag = true)
    private String type;
    @Column(name = "brand", tag = true)
    private String brand;
    @Column(name = "deviceSn", tag = true)
    private String deviceSn;
    @Column(name = "param", tag = true)
    private String param;
    @Column(name = "value")
    private String value;
}
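
Because timestamps are stored in UTC, local times usually need converting before a write and after a read. A minimal sketch of that conversion using only java.time follows; the class name is hypothetical, and the Asia/Shanghai zone in the usage note is just an assumed example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts between local wall-clock time and the
// UTC-based epoch milliseconds that can be passed to Point.Builder.time(...).
final class InfluxTimestamps {

    private InfluxTimestamps() {
    }

    // Interprets localTime in the given zone and returns epoch milliseconds (UTC-based).
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Converts epoch milliseconds back into local wall-clock time for the given zone.
    static LocalDateTime fromEpochMillis(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }
}
```

For example, 08:00 on 2019-06-24 in Asia/Shanghai corresponds to 2019-06-24T00:00:00Z, and converting that instant back with the same zone returns the original local time.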

Writing data in bulk with BatchPoints

        // Requires org.influxdb.InfluxDB, org.influxdb.dto.BatchPoints, org.influxdb.dto.Point and java.util.concurrent.TimeUnit.
        // influxDbUtils is the Spring-managed InfluxDbUtils bean (for example, injected with @Autowired).
        // Creating InfluxDbConfig with new would bypass Spring and leave its @Value fields unset.
        InfluxDB influxDB = influxDbUtils.getInfluxDB(); // get the InfluxDB connection
        BatchPoints batchPoints = BatchPoints.builder().build(); // create the batch that collects the points

        IotElectricity iotElectricity = IotElectricity.builder()
                .brand("TM").deviceSn("890000002872").type("breaker").param("voltage").value("195").build();
        Point point = Point.measurementByPOJO(iotElectricity.getClass())
                .addFieldsFromPOJO(iotElectricity)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .build();
        batchPoints.point(point); // add the point to the batch

        iotElectricity = IotElectricity.builder()
                .brand("TM").deviceSn("890000002872").type("breaker").param("electric").value("45").build();
        point = Point.measurementByPOJO(iotElectricity.getClass())
                .addFieldsFromPOJO(iotElectricity)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .build();
        batchPoints.point(point);

        influxDB.write(batchPoints); // write the whole batch to InfluxDB in one request
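
When many readings arrive at once, it can help to cap how many points go into a single BatchPoints before calling write. The following is a minimal sketch of that chunking step using only the JDK. The class name Batches and the idea of a fixed batch size are assumptions for illustration and do not come from the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list of readings into consecutive chunks so that
// each chunk can be turned into one BatchPoints and written with a single call.
final class Batches {

    private Batches() {
    }

    // Returns consecutive sub-lists of at most batchSize elements, preserving order.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

Each sub-list would then be converted to Point objects as shown above, added to a fresh BatchPoints, and sent with one influxDB.write call.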

References:

https://www.cnblogs.com/jason1990/archive/2019/06/24/11076310.html

https://blog.csdn.net/qq_33326449/article/details/87972168

 
