Detection rules › Kusto

Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)

Status
available
Severity
medium
Time window
14d
Group by
AnomalyHour, BaselineBytesSentinMBperHour, EventProduct, TotalBytesSentinMBperHour, anomalies, score
Source
github.com/Azure/Azure-Sentinel

'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial. The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'

MITRE ATT&CK coverage

TacticTechniques
ExfiltrationT1030 Data Transfer Size Limits

Rule body kusto

id: 5965d3e7-8ed0-477c-9b42-e75d9237fab0
name: Detect unauthorized data transfers using timeseries anomaly (ASIM Web Session)
description: |
  'This query utilizes built-in KQL anomaly detection algorithms to identify anomalous data transfers to public networks. It detects significant deviations from a baseline pattern, allowing the detection of sudden increases in data transferred to unknown public networks, which may indicate data exfiltration attempts. Investigating such anomalies is crucial.
  The score indicates the degree to which the data transfer deviates from the baseline value. A higher score indicates a greater deviation. The query's output provides an aggregated summary view of the traffic observed in the flagged anomaly hour, including unique combinations of source IP addresses, destination IP addresses, and port bytes sent. It may be necessary to run queries for individual source IP addresses from the provided 'SourceIPlist' to identify any suspicious activity that warrants further investigation'
severity: Medium
status: Available 
tags:
  - Schema: WebSession
    SchemaVersion: 0.2.6
requiredDataConnectors: []
queryFrequency: 1d
queryPeriod: 14d
triggerOperator: gt
triggerThreshold: 0
tactics:
  - Exfiltration
relevantTechniques:
  - T1030
query: |
  let startTime = 14d;
  let endTime = 1d;
  let timeframe = 1h;
  let scorethreshold = 5;
  let bytessentperhourthreshold = 10;
  // calculate avg. eps(events per second)
  let eps = materialize(_Im_WebSession(starttime=ago(1d))
      | project TimeGenerated
      | summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)
      | summarize round(avg(AvgPerSec))
      );
  let summarizationexist  = (
      union isfuzzy=true 
          (
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t > ago(1d) 
          | project v = int(2)
          ),
          (
          print int(1) 
          | project v = print_0
          )
      | summarize maxv = max(v)
      | extend sumexist = (maxv > 1)
      );
  let TimeSeriesData = union isfuzzy=true 
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) > 1000
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(2d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) between (501 .. 1000)
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(3d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          (datatable(exists: int, sumexist: bool)[1, false]
          | where toscalar(eps) <= 500
          | join (summarizationexist) on sumexist)
          | join (
              _Im_WebSession(starttime=ago(4d), endtime=now())
              | project DstIpAddr, SrcBytes, TimeGenerated, EventProduct
              | where isnotempty(DstIpAddr)
                  and not(ipv4_is_private(DstIpAddr))
                  and isnotempty(SrcBytes)
              | summarize SrcBytesSum=tolong(sum(SrcBytes)) by EventProduct, bin(TimeGenerated, 1h)
              | extend EventTime = TimeGenerated, exists=int(1)
              )
              on exists
          | project-away exists*, maxv, sum*
          ),
          (
          WebSession_Summarized_SrcIP_CL
          | where EventTime_t between (ago(startTime) .. now())
          | where isnotempty(SrcBytes_d) and not(DstIPIsPrivate_b)
          | project
              SrcBytesSum=tolong(SrcBytes_d),
              EventTime=EventTime_t,
              EventProduct = EventProduct_s
          )
      | make-series TotalBytesSent = sum(SrcBytesSum) on EventTime from startofday(ago(startTime)) to startofday(now()) step timeframe by EventProduct;
  // TimeSeriesData block ends here
  //Take only anomalies in TimeSeriesData
  let TimeSeriesAnomalies = materialize(TimeSeriesData
      | extend (anomalies, score, baseline) = series_decompose_anomalies(TotalBytesSent, scorethreshold, -1, 'linefit')
      | mv-expand
          TotalBytesSent to typeof(long),
          EventTime to typeof(datetime),
          anomalies to typeof(double),
          score to typeof(double),
          baseline to typeof(long)
      | where anomalies > 0 and baseline > 0
      | extend AnomalyHour = EventTime
      | extend
          TotalBytesSentinMBperHour = round(((TotalBytesSent / 1024) / 1024), 2),
          BaselineBytesSentinMBperHour = round(((baseline / 1024) / 1024), 2),
          score = round(score, 2)
      | project
          EventProduct,
          AnomalyHour,
          TotalBytesSentinMBperHour,
          BaselineBytesSentinMBperHour,
          anomalies,
          score
      | where AnomalyHour between (startofday(ago(endTime)) .. startofday(now())) // Get TimeSeriesAnomalies in previous day
          );
  // TimeSeriesAlerts block end here
  let AnomalyHours = materialize (TimeSeriesAnomalies
      | project AnomalyHour);
  //Previous day aggregated per hour
  let PreviousDayLogs = 
      _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
      | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
      | where not(ipv4_is_private(DstIpAddr))
      | project
          TimeGenerated,
          DstIpAddr,
          SrcIpAddr,
          SrcBytes,
          DstBytes,
          DstPortNumber,
          EventProduct
      | extend DateHour = bin(TimeGenerated, timeframe) // create a new column and round to hour
      | where DateHour in (AnomalyHours) // Filter dataset to include only anomaly AnomalyHours
      | extend
          SentBytesinMB = ((SrcBytes / 1024) / 1024),
          ReceivedBytesinMB = ((DstBytes / 1024) / 1024)
      | summarize
          HourlyCount = count(),
          TimeGeneratedMax = arg_max(TimeGenerated, *),
          DestinationIPList = make_set(DstIpAddr, 100),
          DestinationPortList = make_set(DstPortNumber, 100),
          SentBytesinMB = tolong(sum(SentBytesinMB)),
          ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))
          by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
      | where SentBytesinMB > bytessentperhourthreshold
      | sort by TimeGeneratedHour asc, SentBytesinMB desc
      | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour) // Ranking the dataset per Hourly Partition
      | where Rank <= 10  // Selecting Top 10 records with Highest BytesSent in each Hour
      | project
          EventProduct,
          TimeGeneratedHour,
          TimeGeneratedMax,
          SrcIpAddr,
          DestinationIPList,
          DestinationPortList,
          SentBytesinMB,
          ReceivedBytesinMB,
          Rank,
          HourlyCount;
  // PreviousDayLogs block ends here
  TimeSeriesAnomalies
  | join kind = inner (PreviousDayLogs
      | extend AnomalyHour = TimeGeneratedHour)
      on EventProduct, AnomalyHour
  | sort by score desc
  | project
      EventProduct,
      AnomalyHour,
      TimeGeneratedMax,
      SrcIpAddr,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      HourlyCount
  | summarize
      EventCount = sum(HourlyCount),
      startTimeUtc = min(TimeGeneratedMax),
      EndTimeUtc = max(TimeGeneratedMax),
      SentBytesinMB = sum(SentBytesinMB),
      ReceivedBytesinMB = sum(ReceivedBytesinMB),
      SourceIP = take_any(SrcIpAddr),
      SourceIPList = make_set(SrcIpAddr, 10),
      DestinationIPList = make_set(DestinationIPList, 100),
      DestinationPortList = make_set(DestinationPortList, 100)
      by
      AnomalyHour,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      score,
      anomalies,
      EventProduct
  | project
      EventProduct,
      AnomalyHour,
      startTimeUtc,
      EndTimeUtc,
      SourceIP,
      SourceIPList,
      DestinationIPList,
      DestinationPortList,
      SentBytesinMB,
      ReceivedBytesinMB,
      TotalBytesSentinMBperHour,
      BaselineBytesSentinMBperHour,
      anomalies,
      score,
      EventCount
