您的当前位置:首页正文

TIGK技术栈:kapacitor(二)tickscript脚本基本概念和语法

2024-11-08 来源:个人技术集锦

可能总结得不如官网详细,具体可移步官网:

kapacitor脚本一些核心概念

  • task:
    task是一个tickscript的最大粒度单位,可以说,一个tickscript就是一个task,task标识在kapacitor当中处理一段数据(a set of data)的任务集,一段脚本要起作用,首先需要经过kapacitor对当前的task文件进行编译(define)然后enable,同事还可以使用record命令来检测某一个task最近20分钟进入的数据流和节点处理的结果

  • pipeline
    和名称类似,pipeline就是指管道,标识数据的流向
    kapaciotr当中的数据是由influxdb处监听获取,从管道
    自上而下进行处理,可以有分支,可以有交汇,但是不可以回溯,管道根据数据源来源方式不同,可以分为两种,即stream和batch

  • stream 数据流
    kapacitor关键字之一,用来标识数据流来源的入口,即它会监听每一个插入influxdb的数据,然后通过管道当中 的各个节点进行处理,最终流向管道底部
    格式:

 var varible_name = stream
|from()
...
  • batch 批处理
    kapacitor关键字之一,用来标识批处理数据来源的入口,可以理解为每隔一段时间执行一条influxQL去influxdb当中查询数据,查询到的结果集会随当前batch所在的管道流向底部
    格式:
