【Golang】influxdb使用之路(二)Go语言操作influxdb
前面【influxdb】influxdb使用之路(一)初探已经大概介绍了infuxdb
,line protocol,如何写入数据、查询数据。接下来介绍作为开发人员的你我,怎样与influxdb
交互了。
1.查询数据
上次的数据已经有一段时间间隔了,需要更改我们的查询范围了,否则就查不到了。
1.1 获取token
方法一
可以通过命令行查看管理员token
,admin's Token
influx auth list
方法二
或者通过管理界面
1.2 编码
go get github.com/influxdata/influxdb-client-go/v2
package main
import (
"context"
"fmt"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
var client influxdb2.Client
// You can generate a Token from the "Tokens Tab" in the UI
const token = "9KMVnInyi1ATEsTBbKKwyyafQW1KqOIw0S7KnM2tH3eNxrhB6WNq1yN0RLg8W7dRRzIq9Jjt4UdXGOxPD1-7hQ=="
const bucket = "server"
const org = "randy"
//Store the URL of your InfluxDB instance
const url = "http://192.168.31.204:8086"
//connect http://192.168.31.204:8086
func connect() {
// Create new client with default option for server url authenticate by token
client = influxdb2.NewClient(url, token)
}
//close client
func close() {
client.Close()
}
//query
func queryPoints() {
// Get query client
queryAPI := client.QueryAPI(org)
// Get QueryTableResult
result, err := queryAPI.Query(context.Background(),
fmt.Sprintf(`from(bucket:"%v")
|> range(start: -30d)
|> filter(fn: (r) =>
r._measurement == "system" and
r._field=="temperature" and
r.ip=="192.168.31.206")`, bucket))
if err == nil {
// Iterate over query response
for result.Next() {
// Notice when group key has changed
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// fmt.Println(result)
// Access data
fmt.Printf("field:%v value: %v time:%s\n", result.Record().Field(), result.Record().Value(), result.Record().Time())
}
// Check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
}
//main()
func main() {
connect()
defer close()
// writePonits()
queryPoints()
}
输出结果
table: col{0: name: result, datatype: string, defaultValue: _result, group: false},col{1: name: table, datatype: long, defaultValue: , group: false},col{2: name: _start, datatype: dateTime:RFC3339, defaultValue: , group: true},col{3: name: _stop, datatype: dateTime:RFC3339, defaultValue: , group: true},col{4: name: _time, datatype: dateTime:RFC3339, defaultValue: , group: false},col{5: name: _value, datatype: double, defaultValue: , group: false},col{6:
name: _field, datatype: string, defaultValue: , group: true},col{7: name: _measurement, datatype: string, defaultValue: , group: true},col{8: name: hostName, datatype: string, defaultValue: , group: true},col{9: name: ip, datatype: string, defaultValue: , group: true}
system
temperature
0
field:temperature value: 70 time:2021-05-05 13:21:06 +0000 UTC
field:temperature value: 50 time:2021-05-05 13:30:00 +0000 UTC
field:temperature value: 40 time:2021-05-05 13:32:09 +0000 UTC
2.写入数据
2.1 编码
//insert
func writePonits() {
// Get non-blocking write client
writeAPI := client.WriteAPI(org, bucket)
p := influxdb2.NewPoint("system", //
map[string]string{
"ip": "192.168.31.206",
"hostName": "nodekafka"}, //
map[string]interface{}{
"temperature": 50.0,
"diskfree": "300G",
"disktotal": "400G"},
time.Now())
// Create point using full params constructor
p1 := influxdb2.NewPointWithMeasurement("system").
AddTag("ip", "192.168.31.206").
AddTag("hostName", "nodekafka").
AddField("temperature", 38.0).
AddField("diskfree", "300G").
AddField("disktotal", "400G").
SetTime(time.Now())
//write point asynchronously
writeAPI.WritePoint(p)
writeAPI.WritePoint(p1)
// Flush writes
writeAPI.Flush()
}
插入数据一共有两种api,阻塞与非阻塞,一般情况使用非阻塞,也就是我们通常说的异步,隐式批处理,数据异步写入底层缓冲区,当写缓冲区的大小达到批处理大小(默认为5000)或刷新间隔(默认为1s)超时时,它们将自动发送到服务器。Flush()
方法同步阻塞,以确保写数据的缓冲区被刷新,写入数据。更多方法就参考github文档吧。
2.2 验证数据插入
influx查询
命令行查询
- 原文作者:Garfield
- 原文链接:http://www.randyfield.cn/post/2021-05-20-go-influxdb/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。