entityMappings:
  - entityType: IP
    fieldMappings:
      - identifier: Address
        columnName: SourceIP
eventGroupingSettings:
  aggregationKind: AlertPerResult
customDetails:
  EventCount: EventCount
  SourceIPList: SourceIPList
  DestinationIPList: DestinationIPList
  DestinationPortList: DestinationPortList
  SentBytesinMB: SentBytesinMB
  ReceivedBytesinMB: ReceivedBytesinMB
  anomalies: anomalies
  score: score
alertDetailsOverride:
  alertDisplayNameFormat: "IP address '{{SourceIP}}' is engaged in data transfers to a public network that exceeds usual levels"
  alertDescriptionFormat: "Please conduct a thorough investigation of each IPAddresses listed in SourceIPList: '{{SourceIPList}}' to identify any suspicious activities that may require further investigation. 'SourceIPList' include the top 10 client IP addresses that transmitted the highest amount of data during the anomalous hour"
version: 1.0.1
kind: Scheduled

Stages and Predicates

Parameters

let startTime = 14d;
let endTime = 1d;
let timeframe = 1h;
let scorethreshold = 5;
let bytessentperhourthreshold = 10;

Let binding: eps

let eps = materialize(_Im_WebSession(starttime=ago(1d))
    | project TimeGenerated
    | summarize AvgPerSec = count() / 3600 by bin(TimeGenerated, 1h)
    | summarize round(avg(AvgPerSec))
    );

Let binding: summarizationexist

let summarizationexist = (
    union isfuzzy=true 
        (
        WebSession_Summarized_SrcIP_CL
        | where EventTime_t > ago(1d) 
        | project v = int(2)
        ),
        (
        print int(1) 
        | project v = print_0
        )
    | summarize maxv = max(v)
    | extend sumexist = (maxv > 1)
    );

Let binding: AnomalyHours

let AnomalyHours = materialize (TimeSeriesAnomalies
    | project AnomalyHour);

Derived from TimeSeriesAnomalies.

Let binding: PreviousDayLogs

