Blog from May, 2016

In part 4, you learned how we can attach threat intelligence indicators to the messages that are passing through the enrichment Storm topology.  The problem, however, is that not all threat intelligence indicators are created equal.  Some require immediate response, whereas others can be dealt with or investigated as time and availability permit.  What we need is the ability to triage and rank threats by severity.

Now that we know what we should do, the next question is how to accomplish it; in other words, we must define what exactly we mean when we say "severity."  Metron implements this capability by letting you associate possibly complex conditions with numeric scores.  Then, for each message, the set of conditions is evaluated and the scores of the matching conditions are combined by a configurable aggregation function.  This aggregated score is added to the message in the threat.triage.level field (it appears as threat.triage.score in the sample messages later in this post).  Let's dig a bit deeper into this and provide an example.

Stellar Language

The heart of the problem is how one defines a "condition."  In Metron, we provide a custom domain-specific language named "Stellar" for defining conditions.  The documentation can be found here: https://github.com/apache/incubator-metron/blob/master/metron-platform/metron-common/README.md

Consider, for example, the following JSON message:

{
    ...
  "src_ip_addr" : "192.168.0.1"
 ,"is_local" : true
    ...
}

Consider the query:

IN_SUBNET( src_ip_addr, '192.168.0.0/24') or src_ip_addr in [ '10.0.0.1', '10.0.0.2' ] or exists(is_local)

This evaluates to true precisely when one of the following is true for a message:

  • The value of the src_ip_addr field is in the 192.168.0.0/24 subnet
  • The value of the src_ip_addr field is 10.0.0.1 or 10.0.0.2
  • The field is_local exists
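To make the semantics of the three clauses concrete, here is a minimal Python sketch of the same condition. It is only an illustration of what the query means, not how Metron evaluates Stellar; the helper names in_subnet and condition are made up for this example.

```python
import ipaddress

def in_subnet(ip, cidr):
    """Rough stand-in for Stellar's IN_SUBNET: is `ip` inside the CIDR block?"""
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr)

def condition(message):
    """Python rendering of the example query (illustrative only)."""
    src = message.get("src_ip_addr")
    return (
        (src is not None and in_subnet(src, "192.168.0.0/24"))  # IN_SUBNET(...)
        or src in ["10.0.0.1", "10.0.0.2"]                      # src_ip_addr in [...]
        or "is_local" in message                                # exists(is_local)
    )

# The sample message from above matches on the first clause alone.
print(condition({"src_ip_addr": "192.168.0.1", "is_local": True}))
# Matches only through the explicit list.
print(condition({"src_ip_addr": "10.0.0.2"}))
# Matches none of the three clauses.
print(condition({"src_ip_addr": "8.8.8.8"}))
```

Because the clauses are joined with or, a single matching clause is enough for the whole condition to hold.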

Threat Triage Configuration

Now that we have the ability to define conditions, for each sensor we need to associate these conditions with scores.  Since this is a per-sensor configuration, it belongs in the sensor enrichment configuration held in ZooKeeper, specifically within the threatIntel section of that configuration, like so:

{
  ...,
  "threatIntel" : {
            ...,
           "triageConfig" : {
                     "riskLevelRules" : [
                               {
                                 "name" : "rule1",
                                 "comment" : "comment1",
                                 "rule" : "<...",
                                 "score" : 5,
                                 "reason" : "some reason"
                                },
                               {
                                 "name" : "rule2",
                                 "comment" : "comment2",
                                 "rule" : "<...",
                                 "score" : 10,
                                 "reason" : "some reason"
                                },
                                ...
                     ],
                     "aggregator" : "MAX"
           }
  }
}

riskLevelRules is the set of mappings from condition to numeric score that defines the threat triage for this particular sensor. aggregator is an aggregation function that takes all non-zero scores of the matching rules in riskLevelRules and aggregates them into a single score.  The currently supported aggregation functions are:

  • MAX : The max of all of the associated values for matching queries
  • MIN : The min of all of the associated values for matching queries
  • MEAN : The mean of all of the associated values for matching queries
  • POSITIVE_MEAN : The mean of the positive associated values for the matching queries.
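As a rough illustration of what each aggregator computes, the following Python sketch applies the four functions to a list of matching scores. The names AGGREGATORS and aggregate are hypothetical, and returning None when nothing matched is an assumption made for the sketch, not documented Metron behavior.

```python
from statistics import mean

def positive_mean(scores):
    """Mean of the strictly positive scores; 0 if there are none (assumption)."""
    positive = [s for s in scores if s > 0]
    return mean(positive) if positive else 0

# Only the aggregator names come from the list above; the mapping is illustrative.
AGGREGATORS = {
    "MAX": max,
    "MIN": min,
    "MEAN": mean,
    "POSITIVE_MEAN": positive_mean,
}

def aggregate(name, scores):
    """Combine the scores of all matching rules into a single triage score."""
    if not scores:
        return None  # assumption: no matching rule means no score at all
    return float(AGGREGATORS[name](scores))

matching_scores = [5, 10]
for name in AGGREGATORS:
    print(name, aggregate(name, matching_scores))
```

With scores of 5 and 10, MAX and MIN pick one of the two values, while MEAN and POSITIVE_MEAN both land halfway between them.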

Example

So, where we left off in part 4 was a working threat intelligence enrichment.  Now, let's see if we can triage those threats for the squid data flowing through.  In particular, let's triage the threat alerts for the squid sensor data higher under the following conditions:

  • If the threat intel enrichment type zeusList as defined in part 4 is alerted, then we want to consider that an alert with a score of 5
  • If the url is neither a .com nor a .net, then we want to consider that an alert with a score of 10

For each message we will assign the maximum score across all conditions as the triage score.  This translates into the following configuration:

{
  ...
  ,"threatIntel" : {
            ...,
           "triageConfig" : {
               "riskLevelRules" : [
                    {
                     "name" : "in_zeus",
                     "comment" : "Checks if this domain without subdomains matches against the zeus threat intel list",
                     "reason" : "FORMAT('%s exists in the Zeus threat intel list', domain_without_subdomains)",
                     "rule" : "exists(threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList)",
                     "score" : 5
                    },
                    {
                     "name" : "tld_check",
                     "comment" : "Applies a risk score based on the domain TLD",
                     "reason" : "FORMAT('%s does not end with com or net', domain_without_subdomains)",
                     "rule" : "not(ENDS_WITH(domain_without_subdomains, '.com') or ENDS_WITH(domain_without_subdomains, '.net'))",
                     "score" : 10
                    }
               ],
               "aggregator" : "MAX"
            }
      }
}
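To see how these two rules and the MAX aggregator interact, here is a small Python sketch that mimics them on trimmed-down versions of the messages shown later in this post. It is not Metron code: the RULES structure and the triage function are hypothetical, and the Stellar rules are hand-translated into Python predicates.

```python
ZEUS_FIELD = "threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList"

# Hand-translated versions of the in_zeus and tld_check rules (illustrative only).
RULES = [
    {"name": "in_zeus", "score": 5,
     "matches": lambda m: ZEUS_FIELD in m},
    {"name": "tld_check", "score": 10,
     "matches": lambda m: not m["domain_without_subdomains"].endswith((".com", ".net"))},
]

def triage(message):
    """Return the MAX of the scores of all matching rules, or None if none match."""
    scores = [rule["score"] for rule in RULES if rule["matches"](message)]
    return float(max(scores)) if scores else None

cnn = {"domain_without_subdomains": "cnn.com"}
webtahmin = {"domain_without_subdomains": "webtahmin.com", ZEUS_FIELD: "alert"}
atmape = {"domain_without_subdomains": "atmape.ru", ZEUS_FIELD: "alert"}

for msg in (cnn, webtahmin, atmape):
    print(msg["domain_without_subdomains"], triage(msg))
```

The sketch looks only at the two rules; it ignores everything else the enrichment topology does to a message.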

In order to apply this triage configuration, we must modify the configuration for the squid sensor in the enrichment topology.  To do this, we should modify ${METRON_HOME}/config/zookeeper/enrichments/squid.json on node1.  However, since the configuration in ZooKeeper may be out of sync with the configuration on disk, we must first bring them into sync by executing the following command:

$METRON_HOME/bin/zk_load_configs.sh -m PULL -z $ZOOKEEPER -f -o $METRON_HOME/config/zookeeper

We should ensure that the configuration for squid exists by running:

cat $METRON_HOME/config/zookeeper/enrichments/squid.json

Now we can edit the configuration.  In $METRON_HOME/config/zookeeper/enrichments/squid.json, find the riskLevelRules array and add the two rules above to it. Also, ensure that the aggregator field is set to MAX.

After modifying the configuration, we can push the configuration back to ZooKeeper and have the enrichment topology pick it up with live data via

$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper

Now we can reload the data from part 4 via

tail /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

If we then check the squid index using the Elasticsearch head plugin, we can see the threats triaged as we would expect:

Non-Threat Data

For URLs from cnn.com, we expect to see no threat alert, so no triage level is set.  Request cnn.com with the Squid client and pipe the resulting log line into Kafka:

squidclient http://www.cnn.com

tail /var/log/squid/access.log -n 1 | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

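Notice that the message carries no triage fields: there is no threat.triage.level, nor any of the threat.triage.score and threat.triage.rules fields that appear on the alert messages below.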

{
"action": "TCP_MISS",
"adapter.simplehbaseadapter.begin.ts": "1492109939268",
"adapter.simplehbaseadapter.end.ts": "1492109939280",
"adapter.threatinteladapter.begin.ts": "1492109939285",
"adapter.threatinteladapter.end.ts": "1492109939289",
"bytes": 128477,
"code": 200,
"domain_without_subdomains": "cnn.com",
"elapsed": 25,
"enrichmentjoinbolt.joiner.ts": "1492109939282",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.domain": "cnn.com",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.domain_created_timestamp": "748695600000",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.home_country": "US",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.owner": "Turner Broadcasting System, Inc.",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.registrar": "Domain Name Manager",
"enrichmentsplitterbolt.splitter.begin.ts": "1492109939265",
"enrichmentsplitterbolt.splitter.end.ts": "1492109939265",
"full_hostname": "www.cnn.com",
"guid": "bdf0d0de-3f6d-4479-848b-1c56e06050de",
"ip_dst_addr": "151.101.41.67",
"ip_src_addr": "::1",
"method": "GET",
"original_string": "1492109922.444 25 ::1 TCP_MISS/200 128477 GET http://www.cnn.com/ - DIRECT/151.101.41.67 text/html",
"source.type": "squid",
"threatinteljoinbolt.joiner.ts": "1492109939291",
"threatintelsplitterbolt.splitter.begin.ts": "1492109939283",
"threatintelsplitterbolt.splitter.end.ts": "1492109939283",
"timestamp": 1492109922444,
"url": "http://www.cnn.com/"
}

Threat Data from webtahmin.com has a triage level of 5

Because webtahmin.com is a malicious host from the zeusList threat intel feed but is a .com address, only the in_zeus rule matches and it is assigned a triage score of 5 (the threat.triage.score field below).

{
"action": "TCP_MISS",
"adapter.simplehbaseadapter.begin.ts": "1492109261268",
"adapter.simplehbaseadapter.end.ts": "1492109261273",
"adapter.threatinteladapter.begin.ts": "1492109261279",
"adapter.threatinteladapter.end.ts": "1492109261287",
"bytes": 69540,
"code": 200,
"domain_without_subdomains": "webtahmin.com",
"elapsed": 4288,
"enrichmentjoinbolt.joiner.ts": "1492109261274",
"enrichmentsplitterbolt.splitter.begin.ts": "1492109261266",
"enrichmentsplitterbolt.splitter.end.ts": "1492109261266",
"full_hostname": "webtahmin.com",
"guid": "cfb72fe1-376a-4850-b2b2-acd36a1f7bf7",
"ip_dst_addr": "185.59.28.14",
"ip_src_addr": "::1",
"is_alert": "true",
"method": "GET",
"original_string": "1492109249.738 4288 ::1 TCP_MISS/200 69540 GET http://webtahmin.com/ - DIRECT/185.59.28.14 text/html",
"source.type": "squid",
"threat.triage.rules.0.comment": "Checks if this domain without subdomains matches against the zeus threat intel list",
"threat.triage.rules.0.name": "in_zeus",
"threat.triage.rules.0.reason": "webtahmin.com exists in the Zeus threat intel list",
"threat.triage.rules.0.score": 5,
"threat.triage.score": 5.0,
"threatinteljoinbolt.joiner.ts": "1492109261293",
"threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList": "alert",
"threatintelsplitterbolt.splitter.begin.ts": "1492109261276",
"threatintelsplitterbolt.splitter.end.ts": "1492109261276",
"timestamp": 1492109249738,
"url": "http://webtahmin.com/"
}

Threat Data from atmape.ru has a triage level of 10

Because atmape.ru is both a malicious host from the zeusList threat intel feed and a non-.com, non-.net address, both rules match and the MAX aggregator assigns it a triage score of 10 (the threat.triage.score field below).

{
"action": "TCP_MEM_HIT",
"adapter.simplehbaseadapter.begin.ts": "1492108679325",
"adapter.simplehbaseadapter.end.ts": "1492108679329",
"adapter.threatinteladapter.begin.ts": "1492108679336",
"adapter.threatinteladapter.end.ts": "1492108679347",
"bytes": 3654,
"code": 200,
"domain_without_subdomains": "atmape.ru",
"elapsed": 0,
"enrichmentjoinbolt.joiner.ts": "1492108679331",
"enrichmentsplitterbolt.splitter.begin.ts": "1492108679324",
"enrichmentsplitterbolt.splitter.end.ts": "1492108679324",
"full_hostname": "www.atmape.ru",
"guid": "524aea4d-f04e-42f4-b5cf-33c8a8e1ae3b",
"ip_src_addr": "::1",
"is_alert": "true",
"method": "GET",
"original_string": "1492108654.717 0 ::1 TCP_MEM_HIT/200 3654 GET http://www.atmape.ru/ - NONE/- text/html",
"source.type": "squid",
"threat.triage.rules.0.comment": "Checks if this domain without subdomains matches against the zeus threat intel list",
"threat.triage.rules.0.name": "in_zeus",
"threat.triage.rules.0.reason": "atmape.ru exists in the Zeus threat intel list",
"threat.triage.rules.0.score": 5,
"threat.triage.rules.1.comment": "Applies a risk score based on the domain TLD",
"threat.triage.rules.1.name": "tld_check",
"threat.triage.rules.1.reason": "atmape.ru does not end with com or net",
"threat.triage.rules.1.score": 10,
"threat.triage.score": 10.0,
"threatinteljoinbolt.joiner.ts": "1492108679349",
"threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList": "alert",
"threatintelsplitterbolt.splitter.begin.ts": "1492108679334",
"threatintelsplitterbolt.splitter.end.ts": "1492108679334",
"timestamp": 1492108654717,
"url": "http://www.atmape.ru/"
}
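The matching rules are written into the message as flattened threat.triage.rules.N.* fields. If you want to inspect them programmatically, a sketch like the following regroups them into one dictionary per rule. The function name triage_rules is made up for this example, and the message is a trimmed copy of the atmape.ru message above.

```python
import re

# Matches keys such as "threat.triage.rules.1.score" and captures index and attribute.
RULE_KEY = re.compile(r"^threat\.triage\.rules\.(\d+)\.(\w+)$")

def triage_rules(message):
    """Group the flattened threat.triage.rules.N.* fields into a list of dicts."""
    rules = {}
    for key, value in message.items():
        match = RULE_KEY.match(key)
        if match:
            index, attribute = int(match.group(1)), match.group(2)
            rules.setdefault(index, {})[attribute] = value
    return [rules[i] for i in sorted(rules)]

message = {
    "domain_without_subdomains": "atmape.ru",
    "threat.triage.rules.0.name": "in_zeus",
    "threat.triage.rules.0.score": 5,
    "threat.triage.rules.1.name": "tld_check",
    "threat.triage.rules.1.score": 10,
    "threat.triage.score": 10.0,
}

for rule in triage_rules(message):
    print(rule["name"], rule["score"])
```

The threat.triage.score key is deliberately not matched by the pattern, since it holds the aggregated score rather than a per-rule attribute.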


Now that we know how to add telemetries and enrichments, as well as how to set up a test framework and troubleshoot them, let's move on to the last step of this blog series and talk about adding threat intelligence.  Metron is designed to work with STIX/TAXII threat feeds, but can also be bulk loaded with threat data from a CSV file.  In this example we will explore the CSV example.  The same loader framework that is used for enrichment is used here for threat intelligence.  As with enrichments, we need to set up a data.csv file, the extractor config JSON and the enrichment config JSON.

This example originally used a Zeus malware tracker list located here: https://zeustracker.abuse.ch/blocklist.php?download=domainblocklist

Update 8/23/19 - The Zeus tracker list was discontinued on July 8, 2019.

For this example we will be using a Squid blacklist malware tracker list located here: https://www.squidblacklist.org/downloads/dg-malicious.acl

curl -o domainblocklist.txt https://www.squidblacklist.org/downloads/dg-malicious.acl

As with enrichment, we need to process this feed into a CSV so we can bulk load it into HBase.  Here is a sample script for doing so:

cat domainblocklist.txt | grep -v "^#" | grep -v "^$" | grep -v "^https" | awk '{print $1",squidblacklist.org"}' > domainblocklist.csv

This produces our domainblocklist.csv, which looks as follows (let's focus on two specific domains from the list):

....
accounts-google.ru,squidblacklist.org
webtahmin.com,squidblacklist.org
.....
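If you would rather do this conversion in a script than in a shell pipeline, the same filtering can be sketched in Python. This is an assumed equivalent of the grep/awk one-liner above, not part of Metron; the function name to_csv_lines is hypothetical.

```python
def to_csv_lines(lines, source="squidblacklist.org"):
    """Mimic: grep -v '^#' | grep -v '^$' | grep -v '^https' | awk '{print $1",source"}'."""
    for line in lines:
        line = line.rstrip("\n")
        # Drop comments, empty lines and lines starting with "https".
        if not line or line.startswith("#") or line.startswith("https"):
            continue
        fields = line.split()
        first = fields[0] if fields else ""  # awk's $1 is empty on whitespace-only lines
        yield f"{first},{source}"

sample = [
    "# sample blocklist header\n",
    "\n",
    "https://example.invalid/ignored\n",
    "accounts-google.ru\n",
    "webtahmin.com\n",
]

for row in to_csv_lines(sample):
    print(row)
```

To process the real download, you could pass an open file handle for domainblocklist.txt instead of the sample list and write each yielded row to domainblocklist.csv.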

Now that we have the CSV of threat intel extracted we need to define our threat intel configs similarly to how we defined them for enrichment.  

Now let's define our threat intel enrichment config by placing the following in a file named threatintel_config_temp.json. Replace $ZOOKEEPER with your quorum:

{
  "zkQuorum" : "$ZOOKEEPER"
 ,"sensorToFieldList" : {
    "squid" : {
           "type" : "THREAT_INTEL"
          ,"fieldToEnrichmentTypes" : {
             "domain_without_subdomains" : [ "squidBlacklist" ]
          }
    }
  }
}

Again, to remove non-ASCII characters, we run this:

iconv -c -f utf-8 -t ascii threatintel_config_temp.json -o threatintel_config.json

And now we define the extractor config and place it in a file named threatintel_extractor_config_temp.json:

{
  "config" : {
    "columns" : {
        "domain" : 0
        ,"source" : 1
    }
    ,"indicator_column" : "domain"
    ,"type" : "squidBlacklist"
    ,"separator" : ","
  }
  ,"extractor" : "CSV"
}
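One way to read this extractor config is as a recipe for turning each CSV row into an indicator plus some metadata. The Python sketch below applies the config to a single row to show how columns, indicator_column, type and separator fit together. It is only an interpretation of the config fields; the extract function and the shape of its result are assumptions, not the loader's actual internal representation.

```python
import csv
import io
import json

# The extractor config from above, embedded so the sketch is self-contained.
EXTRACTOR = json.loads("""
{
  "config" : {
    "columns" : { "domain" : 0, "source" : 1 },
    "indicator_column" : "domain",
    "type" : "squidBlacklist",
    "separator" : ","
  },
  "extractor" : "CSV"
}
""")

def extract(row_text, extractor=EXTRACTOR):
    """Apply the column mapping to one CSV row (illustrative result shape)."""
    cfg = extractor["config"]
    fields = next(csv.reader(io.StringIO(row_text), delimiter=cfg["separator"]))
    values = {name: fields[index] for name, index in cfg["columns"].items()}
    return {
        "indicator": values[cfg["indicator_column"]],  # the value that lookups match on
        "type": cfg["type"],                           # the threat intel type
        "value": values,                               # all named columns of the row
    }

print(extract("webtahmin.com,squidblacklist.org"))
```

The type here is the same squidBlacklist name that the enrichment config lists for the domain_without_subdomains field, which is how the two configs line up.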

And to remove the non-ascii characters we run the following:

iconv -c -f utf-8 -t ascii threatintel_extractor_config_temp.json -o threatintel_extractor_config.json

Now we run the following command to bulk load the threat intel:

${METRON_HOME}/bin/flatfile_loader.sh -n threatintel_config.json -i domainblocklist.csv -t threatintel -c t -e threatintel_extractor_config.json

This command modifies the squid enrichment config in ZooKeeper to include the threat intel enrichment and imports the threat intel data into an HBase table named "threatintel". Around 168k records should be added, which you can confirm by counting the rows:

[root@node1: ~]
# echo "count 'threatintel'" | hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.5.1175-1, r897822d4dd5956ca186974c10382e9094683fa29, Thu Jun 20 17:08:24 UTC 2019

count 'threatintel'
Current count: 1000, row: \x01l\xA9M\xB4\x8F]p~\x9E\x9B\x0Ceg\xD5M\x00\x0EsquidBlacklist\x00\x15lp.autocleantools.com
Current count: 2000, row: \x02\xFB\x92\xBEC\x83G\xD7\x853\x02GX\xF9\xD7d\x00\x0EsquidBlacklist\x00\x09kraken.cc
Current count: 3000, row: \x04\x8A*\x83(\xF7P\xBD7Y\x13\xE6\xBD\xBA\xCA\xE4\x00\x0EsquidBlacklist\x00\x085inv.biz

...

Current count: 166000, row: \xFBqYw\x19\xF8>_P9US\xED\xAFW\xF1\x00\x0EsquidBlacklist\x00\x0Brosehill.hu
Current count: 167000, row: \xFC\xF8\xD3\x03\xA7\xCE\x1E\x086Sfd@Sw\x12\x00\x0EsquidBlacklist\x00\x14selfpackshipping.com
Current count: 168000, row: \xFEyE\xD1\x03gG\xF5\xE7T\x9B\xDD\x8F\xE1\xBB\xBB\x00\x0EsquidBlacklist\x00\x11timetodoright.org
168979 row(s) in 13.3170 seconds

168979

You should see a parser config that looks like the following:

[root@node1: ~]
# ${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c PARSER -n squid

PARSER Config: squid

{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "squid",
  "parserConfig": {
    "grokPath": "/patterns/squid",
    "patternLabel": "SQUID_DELIMITED",
    "timestampField": "timestamp"
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR"
     ,"output" : [ "full_hostname", "domain_without_subdomains" ]
     ,"config" : {
        "full_hostname" : "URL_TO_HOST(url)"
       ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
      }
    }
  ]
}

And an enrichment config that looks like this:

[root@node1: ~]
# ${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c ENRICHMENT -n squid

ENRICHMENT Config: squid

{
  "enrichment" : {
    "fieldMap" : {
      "hbaseEnrichment" : [ "domain_without_subdomains" ]
    },
    "fieldToTypeMap" : {
      "domain_without_subdomains" : [ "whois" ]
    },
    "config" : { }
  },
  "threatIntel" : {
    "fieldMap" : {
      "hbaseThreatIntel" : [ "domain_without_subdomains" ]
    },
    "fieldToTypeMap" : {
      "domain_without_subdomains" : [ "squidBlacklist" ]
    },
    "config" : { },
    "triageConfig" : {
      "riskLevelRules" : [ ],
      "aggregator" : "MAX",
      "aggregationConfig" : { }
    }
  },
  "configuration" : { }
}

We'll want to maintain a current set of local configs to continue working from, so we'll want to pull them locally. To pull these modifications locally, execute the following:

${METRON_HOME}/bin/zk_load_configs.sh -m PULL -z $ZOOKEEPER -o ${METRON_HOME}/config/zookeeper -f

(Optional) Now let's drop the Elasticsearch squid indexes.

curl -XDELETE "http://${ELASTICSEARCH}:9200/squid*"

After dropping the indexes we re-ingest.  Let's trigger on two of the domains we ingested (note, this list is constantly changing, so verify these domains do in fact exist in the domainblocklist.csv before triggering the squidclient. If either/both are not in the list, choose another domain):

squidclient http://kapriz-podolsk.ru
squidclient http://webtahmin.com

Push the new squid log entries into the squid Kafka topic:

tail -f /var/log/squid/access.log -n 2 | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

When the logs are ingested we get messages that have a hit against threat intel:

Notice a couple of characteristics about this message.  It has is_alert=true, which designates it as an alert message.  It also tells us which field received a hit against threat intel (url.zeusList).  Now that we have alerts coming through we need to visualize them in Kibana.  First, we need to set up a pinned query to look for messages where is_alert=true:

And then once we point the alerts table to this pinned query it looks like this: