前面【influxdb】influxdb使用之路(一)初探已经大概介绍了infuxdbline protocol,如何写入数据、查询数据。接下来介绍作为开发人员的你我,怎样与influxdb交互了。

1.查询数据

image-20210515154416105

上次的数据已经有一段时间间隔了,需要更改我们的查询范围了,否则就查不到了。

1.1 获取token

方法一

可以通过命令行查看管理员tokenadmin's Token

influx auth list

image-20210517004855673

方法二

或者通过管理界面

image-20210517004933724

image-20210517004949102

1.2 编码

go get github.com/influxdata/influxdb-client-go/v2
package main

import (
	"context"
	"fmt"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

var client influxdb2.Client

// You can generate a Token from the "Tokens Tab" in the UI
const token = "9KMVnInyi1ATEsTBbKKwyyafQW1KqOIw0S7KnM2tH3eNxrhB6WNq1yN0RLg8W7dRRzIq9Jjt4UdXGOxPD1-7hQ=="
const bucket = "server"
const org = "randy"

//Store the URL of your InfluxDB instance
const url = "http://192.168.31.204:8086"

//connect http://192.168.31.204:8086
func connect() {
	// Create new client with default option for server url authenticate by token
	client = influxdb2.NewClient(url, token)
}

//close client
func close() {
	client.Close()
}

//query
func queryPoints() {
	// Get query client
	queryAPI := client.QueryAPI(org)
	// Get QueryTableResult
	result, err := queryAPI.Query(context.Background(),
		fmt.Sprintf(`from(bucket:"%v")
							|> range(start: -30d)
							|> filter(fn: (r) =>
								r._measurement == "system" and
								r._field=="temperature" and
								r.ip=="192.168.31.206")`, bucket))

	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// fmt.Println(result)
			// Access data
			fmt.Printf("field:%v value: %v time:%s\n", result.Record().Field(), result.Record().Value(), result.Record().Time())
		}
		// Check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
}

//main()
func main() {
	connect()
	defer close()
	// writePonits()
	queryPoints()
}
输出结果
table: col{0: name: result, datatype: string, defaultValue: _result, group: false},col{1: name: table, datatype: long, defaultValue: , group: false},col{2: name: _start, datatype: dateTime:RFC3339, defaultValue: , group: true},col{3: name: _stop, datatype: dateTime:RFC3339, defaultValue: , group: true},col{4: name: _time, datatype: dateTime:RFC3339, defaultValue: , group: false},col{5: name: _value, datatype: double, defaultValue: , group: false},col{6: 
name: _field, datatype: string, defaultValue: , group: true},col{7: name: _measurement, datatype: string, defaultValue: , group: true},col{8: name: hostName, datatype: string, defaultValue: , group: true},col{9: name: ip, datatype: string, defaultValue: , group: true}
system
temperature
0
field:temperature value: 70 time:2021-05-05 13:21:06 +0000 UTC
field:temperature value: 50 time:2021-05-05 13:30:00 +0000 UTC
field:temperature value: 40 time:2021-05-05 13:32:09 +0000 UTC

2.写入数据

2.1 编码

//insert
func writePonits() {
	// Get non-blocking write client
	writeAPI := client.WriteAPI(org, bucket)
	p := influxdb2.NewPoint("system", //
		map[string]string{
			"ip":       "192.168.31.206",
			"hostName": "nodekafka"}, //
		map[string]interface{}{
			"temperature": 50.0,
			"diskfree":    "300G",
			"disktotal":   "400G"},
		time.Now())

	// Create point using full params constructor
	p1 := influxdb2.NewPointWithMeasurement("system").
		AddTag("ip", "192.168.31.206").
		AddTag("hostName", "nodekafka").
		AddField("temperature", 38.0).
		AddField("diskfree", "300G").
		AddField("disktotal", "400G").
		SetTime(time.Now())

	//write point asynchronously
	writeAPI.WritePoint(p)
	writeAPI.WritePoint(p1)
	// Flush writes
	writeAPI.Flush()

}

插入数据一共有两种api,阻塞与非阻塞,一般情况使用非阻塞,也就是我们通常说的异步,隐式批处理,数据异步写入底层缓冲区,当写缓冲区的大小达到批处理大小(默认为5000)或刷新间隔(默认为1s)超时时,它们将自动发送到服务器。Flush()方法同步阻塞,以确保写数据的缓冲区被刷新,写入数据。更多方法就参考github文档吧

2.2 验证数据插入

influx查询

image-20210520033735169

image-20210520033755822

命令行查询

image-20210520034038137