Potential communication with a Domain Generation Algorithm (DGA) based hostname (ASIM Web Session schema)
This rule identifies communication with hosts whose domain name might have been generated by a Domain Generation Algorithm (DGA). DGAs are used by malware to generate rendezvous points that are difficult to predict in advance. This detection uses the top 1 million domain names to build a model of what normal domains look like and uses the model to identify domains that may have been randomly generated by an algorithm. You can modify the triThreshold and dgaLengthThreshold query parameters to change the analytic rule's sensitivity. Raising dgaLengthThreshold makes the rule less noisy, because only longer names are evaluated; raising triThreshold shrinks the baseline of common trigrams, so more domains are flagged. This analytic rule uses ASIM and supports any built-in or custom source that supports the ASIM WebSession schema.
MITRE ATT&CK coverage
| Tactic | Techniques |
|---|---|
| Command & Control | T1568 Dynamic Resolution |
Rule body kusto
id: 9176b18f-a946-42c6-a2f6-0f6d17cd6a8a
name: Potential communication with a Domain Generation Algorithm (DGA) based hostname (ASIM Web Session schema)
description: |
  'This rule identifies communication with hosts that have a domain name that might have been generated by a Domain Generation Algorithm (DGA).
  DGAs are used by malware to generate rendezvous points that are difficult to predict in advance. This detection uses the top 1 million domain names to build a model of what normal domains look like and uses the model to identify domains that may have been randomly generated by an algorithm. You can modify the triThreshold and dgaLengthThreshold query parameters to change Analytic Rule sensitivity. Raising dgaLengthThreshold makes the rule less noisy, because only longer names are evaluated; raising triThreshold shrinks the baseline of common trigrams, so more domains are flagged.
  This analytic rule uses [ASIM](https://aka.ms/AboutASIM) and supports any built-in or custom source that supports the ASIM WebSession schema (ASIM WebSession Schema)'
severity: Medium
requiredDataConnectors:
  - connectorId: SquidProxy
    dataTypes:
      - SquidProxy_CL
  - connectorId: Zscaler
    dataTypes:
      - CommonSecurityLog
queryFrequency: 6h
queryPeriod: 6h
triggerOperator: gt
triggerThreshold: 0
tactics:
  - CommandAndControl
tags:
  - ParentAlert: https://github.com/Azure/Azure-Sentinel/blob/master/Detections/CommonSecurityLog/MultiVendor-PossibleDGAContacts.yaml
    version: 1.0.0
  - Schema: ASIMWebSession
    SchemaVersion: 0.2.0
relevantTechniques:
  - T1568
query: |
  let triThreshold = 500;
  let querystarttime = 6h;
  let dgaLengthThreshold = 8;
  // fetch the Cisco Umbrella top 1M domains
  let top1M = (externaldata (Position:int, Domain:string) [@"http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m.csv.zip"] with (format="csv", zipPattern="*.csv"));
  // extract trigrams that are above our threshold - i.e. are common
  let triBaseline = top1M
    | extend Domain = tolower(extract("([^.]*).{0,7}$", 1, Domain))
    | extend AllTriGrams = array_concat(extract_all("(...)", Domain), extract_all("(...)", substring(Domain, 1)), extract_all("(...)", substring(Domain, 2)))
    | mvexpand Trigram=AllTriGrams to typeof(string)
    | summarize triCount=count() by Trigram
    | sort by triCount desc
    | where triCount > triThreshold
    | distinct Trigram;
  // collect host names from the ASIM web session data, filter and extract the DGA candidate and its trigrams
  let allDataSummarized = _Im_WebSession
    | where isnotempty(Url)
    | extend Name = tolower(tostring(parse_url(Url)["Host"]))
    | summarize NameCount=count() by Name
    | where Name has "."
    | where Name !endswith ".home" and Name !endswith ".lan"
    // extract DGA candidate
    | extend DGADomain = extract("([^.]*).{0,7}$", 1, Name)
    | where strlen(DGADomain) > dgaLengthThreshold
    // throw out domains with numbers (or any other non-letter character) in them
    | where DGADomain matches regex "^[A-Za-z]{0,}$"
    // extract the trigrams from summarized data
    | extend AllTriGrams = array_concat(extract_all("(...)", DGADomain), extract_all("(...)", substring(DGADomain, 1)), extract_all("(...)", substring(DGADomain, 2)));
  // throw out domains in which any trigram occurs more than once
  // (this also catches a letter repeated four or more times in a row)
  let nonRepeatingTris = allDataSummarized
    | join kind=leftanti
    (
      allDataSummarized
      | mvexpand AllTriGrams
      | summarize count() by tostring(AllTriGrams), DGADomain
      | where count_ > 1
      | distinct DGADomain
    )
    on DGADomain;
  // find domains that do not have a common trigram in the baseline
  let dataWithRareTris = nonRepeatingTris
    | join kind=leftanti
    (
      nonRepeatingTris
      | mvexpand AllTriGrams
      | extend Trigram = tostring(AllTriGrams)
      | distinct Trigram, DGADomain
      | join kind=inner
      (
        triBaseline
      )
      on Trigram
      | distinct DGADomain
    )
    on DGADomain;
  dataWithRareTris
  // join DGAs back on connection data
  | join kind=inner
  (
    _Im_WebSession
    | where isnotempty(Url)
    | extend Url = tolower(Url)
    | summarize arg_max(TimeGenerated, EventVendor, SrcIpAddr) by Url
    | extend Name=tostring(parse_url(Url)["Host"])
    | summarize StartTime=min(TimeGenerated), EndTime=max(TimeGenerated) by Name, SrcIpAddr, Url
  )
  on Name
  | project StartTime, EndTime, Name, DGADomain, SrcIpAddr, Url, NameCount
entityMappings:
  - entityType: IP
    fieldMappings:
      - identifier: Address
        columnName: SrcIpAddr
  - entityType: URL
    fieldMappings:
      - identifier: Url
        columnName: Url
alertDetailsOverride:
  alertDisplayNameFormat: Potential communication from {{SrcIpAddr}} with a Domain Generation Algorithm (DGA) based host {{Name}}
  alertDescriptionFormat: A client with address {{SrcIpAddr}} communicated with host {{Name}} that has a domain name that might have been generated by a Domain Generation Algorithm (DGA), identified by the pattern {{DGADomain}}. DGAs are used by malware to generate rendezvous points that are difficult to predict in advance. This detection uses the top 1 million domain names to build a model of what normal domains look like and uses the model to identify domains that may have been randomly generated by an algorithm.
customDetails:
  DGAPattern: DGADomain
  NameCount: NameCount
version: 1.1.5
kind: Scheduled
metadata:
  source:
    kind: Community
  author:
    name: Yaron
  support:
    tier: Community
  categories:
    domains: [ "Security - Threat Protection" ]
Stages and Predicates
Parameters
let triThreshold = 500;
let querystarttime = 6h;
let dgaLengthThreshold = 8;
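The effect of triThreshold follows from how the query uses it: triBaseline keeps only trigrams with triCount > triThreshold, so a higher value leaves fewer "common" trigrams available to clear a domain. One rough way to see this is to count how many trigrams survive at a few cut-offs. The sketch below reuses the rule's own top1M and trigram expressions; the comparison values 100 and 1000 are arbitrary illustrations, and it assumes the workspace can reach the externaldata URL.

```kusto
// Sketch: how the size of the trigram baseline changes with triThreshold.
// 100 and 1000 are hypothetical comparison points; 500 is the rule's default.
let top1M = (externaldata (Position:int, Domain:string) [@"http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m.csv.zip"] with (format="csv", zipPattern="*.csv"));
top1M
| extend Domain = tolower(extract("([^.]*).{0,7}$", 1, Domain))
| extend AllTriGrams = array_concat(extract_all("(...)", Domain), extract_all("(...)", substring(Domain, 1)), extract_all("(...)", substring(Domain, 2)))
| mv-expand Trigram = AllTriGrams to typeof(string)
| summarize triCount = count() by Trigram
// One row with the number of trigrams that would be in the baseline at each cut-off
| summarize
    BaselineAt100  = countif(triCount > 100),
    BaselineAt500  = countif(triCount > 500),
    BaselineAt1000 = countif(triCount > 1000)
```

A smaller baseline means fewer candidate domains are removed by the final anti-join, so more names reach the alert.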
Let binding: top1M
let top1M = (externaldata (Position:int, Domain:string) [@"http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m.csv.zip"] with (format="csv", zipPattern="*.csv"));
Let binding: triBaseline
let triBaseline = top1M
| extend Domain = tolower(extract("([^.]*).{0,7}$", 1, Domain))
| extend AllTriGrams = array_concat(extract_all("(...)", Domain), extract_all("(...)", substring(Domain, 1)), extract_all("(...)", substring(Domain, 2)))
| mvexpand Trigram=AllTriGrams to typeof(string)
| summarize triCount=count() by Trigram
| sort by triCount desc
| where triCount > triThreshold
| distinct Trigram;
Derived from triThreshold, top1M.
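To make the trigram expression concrete, the following sketch applies it to a single hard-coded label. The label "example" is an illustrative input, not something the rule uses. The three extract_all calls read non-overlapping three-character chunks starting at offsets 0, 1 and 2, which together cover every overlapping trigram in the string.

```kusto
// Sketch: trigram extraction as used by triBaseline, applied to one sample label.
print Domain = "example"
| extend AllTriGrams = array_concat(
    extract_all("(...)", Domain),                 // chunks from offset 0: exa, mpl
    extract_all("(...)", substring(Domain, 1)),   // chunks from offset 1: xam, ple
    extract_all("(...)", substring(Domain, 2)))   // chunks from offset 2: amp
| mv-expand Trigram = AllTriGrams to typeof(string)
| project Trigram
```

For this seven-character input the query should return five rows (exa, mpl, xam, ple, amp); trailing fragments shorter than three characters are discarded by the `(...)` pattern.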
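The stages below define dataWithRareTris (the rule's main pipeline source): stages 1-10 build allDataSummarized, stage 11 derives nonRepeatingTris from it, and stage 12 derives dataWithRareTris.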
Stage 1: source
_Im_WebSession
Stage 2: where
| where isnotempty(Url)
Stage 3: extend
| extend Name = tolower(tostring(parse_url(Url)["Host"]))
Stage 4: summarize
| summarize NameCount=count() by Name
Stage 5: where
| where Name has "."
Stage 6: where
| where Name !endswith ".home" and Name !endswith ".lan"
Stage 7: extend
| extend DGADomain = extract("([^.]*).{0,7}$", 1, Name)
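The candidate-extraction regex picks the first dot-free label that is followed by at most seven trailing characters, which is a heuristic for "the label just before the public suffix". The sketch below runs the same expression over a few made-up host names; all values in the datatable are hypothetical.

```kusto
// Sketch: what the DGADomain extraction returns for sample host names.
datatable(Name:string) [
    "www.example.com",       // expected candidate: example
    "mail.example.co.uk",    // expected candidate: example (".co.uk" is 6 characters)
    "xkqjzvbwplrt.net",      // expected candidate: xkqjzvbwplrt
    "abcdefghij.x.io"        // expected candidate: abcdefghij (".x.io" is only 5 characters)
]
| extend DGADomain = extract("([^.]*).{0,7}$", 1, Name)
```

The last row illustrates a limitation of the heuristic: when the registered domain plus suffix is very short, the regex selects a subdomain label instead.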
Stage 8: where
| where strlen(DGADomain) > dgaLengthThreshold
Stage 9: where
| where DGADomain matches regex "^[A-Za-z]{0,}$"
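Stages 8 and 9 together act as a coarse pre-filter: only labels longer than dgaLengthThreshold that consist purely of ASCII letters continue. A minimal sketch with hypothetical labels shows which inputs survive; note that the regex drops hyphenated labels as well as those with digits.

```kusto
// Sketch: the length and letters-only filters applied to sample labels.
let dgaLengthThreshold = 8;
datatable(DGADomain:string) [
    "google",         // 6 characters: dropped by the length filter
    "xkqjzvbwp",      // 9 letters: kept
    "abc123defgh",    // contains digits: dropped by the regex
    "my-long-site"    // contains hyphens: dropped by the regex
]
| where strlen(DGADomain) > dgaLengthThreshold
| where DGADomain matches regex "^[A-Za-z]{0,}$"
```

With these inputs only xkqjzvbwp should remain.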
Stage 10: extend
| extend AllTriGrams = array_concat(extract_all("(...)", DGADomain), extract_all("(...)", substring(DGADomain, 1)), extract_all("(...)", substring(DGADomain, 2)))
Stage 11: join (negated)
| join kind=leftanti
(
allDataSummarized
| mvexpand AllTriGrams
| summarize count() by tostring(AllTriGrams), DGADomain
| where count_ > 1
| distinct DGADomain
)
on DGADomain
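The anti-join in this stage removes any candidate in which the same trigram occurs more than once. The sketch below reproduces that logic on two hypothetical labels so the effect can be checked in isolation; the table name withTris is an assumption made for the example.

```kusto
// Sketch: dropping labels that contain a repeated trigram.
let withTris = datatable(DGADomain:string) [
    "bananarama",      // "ana" occurs twice, so this label is removed
    "qwertzuiopas"     // all trigrams are distinct, so this label is kept
]
| extend AllTriGrams = array_concat(extract_all("(...)", DGADomain), extract_all("(...)", substring(DGADomain, 1)), extract_all("(...)", substring(DGADomain, 2)));
withTris
| join kind=leftanti
(
    withTris
    | mv-expand AllTriGrams
    | summarize count() by tostring(AllTriGrams), DGADomain
    | where count_ > 1
    | distinct DGADomain
)
on DGADomain
| project DGADomain
```

Natural-language names often repeat syllables, so this step mainly trims human-chosen names before the baseline comparison.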
Stage 12: join (negated)
| join kind=leftanti
(
nonRepeatingTris
| mvexpand AllTriGrams
| extend Trigram = tostring(AllTriGrams)
| distinct Trigram, DGADomain
| join kind=inner
(
triBaseline
)
on Trigram
| distinct DGADomain
)
on DGADomain
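This stage keeps a candidate only if none of its trigrams appears in triBaseline; a single common trigram is enough to clear a domain. The sketch below uses a tiny hand-written baseline in place of the real one, so both the baseline contents and the candidate labels are hypothetical.

```kusto
// Sketch: the baseline anti-join with a stand-in baseline of three trigrams.
let triBaseline = datatable(Trigram:string) ["exa", "amp", "ple"];
let candidates = datatable(DGADomain:string) [
    "examplesite",    // contains "exa" (and others), so it is cleared
    "qwrtzxkvbn"      // shares no trigram with the baseline, so it is kept
]
| extend AllTriGrams = array_concat(extract_all("(...)", DGADomain), extract_all("(...)", substring(DGADomain, 1)), extract_all("(...)", substring(DGADomain, 2)));
candidates
| join kind=leftanti
(
    candidates
    | mv-expand AllTriGrams
    | extend Trigram = tostring(AllTriGrams)
    | distinct Trigram, DGADomain
    | join kind=inner (triBaseline) on Trigram
    | distinct DGADomain
)
on DGADomain
| project DGADomain
```

Only qwrtzxkvbn should be returned. Because one match clears a domain, longer labels have more chances to be cleared than shorter ones.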
The stages below run on dataWithRareTris (the outer pipeline).
Stage 13: join
dataWithRareTris
| join kind=inner
(
_Im_WebSession
| where isnotempty(Url)
| extend Url = tolower(Url)
| summarize arg_max(TimeGenerated, EventVendor, SrcIpAddr) by Url
| extend Name=tostring(parse_url(Url)["Host"])
| summarize StartTime=min(TimeGenerated), EndTime=max(TimeGenerated) by Name, SrcIpAddr, Url
)
on Name
Stage 14: project
| project StartTime, EndTime, Name, DGADomain, SrcIpAddr, Url, NameCount
Exclusions
Top-level NOT(...) conjuncts: predicates this rule actively suppresses.
| Field | Kind | Excluded values |
|---|---|---|
| Name | ends_with | .home |
| Name | ends_with | .lan |
| DGADomain | gt | 8 |
| DGADomain | regex_match | ^[A-Za-z]{0,}$ |
| Name | match | . |
| Url | is_not_null | |
| count_ | gt | 1 |
| triCount | gt | 500 |
Indicators
Each row is a field, operator, and value that the rule matches. The corpus column counts how many other rules in the catalog look for the same combination: high numbers point to widely-used, community-vetted indicators. Blank or 1 shows that the indicator is specific to this rule.
Output fields
Fields the rule emits when it matches. Chronicle authors list these in the outcome block; they appear on the detection and $risk_score drives alerting. Sentinel / Defender XDR rules build them up through project / summarize / extend stages. Sentinel maps these into alert fields via entityMappings and customDetails; Defender XDR custom detections surface them as alert fields directly.
| Field | Source |
|---|---|
| DGADomain | project |
| EndTime | project |
| Name | project |
| NameCount | project |
| SrcIpAddr | project |
| StartTime | project |
| Url | project |