Detection rules › Kusto
Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial. The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'
MITRE ATT&CK coverage
| Tactic | Techniques |
|---|---|
| Exfiltration | T1030 Data Transfer Size Limits |
Rule body kusto
id: 5965d3e7-8ed0-477c-9b42-e75d9237fab0
name: Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
description: |
'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.
The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'
severity: Medium
status: Available
tags:
- Schema: WebSession
SchemaVersion: 0.2.6
requiredDataConnectors: []
queryFrequency: 1d
queryPeriod: 14d
triggerOperator: gt
triggerThreshold: 0
tactics:
- Exfiltration
relevantTechniques:
- T1030
query: |
let startTime = 14d;
let endTime = 1d;
let timeframe = 1h;
let scorethreshold = 5;
let bytessentperhourthreshold = 10;
// calculate avg. eps(events per second)
let eps = materialize(_Im_WebSession(starttime=ago(1d))
| project TimeGenerated
| summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)
| summarize round(avg(AvgPerSec))
);
let summarizationexist = (
union isfuzzy=true
(
WebSession_Summarized_SrcIP_CL
| where EventTime_t > ago(1d)
| project v = int(2)
),
(
print int(1)
| project v = print_0
)
| summarize maxv = max(v)
| extend sumexist = (maxv > 1)
);
let TimeSeriesData = union isfuzzy=true
(
(datatable(exists: int, sumexist: bool)[1, false]
| where toscalar(eps) > 1000
| join (summarizationexist) on sumexist)
| join (
_Im_WebSession(starttime=ago(2d), endtime=now())
| project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
| where isnotempty(DstIpAddr)
and not(ipv4_is_private(DstIpAddr))
and isnotempty(SrcBytes)
| summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
| extend EventTime = TimeGenerated, exists=int(1)
)
on exists
| project-away exists*, maxv, sum*
),
(
(datatable(exists: int, sumexist: bool)[1, false]
| where toscalar(eps) between (501 .. 1000)
| join (summarizationexist) on sumexist)
| join (
_Im_WebSession(starttime=ago(3d), endtime=now())
| project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
| where isnotempty(DstIpAddr)
and not(ipv4_is_private(DstIpAddr))
and isnotempty(SrcBytes)
| summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
| extend EventTime = TimeGenerated, exists=int(1)
)
on exists
| project-away exists*, maxv, sum*
),
(
(datatable(exists: int, sumexist: bool)[1, false]
| where toscalar(eps) <= 500
| join (summarizationexist) on sumexist)
| join (
_Im_WebSession(starttime=ago(4d), endtime=now())
| project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
| where isnotempty(DstIpAddr)
and not(ipv4_is_private(DstIpAddr))
and isnotempty(SrcBytes)
| summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
| extend EventTime = TimeGenerated, exists=int(1)
)
on exists
| project-away exists*, maxv, sum*
),
(
WebSession_Summarized_SrcIP_CL
| where EventTime_t between (ago(startTime) .. now())
| where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
| project
SrcBytesSum=tolong(SrcBytes_d),
EventTime=EventTime_t,
EventProduct = EventProduct_s
)
| make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
// TimeSeriesData block ends here
//Take only anomalies in TimeSeriesData
let TimeSeriesAnomalies = materialize(TimeSeriesData
| extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
| mv-expand
TotalBytesSent to typeof(long),
EventTime to typeof(datetime),
anomalies to typeof(double),
score to typeof(double),
baseline to typeof(long)
| where anomalies > 0 and baseline > 0
| extend AnomalyHour = EventTime
| extend
TotalBytesSentinMBperHour = round(((TotalBytesSent / 1024) / 1024), 2),
BaselineBytesSentinMBperHour = round(((baseline / 1024) / 1024), 2),
score = round(score, 2)
| project
EventProduct,
AnomalyHour,
TotalBytesSentinMBperHour,
BaselineBytesSentinMBperHour,
anomalies,
score
| where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
);
// TimeSeriesAlerts block end here
let AnomalyHours = materialize (TimeSeriesAnomalies
| project AnomalyHour);
//Previous day aggregated per hour
let PreviousDayLogs =
_Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
| where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
| where not(ipv4_is_private(DstIpAddr))
| project
TimeGenerated,
DstIpAddr,
SrcIpAddr,
SrcBytes,
DstBytes,
DstPortNumber,
EventProduct
| extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour
| where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours
| extend
SentBytesinMB = ((SrcBytes / 1024) / 1024),
ReceivedBytesinMB = ((DstBytes / 1024) / 1024)
| summarize
HourlyCount = count(),
TimeGeneratedMax = arg_max(TimeGenerated, *),
DestinationIPList = make_set(DstIpAddr, 100),
DestinationPortList = make_set(DstPortNumber, 100),
SentBytesinMB = tolong(sum(SentBytesinMB)),
ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))
by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
| where SentBytesinMB > bytessentperhourthreshold
| sort by TimeGeneratedHour asc, SentBytesinMB desc
| extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition
| where Rank <= 10 // Selecting Top 10 records with Highest BytesSent in each Hour
| project
EventProduct,
TimeGeneratedHour,
TimeGeneratedMax,
SrcIpAddr,
DestinationIPList,
DestinationPortList,
SentBytesinMB,
ReceivedBytesinMB,
Rank,
HourlyCount;
// PreviousDayLogs block ends here
TimeSeriesAnomalies
| join kind = inner (PreviousDayLogs
| extend AnomalyHour = TimeGeneratedHour)
on EventProduct, AnomalyHour
| sort by score desc
| project
EventProduct,
AnomalyHour,
TimeGeneratedMax,
SrcIpAddr,
DestinationIPList,
DestinationPortList,
SentBytesinMB,
ReceivedBytesinMB,
TotalBytesSentinMBperHour,
BaselineBytesSentinMBperHour,
score,
anomalies,
HourlyCount
| summarize
EventCount = sum(HourlyCount),
startTimeUtc = min(TimeGeneratedMax),
EndTimeUtc = max(TimeGeneratedMax),
SentBytesinMB = sum(SentBytesinMB),
ReceivedBytesinMB = sum(ReceivedBytesinMB),
SourceIP = take_any(SrcIpAddr),
SourceIPList = make_set(SrcIpAddr, 10),
DestinationIPList = make_set(DestinationIPList, 100),
DestinationPortList = make_set(DestinationPortList, 100)
by
AnomalyHour,
TotalBytesSentinMBperHour,
BaselineBytesSentinMBperHour,
score,
anomalies,
EventProduct
| project
EventProduct,
AnomalyHour,
startTimeUtc,
EndTimeUtc,
SourceIP,
SourceIPList,
DestinationIPList,
DestinationPortList,
SentBytesinMB,
ReceivedBytesinMB,
TotalBytesSentinMBperHour,
BaselineBytesSentinMBperHour,
anomalies,
score,
EventCount
entityMappings:
- entityType: IP
fieldMappings:
- identifier: Address
columnName: SourceIP
eventGroupingSettings:
aggregationKind: AlertPerResult
customDetails:
EventCount: EventCount
SourceIPList: SourceIPList
DestinationIPList: DestinationIPList
DestinationPortList: DestinationPortList
SentBytesinMB: SentBytesinMB
ReceivedBytesinMB: ReceivedBytesinMB
anomalies: anomalies
score: score
alertDetailsOverride:
alertDisplayNameFormat: "IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels"
alertDescriptionFormat: "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour"
version: 1.0.1
kind: Scheduled
Stages and Predicates
Parameters
let startTime = 14d;
let endTime = 1d;
let timeframe = 1h;
let scorethreshold = 5;
let bytessentperhourthreshold = 10;
Let binding: eps
let eps = materialize(_Im_WebSession(starttime=ago(1d))
| project TimeGenerated
| summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)
| summarize round(avg(AvgPerSec))
);
Let binding: summarizationexist
let summarizationexist = (
union isfuzzy=true
(
WebSession_Summarized_SrcIP_CL
| where EventTime_t > ago(1d)
| project v = int(2)
),
(
print int(1)
| project v = print_0
)
| summarize maxv = max(v)
| extend sumexist = (maxv > 1)
);
Let binding: AnomalyHours
let AnomalyHours = materialize (TimeSeriesAnomalies
| project AnomalyHour);
Derived from TimeSeriesAnomalies.
Let binding: PreviousDayLogs
let PreviousDayLogs = _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
| where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
| where not(ipv4_is_private(DstIpAddr))
| project
TimeGenerated,
DstIpAddr,
SrcIpAddr,
SrcBytes,
DstBytes,
DstPortNumber,
EventProduct
| extend DateHour = bin(TimeGenerated, timeframe)
| where DateHour in (AnomalyHours)
| extend
SentBytesinMB = ((SrcBytes / 1024) / 1024),
ReceivedBytesinMB = ((DstBytes / 1024) / 1024)
| summarize
HourlyCount = count(),
TimeGeneratedMax = arg_max(TimeGenerated, *),
DestinationIPList = make_set(DstIpAddr, 100),
DestinationPortList = make_set(DstPortNumber, 100),
SentBytesinMB = tolong(sum(SentBytesinMB)),
ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))
by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
| where SentBytesinMB > bytessentperhourthreshold
| sort by TimeGeneratedHour asc, SentBytesinMB desc
| extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour)
| where Rank <= 10
| project
EventProduct,
TimeGeneratedHour,
TimeGeneratedMax,
SrcIpAddr,
DestinationIPList,
DestinationPortList,
SentBytesinMB,
ReceivedBytesinMB,
Rank,
HourlyCount;
Derived from endTime, timeframe, bytessentperhourthreshold, AnomalyHours.
The stages below define let TimeSeriesAnomalies (the rule's main pipeline source).
Stage 1: source
TimeSeriesData
Stage 2: extend
extend anomalies, baseline, score
Stage 3: mv-expand
mv-expand TotalBytesSent
Stage 4: where
where anomalies > 0 and baseline > 0
Stage 5: extend
extend AnomalyHour
Stage 6: extend
extend BaselineBytesSentinMBperHour, TotalBytesSentinMBperHour, score
Stage 7: project
project AnomalyHour, BaselineBytesSentinMBperHour, EventProduct, TotalBytesSentinMBperHour, anomalies, score
Stage 8: where
where /* macro: (AnomalyHour between (startofday(ago(1d)) .. startofday(now()))) */
Stage 9: join
join kind=inner (...)
The stages below run on TimeSeriesAnomalies (the outer pipeline).
Stage 10: sort
sort by score
Stage 11: project
project AnomalyHour, BaselineBytesSentinMBperHour, DestinationIPList, DestinationPortList, EventProduct, HourlyCount, ReceivedBytesinMB, SentBytesinMB, SrcIpAddr, TimeGeneratedMax, TotalBytesSentinMBperHour, anomalies, score
Stage 12: summarize
summarize DestinationIPList, DestinationPortList, EndTimeUtc, EventCount, ReceivedBytesinMB, SentBytesinMB, SourceIP, SourceIPList, startTimeUtc by AnomalyHour, TotalBytesSentinMBperHour, BaselineBytesSentinMBperHour, score, anomalies, EventProduct
Stage 13: project
project AnomalyHour, BaselineBytesSentinMBperHour, DestinationIPList, DestinationPortList, EndTimeUtc, EventCount, EventProduct, ReceivedBytesinMB, SentBytesinMB, SourceIP, SourceIPList, TotalBytesSentinMBperHour, anomalies, score, startTimeUtc
Exclusions
Top-level NOT(...) conjuncts: predicates this rule actively suppresses.
| Field | Kind | Excluded values |
|---|---|---|
DstIpAddr | cidr_match | 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.0.0/16, 127.0.0.0/8 |
Indicators
Each row is a field, operator, and value that the rule matches. The corpus column counts how many other rules in the catalog look for the same combination: high numbers point to widely-used, community-vetted indicators. Blank or 1 shows that the indicator is specific to this rule.
Output fields
Fields the rule emits when it matches. Chronicle authors list these in the outcome block; they appear on the detection and $risk_score drives alerting. Sentinel / Defender XDR rules build them up through project / summarize / extend stages. Sentinel maps these into alert fields via entityMappings and customDetails; Defender XDR custom detections surface them as alert fields directly.
| Field | Source |
|---|---|
AnomalyHour | project |
BaselineBytesSentinMBperHour | project |
DestinationIPList | project |
DestinationPortList | project |
EndTimeUtc | project |
EventCount | project |
EventProduct | project |
ReceivedBytesinMB | project |
SentBytesinMB | project |
SourceIP | project |
SourceIPList | project |
TotalBytesSentinMBperHour | project |
anomalies | project |
score | project |
startTimeUtc | project |