TI map Domain entity to Web Session Events (ASIM Web Session schema)

Severity: medium
Time window: 14d
Group by: DomainName, Id, IndicatorId, ObservableValue, domain
Source: github.com/Azure/Azure-Sentinel

This rule identifies Web Sessions for which the target URL hostname is a known IoC. This rule uses the Advanced Security Information Model (ASIM) and supports any web session source that complies with ASIM.

MITRE ATT&CK coverage

Tactic               Techniques
Command & Control    T1071 Application Layer Protocol

Rule body kusto

id: afa4cb9e-6fec-4742-a17f-f494b54c01e7
name: TI map Domain entity to Web Session Events (ASIM Web Session schema)
description: |
  'This rule identifies Web Sessions for which the target URL hostname is a known IoC. This rule uses the [Advanced Security Information Model (ASIM)](https://aka.ms/AboutASIM) and supports any web session source that complies with ASIM.'
severity: Medium
requiredDataConnectors:
  - connectorId: SquidProxy
    dataTypes:
      - SquidProxy_CL
  - connectorId: Zscaler
    dataTypes:
      - CommonSecurityLog
  - connectorId: ThreatIntelligence
    dataTypes:
      - ThreatIntelIndicators
  - connectorId: ThreatIntelligenceTaxii
    dataTypes:
      - ThreatIntelIndicators
  - connectorId: MicrosoftDefenderThreatIntelligence
    dataTypes:
      - ThreatIntelIndicators
queryFrequency: 1h
queryPeriod: 14d
triggerOperator: gt
triggerThreshold: 0
tactics:
  - CommandAndControl
relevantTechniques:
  - T1071
query: |
    let HAS_ANY_MAX = 10000;
    let dt_lookBack = 1h;
    let ioc_lookBack = 14d;
    // Collect the active domain-name indicators from the threat intelligence feed
    let DOMAIN_TI=ThreatIntelIndicators
    // Picking up only IOC's that contain the entities we want
    | extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
    | where IndicatorType == "domain-name"
    | extend DomainName = tolower(ObservableValue)
    | extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
    | extend IndicatorId = tostring(split(Id, "--")[2])
    | extend Url = iff(ObservableKey == "url:value", ObservableValue, "")
    | where TimeGenerated >= ago(ioc_lookBack)
    | summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id, ObservableValue
    | where IsActive and (ValidUntil > now() or isempty(ValidUntil));
    let DOMAIN_TI_list= todynamic(toscalar(DOMAIN_TI | summarize NIoCs = dcount(DomainName), Domains = make_set(DomainName) 
      | project Domains=iff(NIoCs > HAS_ANY_MAX, dynamic([]), Domains) ));
    DOMAIN_TI
    | project-reorder *, IsActive, Tags, TrafficLightProtocolLevel, DomainName, Type
    // using innerunique to keep perf fast and result set low, we only need one match to indicate potential malicious activity that needs to be investigated
    | join kind=innerunique (
        _Im_WebSession(starttime=ago(dt_lookBack), url_has_any= DOMAIN_TI_list )
        // Extract the hostname from the requested URL
        | extend domain = tostring(parse_url(Url)["Host"])
        | where isnotempty(domain)
        | extend tld = tostring(split(domain, '.')[-1])
        | extend Event_TimeGenerated = TimeGenerated
    ) on $left.DomainName==$right.domain
    | where Event_TimeGenerated < ValidUntil
    | summarize Event_TimeGenerated  = arg_max(Event_TimeGenerated , *) by IndicatorId, domain
    | extend Description = tostring(parse_json(Data).description)
    | extend ActivityGroupNames = extract(@"ActivityGroup:(\S+)", 1, tostring(parse_json(Data).labels))
    | extend ThreatType = tostring(Data.indicator_types[0])
    | project Event_TimeGenerated, Description, ActivityGroupNames, IndicatorId, ValidUntil, Confidence, domain, SrcIpAddr, Url, ThreatType
entityMappings:
  - entityType: IP
    fieldMappings:
      - identifier: Address
        columnName: SrcIpAddr
  - entityType: URL
    fieldMappings:
      - identifier: Url
        columnName: Url
customDetails:
  EventTime: Event_TimeGenerated
  IoCDescription: Description
  ActivityGroupNames: ActivityGroupNames
  IndicatorId: IndicatorId
  ThreatType: ThreatType
  IoCExpirationTime: ValidUntil
  IoCConfidenceScore: Confidence
alertDetailsOverride:
  alertDisplayNameFormat: A web request from {{SrcIpAddr}} to hostname {{domain}} matched an IoC
  alertDescriptionFormat: A client with address {{SrcIpAddr}} requested the URL {{Url}}, whose hostname is a known indicator of compromise of {{ThreatType}}. Consult the threat intelligence blade for more information on the indicator.
version: 1.0.11
kind: Scheduled

Stages and Predicates

Parameters

let HAS_ANY_MAX = 10000;
let dt_lookBack = 1h;
let ioc_lookBack = 14d;

Let binding: DOMAIN_TI_list

let DOMAIN_TI_list = todynamic(toscalar(DOMAIN_TI | summarize NIoCs = dcount(DomainName), Domains = make_set(DomainName) 
  | project Domains=iff(NIoCs > HAS_ANY_MAX, dynamic([]), Domains) ));

Derived from HAS_ANY_MAX, DOMAIN_TI.
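
The size guard in this binding can be tried in isolation. The following is a minimal sketch, assuming a hypothetical three-row SampleTI table and a HAS_ANY_MAX lowered from 10000 to 2 so that the threshold is exceeded; neither value comes from the rule. When the number of distinct domains is above the threshold, the list collapses to an empty array. An empty url_has_any list is generally understood to make the ASIM parser skip pre-filtering, which leaves the later join to do the matching.

```kusto
// Hypothetical sample data; HAS_ANY_MAX is lowered to 2 for illustration only.
let HAS_ANY_MAX = 2;
let SampleTI = datatable(DomainName:string)
[
    "a.example",
    "b.example",
    "c.example"
];
let SampleList = todynamic(toscalar(
    SampleTI
    | summarize NIoCs = dcount(DomainName), Domains = make_set(DomainName)
    | project Domains = iff(NIoCs > HAS_ANY_MAX, dynamic([]), Domains)));
// Three distinct domains exceed the threshold of 2, so the list falls back to an empty array.
print DomainList = SampleList, ListLength = array_length(SampleList)
```

Raising the sample HAS_ANY_MAX to 3 or more should return the three domains instead of the empty array.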

The stages below define let DOMAIN_TI (the rule's main pipeline source).

Stage 1: source

ThreatIntelIndicators

Stage 2: extend

| extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
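
The extend in Stage 2 is easier to follow when its two steps are separated. The sketch below assumes three sample ObservableKey values (only the "url:value" form appears elsewhere in the rule; the other two are illustrative) and introduces a hypothetical Raw column to show the intermediate text.

```kusto
// Sample ObservableKey values; chosen for illustration only.
datatable(ObservableKey:string)
[
    "domain-name:value",
    "url:value",
    "ipv4-addr:value"
]
// split(..., 0) returns a one-element array, so tostring() yields text such as ["domain-name"].
| extend Raw = tostring(split(ObservableKey, ":", 0))
// The regex strips the brackets and quotes left over from that array serialization.
| extend IndicatorType = replace(@"\[|\]|\""", "", Raw)
// Stage 3 keeps only rows where this comparison is true.
| extend IsDomainIndicator = IndicatorType == "domain-name"
```

An expression such as tostring(split(ObservableKey, ":")[0]) should produce the same IndicatorType without the regex clean-up; the rule's own form is kept here so the sketch mirrors it.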

Stage 3: where

| where IndicatorType == "domain-name"

Stage 4: extend (4 consecutive steps)

| extend DomainName = tolower(ObservableValue)
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
| extend IndicatorId = tostring(split(Id, "--")[2])
| extend Url = iff(ObservableKey == "url:value", ObservableValue, "")
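
The four derived columns in Stage 4 can be checked against a single row. This is a sketch under stated assumptions: the Id value below is hypothetical and only illustrates a string with three "--"-separated segments, and the mixed-case ObservableValue is invented to show the effect of tolower.

```kusto
// Hypothetical indicator row; the Id format is an assumption made for illustration.
datatable(Id:string, ObservableKey:string, ObservableValue:string, AdditionalFields:dynamic)
[
    "sample-source--indicator--00000000-0000-0000-0000-000000000001",
    "domain-name:value",
    "Bad.Example",
    dynamic({"TLPLevel": "amber"})
]
// Lowercase the indicator so the later join compares against a normalized value.
| extend DomainName = tolower(ObservableValue)
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
// Index 2 selects the third "--"-separated segment of Id.
| extend IndicatorId = tostring(split(Id, "--")[2])
// Stage 3 only lets "domain-name" keys through, so this condition is never met here.
| extend Url = iff(ObservableKey == "url:value", ObservableValue, "")
```

Because Stage 3 has already removed every row whose ObservableKey starts with anything other than "domain-name", the Url column produced on the indicator side is always an empty string at this point.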

Stage 5: where

| where TimeGenerated >= ago(ioc_lookBack)

Stage 6: summarize

| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id, ObservableValue

Stage 7: where

| where IsActive and (ValidUntil > now() or isempty(ValidUntil))
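
Stages 6 and 7 work as a pair: the summarize keeps only the most recent record per indicator, and the where then judges that latest record alone. A small sketch with hypothetical rows (the ind-1 and ind-2 identifiers and all dates are invented) shows why the order matters.

```kusto
// Hypothetical indicator history: ind-1 was active, then later deactivated.
datatable(TimeGenerated:datetime, Id:string, ObservableValue:string, IsActive:bool, ValidUntil:datetime)
[
    datetime(2024-01-01), "ind-1", "bad.example",  true,  datetime(2099-01-01),
    datetime(2024-01-05), "ind-1", "bad.example",  false, datetime(2099-01-01),
    datetime(2024-01-03), "ind-2", "evil.example", true,  datetime(null)
]
// Keep the newest record per indicator; the max timestamp is returned as LatestIndicatorTime.
| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id, ObservableValue
// Evaluate activity and expiry on that newest record only.
| where IsActive and (ValidUntil > now() or isempty(ValidUntil))
```

With this sample, ind-1 is dropped because its latest record is inactive, while ind-2 is kept because its empty ValidUntil satisfies the isempty branch. Filtering on IsActive before the summarize would instead let the older, still-active ind-1 record through.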

The stages below run on DOMAIN_TI (the outer pipeline).

Stage 8: project-reorder

DOMAIN_TI
| project-reorder *, IsActive, Tags, TrafficLightProtocolLevel, DomainName, Type

Stage 9: join

| join kind=innerunique (
    _Im_WebSession(starttime=ago(dt_lookBack), url_has_any= DOMAIN_TI_list )
    | extend domain = tostring(parse_url(Url)["Host"])
    | where isnotempty(domain)
    | extend tld = tostring(split(domain, '.')[-1])
    | extend Event_TimeGenerated = TimeGenerated
) on $left.DomainName==$right.domain
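
The join in Stage 9 matches on exact hostname equality. The sketch below replaces _Im_WebSession with a hypothetical SampleSessions table and uses an invented one-row SampleTI table, so it can run without any ASIM parser deployed.

```kusto
// Hypothetical stand-ins for DOMAIN_TI and the _Im_WebSession parser output.
let SampleTI = datatable(DomainName:string, IndicatorId:string)
[
    "bad.example", "ioc-1"
];
let SampleSessions = datatable(TimeGenerated:datetime, SrcIpAddr:string, Url:string)
[
    datetime(2024-01-01T10:00:00Z), "10.0.0.5", "https://bad.example/login",
    datetime(2024-01-01T10:05:00Z), "10.0.0.6", "https://www.bad.example/login",
    datetime(2024-01-01T10:10:00Z), "10.0.0.7", "https://good.example/"
];
SampleTI
| join kind=innerunique (
    SampleSessions
    // parse_url returns a property bag; "Host" holds the hostname part of the URL.
    | extend domain = tostring(parse_url(Url)["Host"])
    | where isnotempty(domain)
    | extend Event_TimeGenerated = TimeGenerated
) on $left.DomainName == $right.domain
| project IndicatorId, domain, SrcIpAddr, Url, Event_TimeGenerated
```

Only the request from 10.0.0.5 matches. The www.bad.example request is not joined, because the comparison is on the full hostname rather than on a suffix, even though a url_has_any pre-filter on "bad.example" would likely have passed that URL through to the join.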

Stage 10: where

| where Event_TimeGenerated < ValidUntil
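
The Stage 10 predicate interacts with the isempty(ValidUntil) branch in Stage 7. As far as Kusto's null semantics go, a less-than comparison against a null datetime does not evaluate to true, so indicators without an expiry that passed Stage 7 appear to be filtered out here. The sketch below uses hypothetical rows to make that visible; the relaxed predicate in the trailing comment is an illustrative alternative, not part of the upstream rule.

```kusto
// Hypothetical joined rows: one valid indicator, one expired, one with no expiry set.
datatable(domain:string, Event_TimeGenerated:datetime, ValidUntil:datetime)
[
    "bad.example",  datetime(2024-01-01), datetime(2024-02-01),
    "old.example",  datetime(2024-01-01), datetime(2023-12-01),
    "open.example", datetime(2024-01-01), datetime(null)
]
// Same predicate as Stage 10.
| where Event_TimeGenerated < ValidUntil
// Illustrative alternative that would also keep indicators with no expiry:
// | where isnull(ValidUntil) or Event_TimeGenerated < ValidUntil
```

Running the sample should return only the bad.example row, which is worth keeping in mind when a feed publishes indicators without a ValidUntil value.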

Stage 11: summarize

| summarize Event_TimeGenerated  = arg_max(Event_TimeGenerated , *) by IndicatorId, domain
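
Stage 11 collapses the matches to one row per indicator and hostname, keeping the most recent event. A minimal sketch with invented rows shows the effect on the source address that ends up in the alert.

```kusto
// Hypothetical matches: two clients hit the same indicator, a third row hits another one.
datatable(IndicatorId:string, domain:string, Event_TimeGenerated:datetime, SrcIpAddr:string)
[
    "ioc-1", "bad.example",  datetime(2024-01-01T10:00:00Z), "10.0.0.5",
    "ioc-1", "bad.example",  datetime(2024-01-01T10:30:00Z), "10.0.0.9",
    "ioc-2", "evil.example", datetime(2024-01-01T10:15:00Z), "10.0.0.5"
]
// One row per (IndicatorId, domain); the other columns come from the latest event.
| summarize Event_TimeGenerated = arg_max(Event_TimeGenerated, *) by IndicatorId, domain
```

For ioc-1 only the 10:30 event from 10.0.0.9 survives, so earlier clients that contacted the same hostname within the window are not represented in the output row.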

Stage 12: extend (3 consecutive steps)

| extend Description = tostring(parse_json(Data).description)
| extend ActivityGroupNames = extract(@"ActivityGroup:(\S+)", 1, tostring(parse_json(Data).labels))
| extend ThreatType = tostring(Data.indicator_types[0])
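
The three extends in Stage 12 read from the indicator's Data column. The sketch below assumes a hypothetical Data value shaped like a STIX indicator object; the description, label, and indicator type shown are invented and do not come from any real feed.

```kusto
// Hypothetical Data payload; all values are illustrative.
print Data = dynamic({
    "description": "Sample indicator",
    "labels": ["tlp:amber", "ActivityGroup:SAMPLEGROUP"],
    "indicator_types": ["malicious-activity"]
})
| extend Description = tostring(parse_json(Data).description)
// The labels array is serialized to text first, then searched with the regex.
| extend LabelsText = tostring(parse_json(Data).labels)
| extend ActivityGroupNames = extract(@"ActivityGroup:(\S+)", 1, LabelsText)
// Only the first entry of indicator_types is used.
| extend ThreatType = tostring(Data.indicator_types[0])
```

Because \S+ is greedy and the serialized array typically contains no spaces, it is worth inspecting ActivityGroupNames in the output to see whether the capture also picks up trailing quotes or brackets from the serialized text.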

Stage 13: project

| project Event_TimeGenerated, Description, ActivityGroupNames, IndicatorId, ValidUntil, Confidence, domain, SrcIpAddr, Url, ThreatType

Indicators

Each row lists a field, an operator (kind), and the value that the rule matches on. Where a corpus count is shown, it is the number of other rules in the catalog that look for the same combination: a high number points to a widely used, community-vetted indicator, while a blank or 1 means the indicator is specific to this rule.

Field                  Kind          Values
Event_TimeGenerated    lt            ValidUntil (transforms: cased)
IndicatorType          eq            domain-name (transforms: cased)
ValidUntil             is_null       (no value, null check)
domain                 is_not_null   (no value, null check)

Output fields

Fields the rule emits when it matches. Chronicle authors list these in the outcome block; they appear on the detection and $risk_score drives alerting. Sentinel / Defender XDR rules build them up through project / summarize / extend stages. Sentinel maps these into alert fields via entityMappings and customDetails; Defender XDR custom detections surface them as alert fields directly.

Field                  Source
ActivityGroupNames     project
Confidence             project
Description            project
Event_TimeGenerated    project
IndicatorId            project
SrcIpAddr              project
ThreatType             project
Url                    project
ValidUntil             project
domain                 project