2017
Jun
06




Splunk 基本語法

功能 語法
average stats avg(duration) by ...
sort sort -_time, sort _time
replace rex mode=sed field=url "s/aaa/zzz/"

Regular Expression match

rex field=_raw "\"(?<msg>[^\[]+)\"" |table msg

Parse JSON string

如果 Splunk _raw data = {"user": {"id":"2134"}} ,我們可以用 spath 來 parse JSON string

spath json=_raw |table user.id

Parse JSON array

如果 Splunk _raw data = {"users": [{"id":"2134"},{"id":"2135"}]} ,我們可以用 spath + mvexpand 來 parse JSON string

spath json=_raw path=users{} output=x | mvexpand x | spath input=x | table id

mvexpand 的意思是把 users array 攤開成多筆資料,所以這個 example 會得到兩筆 id

Parse 特殊字元

如果 _raw data 中有反斜線,那麼可以使用 [^a] ,利用 非 a 來判斷

Example
  1. rex field=raw "[^a]\"id[^a]\":[^a]\"(?<id>[^\"]+)[^a]\""

deocde unicode

如果你的 log value 有這種 unicode 格式 "\u8338\u5125\u6163\u78ba\u3684\u5716\u" ,可以透過 spath 來處理。

我給的 example ,斜線 "\" 是兩個,首先我用 sed 將斜線瀘掉一個 。

Example
  1. rex mode=sed field=data "s/[\\\]+/\\\/g"

再將 data 字串轉成 json object ,然後透過 spath 分析 json,就可以成功取出 decode 過的中文字。

Example
  1. eval data="{\"msg\":\"" + data + "\"}" | spath input=data | table msg

判定某變數是否為空白

eval hasSubtitle = if (subtitle !="", "1", "0")

處理空白資料

eval serverId = if (isnull(serverId), "unknow", serverId)

用 search 瀘掉不要的資料

search id=123
search price >= 100

濾掉 url 後面的 Query String

rex field=url "(?[^\?]+)"

計算 percentage

Example
  1. | stats count by url
  2. | eventstats sum(count) as total
  3. | eval percent = round(100 * count/total) . " %"
  4. | table url,count,percent

比對字串是否包含某個字元

eval req500=mvfind(status, "500")

使用 eval 增加一個新變數時,不能使用數字開頭的變數,否則在 if condition 中會有問題。

統計各 device 流量

eval isIOS=mvfind(agent, "iOS") | eval isAndroid=mvfind(agent, "android") | eval isWeb = if(isnull(isIOS), if(isnull(isAndroid), 0, 1), 1) | eval device = if(isnull(isIOS), if(isnull(isAndroid), "Web" , "Android") , "iOS") | stats count by device

圖表

利用 timechart 統計 HTTP status

Example
  1. | timechart span=1m count(eval(status="200")) AS 200,count(eval(status="401")) AS 401, count(eval(status="403")) AS 403 , count(eval(status="500")) AS 500
  • span=1m 代表每一分鐘統計一次

每 10 分鐘統計一次 QPS

index=xx_access-log | bucket _time span=10m | stats count by _time | eval QPS=count/600 |table _time,count,QPS

join 兩個 index

通常 Web service 會有兩種 log : access log 與 error log ,只要兩個 log 有一個共同的 id ,那麼我們就可以透過 join 的功能將它們合併 ,範例中我的 access log 有個 accessId ,而在 error log 中有個 logId ,一個 Request 的 accessId and logId 值是相同的,合併方式如下。

index=access_log method="POST" url="*xxx*" | eval logId=accessId | stats values(url) AS url, values(_time) as _time ,values(host) as host by logId | join type=left logId [search _raw="*exception*" index=error_log | stats values(_raw) as error by logId ]

回應 (Leave a comment)