var varible_name = batch
|query('''select usage_idle from cpu where host = 'xxx'''')
...

stream是监听当前influxdb每一个时间点插入的数据流,而batch则是获取的InfluxQL查询的结果集,此结果集应该是单行的,stream下面可以不用直接跟from()方法指定数据库和表(即会监听当前所有插入influxdb的数据)
但是batch方法下必须指定query()作为批处理对数据进行筛选

  • node
    用来组成管道的节点,可以理解为我们常见的水管上用来拼接成管道的连接头,也可以理解为processor point,即数据处理器节点,一个pipeline是通过多个node组成的

  • filed和tag
    是influxdb数据库的字段类型
    field表示当前插入数据值,它带有数据类型,包括double int等
    tag其实就是标签的意思,通常用于数据筛选,它的数据类型一定是字符串类型
    eg:

host       cpu_usage_idle
----------------------
'1138'         58.5

表示1138主机上的cpu使用空闲率为58.5%
在kapacitor当中是会将filed和tag进行区分的,一个明显的区别就是filed在node的处理过程当中可能会被销毁,但是tag不会

  • syntax-subspace
    语法子空间
    kapacitor借鉴了很多种语言做出了tickscript,而tickscript当中包含语法子空间的
    简单的说就是在一个task当中,不止包含tickscript语法
    举个栗子
    在使用query方法时
batch
query('''select count(usage_idle) from cpu where time > now() - 10s limit 1''')

这里包含了influxQL查询语句,influxQL语法,就是一个语法子空间
还有比如使用eval模块时

|eval(lambda : if("usage_idle" > 80)).as('res')

这里使用lambda表达式,则使用了tickscript 和 lambda语法
通常在kapacitor当中也只会包含这两种另外的语法

  • chaining method和property method
    连接方法和连接方法的属性方法(子方法)
    之前有说过node
    定义一个node的方式则是使用一个管道符号(“|”)然后跟一个chaining method,概念上可以把chaining method直接认为是node
    比如
    var data = stream
    |from()
    上面的from就是一个chaining method,和管道符号一起组成了一个node
    而属性方法则是定义在当前连接方法下的子方法
    比如as()
    连接方法join()和连接方法eval()都有as()作为properties method 但是是两个完全不同的方法(join()下的as是为交汇的pipeline的结果集字段赋予前缀,eval()下的as是创建一个新的field字段供后面的节点使用)
    eg:
var data=stream
|from()
  .database('aomp_monitor_db')
  .retentionPolicy('one_day')

这里的database和retentionPolicy都是定义在chaining method下的property method,用来指定数据库和数据保存方法
需要注意的是,property method的执行顺序是在chaining method执行之前按顺序执行,property method的同一个chaining method下的property metohd的执行顺序也会影响执行结果(亲测)

就像不同的水管只能装不同的连接器一样,不同的node后面只能跟指定的node,有的node作为pipeline的终端后面也不可以跟node,具体可以上官网看node之间的关联关系

kapacitor脚本语法

1.脚本结构:

  • 通常一个task被拆分为两个部分
    declaration和expression
    即声明和操作
    声明即是声明变量
    操作则是对数据进行处理评测并发出警告等,即是上面写的pipeline定义和处理
    比如:
var v_alarmRuleId=1894653
var v_alarmPolicyId=1894654
var v_level='warn'
var v_host='{{ index .Tags "host" }}'
var v_target_id='{{ index .Tags "host" }}'
var v_db='aomp_monitor_db'
var v_rp='autogen'
var v_measurement='cpu'
var v_groupBy='host'
var v_whereFilter=lambda: ("host"=='1889911' OR "host"=='1874471' OR "host"=='1855534')
var v_name='测试'
var v_field='usage_idle'
var v_idVar=v_name +':{{.Group}}'
var v_message='id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}'
var v_idTag='alertID'
var v_levelTag='level'
var v_every=10s
var v_messageField='message'
var v_durationField='duration'
var v_value='{{ index .Fields "value" }}'
var data = stream 
|from() 
   .database(v_db) 
   .retentionPolicy(v_rp) 
   .measurement(v_measurement) 
   .where(v_whereFilter) 
   .groupBy(v_groupBy) 
   |eval(lambda: if(("usage_idle"<=10), 1, 0)).as('res').keep() 
var trigger = data 
|alert() 
   .info(lambda: "res" == 1)
    .id(v_idVar) 
    .message(v_message) 
    .messageField(v_messageField) 
    .durationField(v_durationField) 
    .stateChangesOnly() 
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"测试多条件告警规则策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/receive') 
var solve_trigger = data 
| window() 
  .period(60s) 
  .every(v_every) 
|alert() 
   .info(lambda: "res" == 0)
   .warn(lambda: "res" == 1)
    .id(v_idVar) 
    .message(v_message) 
    .messageField(v_messageField) 
    .durationField(v_durationField) 
    .stateChangesOnly() 
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"测试多条件告警规则策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/solve') 

2.变量
kapacitor脚本中所有声明的变量都是常量,每一次评估时变量的内容不可以修改
类似kapacitor这样的脚本有一个概念叫evaluate,即评估,每一次评估则意味着一次数据进入脚本,在数据在pipeline当中流动的时候,当前脚本定义的变量值和流动的数据本身,是不可修改的
变量声明格式类似go和javascript

var v_db = 'monitor_linux_db'

变量名必须以英文开头,区分大小写,英文开头后可以跟数字,下划线等
变量声明不可以用kapacitor关键字,比如stream和batch

3.空格
tickscript中,在expression部分,空格是可以忽略的,但是在声明变量的declaration部分要求必须使用空格将var标识和变量名隔开

4.单双引号
单引号在任何时候都表示字符串,
使用\ 反斜杠进行转义
同时kapacitor也支持类似python的语法,用三个连续的单引号表示长字符串(避免过多的转义符)

在lambda表达式和influxql语句当中,使用双引号来表示某一个field或者tag,尤其是在influxql语句当中,强烈建议字段名加上双引号,因为influxql有特别多的内置关键字,使用双引号可以将字段名和关键字进行区分

5.数据结构
常用的数据结构包含
string duration int float lamdba list

  • string
    使用单引号表示字符串
    当需要在字符串当中引用变量时:
    引用当前script当中定义的变量直接使用{{ .valrible_name }} 比如{{ .ID }}
    引用结果集或者stream当中定义的变量使用{{ index .Fileds "usage_idle" }}或者{{ index.Tags "host" }}
  • duration
    时间数据类型
    这里列举一下时间数据类型常见字面量
    1d 1w 1h 1m 1s 1ms 1u
    天 周 小时 分 秒 毫秒 微秒
  • regular expression
    正则表达式类型
    字面量:
var cz_reg = /^cz\d+/
  • lambda表达式
    通常用来做数据处理并返回一个处理后的值
    常见字面量
    var my_lambda = lambda:1>0
    返回一个值为true的lambda表达式类型变量
  • int 和 float
    如果没有在使用lineprotocol插入数据到influxdb的时候指定某个filed字段为int类型,那么默认都是float类型,需要注意的是,有的lambda表达式方法需要的参数限制了数据类型为int,这个时候可以用int()方法强转一下

6.comparison operator 比较器
lamdba比较器
使用 == 和 !=,以及AND 和 OR 进行表达式比较
influxql比较器
除了普遍的sql语法外,influxQL还支持使用=~和!~
来进行正则表达式的匹配

7.关于node的分类
关于node
node分类为
datasource definition nodes 比如 batch和stream
data definition node 比如 from() query()
data manipulation nodes 比如 sample() default()
processing nodes
processing nodes下分
数据构造处理节点和传输处理节点
比如alert()是数据传输处理节点
join()是数据构造处理节点

8.kapacitor还可以调用自己的go方法
使用@+方法名进行调用
参考官方文档

下一篇讲kapacitor脚本监控实战和使用此组件监控的心得体会

Top