let PreviousDayLogs = _Im_WebSession(starttime=startofday(ago(endTime)), endtime=startofday(now()))
    | where isnotempty(DstIpAddr) and isnotempty(SrcIpAddr) and isnotempty(SrcBytes)
    | where not(ipv4_is_private(DstIpAddr))
    | project
        TimeGenerated,
        DstIpAddr,
        SrcIpAddr,
        SrcBytes,
        DstBytes,
        DstPortNumber,
        EventProduct
    | extend DateHour = bin(TimeGenerated, timeframe)
    | where DateHour in (AnomalyHours)
    | extend
        SentBytesinMB = ((SrcBytes / 1024) / 1024),
        ReceivedBytesinMB = ((DstBytes / 1024) / 1024)
    | summarize
        HourlyCount = count(),
        TimeGeneratedMax = arg_max(TimeGenerated, *),
        DestinationIPList = make_set(DstIpAddr, 100),
        DestinationPortList = make_set(DstPortNumber, 100),
        SentBytesinMB = tolong(sum(SentBytesinMB)),
        ReceivedBytesinMB = tolong(sum(ReceivedBytesinMB))
        by SrcIpAddr, EventProduct, TimeGeneratedHour = bin(TimeGenerated, timeframe)
    | where SentBytesinMB > bytessentperhourthreshold
    | sort by TimeGeneratedHour asc, SentBytesinMB desc
    | extend Rank=row_number(1, prev(TimeGeneratedHour) != TimeGeneratedHour)
    | where Rank <= 10
    | project
        EventProduct,
        TimeGeneratedHour,
        TimeGeneratedMax,
        SrcIpAddr,
        DestinationIPList,
        DestinationPortList,
        SentBytesinMB,
        ReceivedBytesinMB,
        Rank,
        HourlyCount;

Derived from endTime, timeframe, bytessentperhourthreshold, AnomalyHours.

The stages below define let TimeSeriesAnomalies (the rule's main pipeline source).

Stage 1: source

TimeSeriesData

Stage 2: extend

extend anomalies, baseline, score

Stage 3: mv-expand

mv-expand TotalBytesSent

Stage 4: where

where anomalies > 0 and baseline > 0

Stage 5: extend

extend AnomalyHour

Stage 6: extend

extend BaselineBytesSentinMBperHour, TotalBytesSentinMBperHour, score

Stage 7: project

project AnomalyHour, BaselineBytesSentinMBperHour, EventProduct, TotalBytesSentinMBperHour, anomalies, score

Stage 8: where

where /* macro: (AnomalyHour between (startofday(ago(1d)) .. startofday(now()))) */

Stage 9: join

join kind=inner (...)

The stages below run on TimeSeriesAnomalies (the outer pipeline).

Stage 10: sort

sort by score

Stage 11: project

project AnomalyHour, BaselineBytesSentinMBperHour, DestinationIPList, DestinationPortList, EventProduct, HourlyCount, ReceivedBytesinMB, SentBytesinMB, SrcIpAddr, TimeGeneratedMax, TotalBytesSentinMBperHour, anomalies, score

Stage 12: summarize

summarize DestinationIPList, DestinationPortList, EndTimeUtc, EventCount, ReceivedBytesinMB, SentBytesinMB, SourceIP, SourceIPList, startTimeUtc by AnomalyHour, TotalBytesSentinMBperHour, BaselineBytesSentinMBperHour, score, anomalies, EventProduct

Stage 13: project

project AnomalyHour, BaselineBytesSentinMBperHour, DestinationIPList, DestinationPortList, EndTimeUtc, EventCount, EventProduct, ReceivedBytesinMB, SentBytesinMB, SourceIP, SourceIPList, TotalBytesSentinMBperHour, anomalies, score, startTimeUtc

Exclusions

Top-level NOT(...) conjuncts: predicates this rule actively suppresses.

FieldKindExcluded values
DstIpAddrcidr_match10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.0.0/16, 127.0.0.0/8

Indicators

Each row is a field, operator, and value that the rule matches. The corpus column counts how many other rules in the catalog look for the same combination: high numbers point to widely-used, community-vetted indicators. Blank or 1 shows that the indicator is specific to this rule.

FieldKindValues
DateHourin
  • AnomalyHours transforms: cased
DstIpAddris_not_null
  • (no value, null check)
Rankle
  • 10 transforms: cased
SentBytesinMBgt
  • 10 transforms: cased
SrcBytesis_not_null
  • (no value, null check)
SrcIpAddris_not_null
  • (no value, null check)
anomaliesgt
  • 0 transforms: cased
baselinegt
  • 0 transforms: cased

Output fields

Fields the rule emits when it matches. Chronicle authors list these in the outcome block; they appear on the detection and $risk_score drives alerting. Sentinel / Defender XDR rules build them up through project / summarize / extend stages. Sentinel maps these into alert fields via entityMappings and customDetails; Defender XDR custom detections surface them as alert fields directly.

FieldSource
AnomalyHourproject
BaselineBytesSentinMBperHourproject
DestinationIPListproject
DestinationPortListproject
EndTimeUtcproject
EventCountproject
EventProductproject
ReceivedBytesinMBproject
SentBytesinMBproject
SourceIPproject
SourceIPListproject
TotalBytesSentinMBperHourproject
anomaliesproject
scoreproject
startTimeUtcproject