【Golang】influxdb使用之路(二)Go语言操作influxdb
前面【influxdb】influxdb使用之路(一)初探已经大概介绍了infuxdb,line protocol,如何写入数据、查询数据。接下来介绍作为开发人员的你我,怎样与influxdb交互了。
1.查询数据

上次的数据已经有一段时间间隔了,需要更改我们的查询范围了,否则就查不到了。
1.1 获取token
方法一
可以通过命令行查看管理员token,admin's Token
influx auth list

方法二
或者通过管理界面


1.2 编码
go get github.com/influxdata/influxdb-client-go/v2
package main
import (
	"context"
	"fmt"
	"time"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
var client influxdb2.Client
// You can generate a Token from the "Tokens Tab" in the UI
const token = "9KMVnInyi1ATEsTBbKKwyyafQW1KqOIw0S7KnM2tH3eNxrhB6WNq1yN0RLg8W7dRRzIq9Jjt4UdXGOxPD1-7hQ=="
const bucket = "server"
const org = "randy"
//Store the URL of your InfluxDB instance
const url = "http://192.168.31.204:8086"
//connect http://192.168.31.204:8086
func connect() {
	// Create new client with default option for server url authenticate by token
	client = influxdb2.NewClient(url, token)
}
//close client
func close() {
	client.Close()
}
//query
func queryPoints() {
	// Get query client
	queryAPI := client.QueryAPI(org)
	// Get QueryTableResult
	result, err := queryAPI.Query(context.Background(),
		fmt.Sprintf(`from(bucket:"%v")
							|> range(start: -30d)
							|> filter(fn: (r) =>
								r._measurement == "system" and
								r._field=="temperature" and
								r.ip=="192.168.31.206")`, bucket))
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// fmt.Println(result)
			// Access data
			fmt.Printf("field:%v value: %v time:%s\n", result.Record().Field(), result.Record().Value(), result.Record().Time())
		}
		// Check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
}
//main()
func main() {
	connect()
	defer close()
	// writePonits()
	queryPoints()
}
输出结果
table: col{0: name: result, datatype: string, defaultValue: _result, group: false},col{1: name: table, datatype: long, defaultValue: , group: false},col{2: name: _start, datatype: dateTime:RFC3339, defaultValue: , group: true},col{3: name: _stop, datatype: dateTime:RFC3339, defaultValue: , group: true},col{4: name: _time, datatype: dateTime:RFC3339, defaultValue: , group: false},col{5: name: _value, datatype: double, defaultValue: , group: false},col{6: 
name: _field, datatype: string, defaultValue: , group: true},col{7: name: _measurement, datatype: string, defaultValue: , group: true},col{8: name: hostName, datatype: string, defaultValue: , group: true},col{9: name: ip, datatype: string, defaultValue: , group: true}
system
temperature
0
field:temperature value: 70 time:2021-05-05 13:21:06 +0000 UTC
field:temperature value: 50 time:2021-05-05 13:30:00 +0000 UTC
field:temperature value: 40 time:2021-05-05 13:32:09 +0000 UTC
2.写入数据
2.1 编码
//insert
func writePonits() {
	// Get non-blocking write client
	writeAPI := client.WriteAPI(org, bucket)
	p := influxdb2.NewPoint("system", //
		map[string]string{
			"ip":       "192.168.31.206",
			"hostName": "nodekafka"}, //
		map[string]interface{}{
			"temperature": 50.0,
			"diskfree":    "300G",
			"disktotal":   "400G"},
		time.Now())
	// Create point using full params constructor
	p1 := influxdb2.NewPointWithMeasurement("system").
		AddTag("ip", "192.168.31.206").
		AddTag("hostName", "nodekafka").
		AddField("temperature", 38.0).
		AddField("diskfree", "300G").
		AddField("disktotal", "400G").
		SetTime(time.Now())
	//write point asynchronously
	writeAPI.WritePoint(p)
	writeAPI.WritePoint(p1)
	// Flush writes
	writeAPI.Flush()
}
插入数据一共有两种api,阻塞与非阻塞,一般情况使用非阻塞,也就是我们通常说的异步,隐式批处理,数据异步写入底层缓冲区,当写缓冲区的大小达到批处理大小(默认为5000)或刷新间隔(默认为1s)超时时,它们将自动发送到服务器。Flush()方法同步阻塞,以确保写数据的缓冲区被刷新,写入数据。更多方法就参考github文档吧。
2.2 验证数据插入
influx查询


命令行查询

- 原文作者:Garfield
- 原文链接:http://www.randyfield.cn/post/2021-05-20-go-influxdb/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
 
         
    