Blog

Announcing Metron 0.3.0

We are pleased to announce that Metron 0.3.0 is now out and available for download here: https://dist.apache.org/repos/dist/release/incubator/metron/0.3.0/

 

Upgrade Notes 

Metron 0.3.0 represents a major upgrade in capability and requires an upgrade in the following components:

  • Apache Storm 0.10.0 to Apache Storm 1.0.1
  • Apache Kafka 0.9.0  to Apache Kafka 0.10.0.1

These components have to be upgraded prior to deploying Metron 0.3.0.  After upgrading these components, proceed with the Metron install as outlined here: Installation

 

This article will introduce Metron's default dashboard that is built upon Kibana 4. It will cover the elements present in the dashboard and how you can extend the dashboard for your own purposes.  This is Part 7 of a multi-part tutorial series covering Apache Metron (incubating).

Metron's Dashboard

Metron's default dashboard is intended to allow you to easily validate the end-to-end functioning of Metron with its default sensor suite. It highlights some of the useful widgets available in Kibana 4, and serves as a starting point for you to build your own customized dashboards.

The first panel in the dashboard highlights the variety of events being consumed by Metron. It shows the total number of events received, the variety of those events, and a histogram showing when the events were received.

The next set of dashboard panels shows how Apache Metron can be used to perform real-time enrichment of telemetry data. All of the IPv4 data received by Metron was cross-referenced against a geo-ip database. These locations were then used to build this set of dashboard widgets.

 

As part of the default sensor suite, YAF is used to generate flow records. These flow records provide significant visibility into which actors are communicating over the target network. A table widget displays the raw details of each flow record. A histogram of the duration of each flow shows that, while most flows are relatively short-lived, a few in this example last far longer. Generating the histogram required creating an index template that defines this field as numeric.

Snort is a Network Intrusion Detection System (NIDS) that is being used to generate alerts identifying known bad events. Snort relies on a fixed set of rules that act as signatures for identifying abnormal events. Along with displaying the relevant details of each alert, the panel shows that there is only a single unique alert type: a test rule that creates a Snort alert on every network packet. Another table was created to show the source/destination pairs that generated the most Snort alerts.

The Bro Network Security Monitor is extracting application-level information from raw network packets. In this example, Bro is extracting HTTP and HTTPS requests being made over the network. The panels highlight the breakdown by request type, the total number of web requests, and raw details from each web request.

Bro is extracting DNS requests and responses being made over the network. Understanding who is making those requests, the frequency, and types can provide a deep understanding of the actors present on the network.

Creating Your Own Dashboard

Now that you understand Metron's default dashboard, let's cover how you might extend this dashboard for your own purposes. We will continue the ongoing example of parsing Squid Proxy logs. The dashboard will be extended to display the Squid log data.

Enhance the Squid Data

The previous tutorials covering Squid produced a limited data set. It consisted of only a few basic requests. To make this tutorial more interesting, we are going to need a bit more variety in the sample data.

1. Copy and paste the following set of links to a local file called `links.txt`.

    https://www.amazon.com/Cards-Against-Humanity-LLC-CAHUS/dp/B004S8F7QM/ref=zg_bs_toys-and-games_home_1?pf_rd_p=2140216822&pf_rd_s=center-1&pf_rd_t=2101&pf_rd_i=home&pf_rd_m=ATVPDKIKX0DER&pf_rd_r=2231TS0FE044EZT85PQ4
    https://www.amazon.com/Brain-Game-Cube-Intelligence-Development/dp/B01CRXM1JU/ref=zg_bs_toys-and-games_home_2?pf_rd_p=2140216822&pf_rd_s=center-1&pf_rd_t=2101&pf_rd_i=home&pf_rd_m=ATVPDKIKX0DER&pf_rd_r=MANXEWDTKDH2RD9Y3466
    https://www.amazon.com/Zuru-Balloons-different-colors-Seconds/dp/B00ZPW3U14/ref=zg_bs_toys-and-games_home_3?pf_rd_p=2140216822&pf_rd_s=center-1&pf_rd_t=2101&pf_rd_i=home&pf_rd_m=ATVPDKIKX0DER&pf_rd_r=MANXEWDTKDH2RD9Y3466
    https://www.amazon.com/MAGINOVO-Bluetooth-Headphones-Wireless-Earphones/dp/B01EFKFQL8/ref=zg_bs_electronics_home_1?pf_rd_p=2140225402&pf_rd_s=center-2&pf_rd_t=2101&pf_rd_i=home&pf_rd_m=ATVPDKIKX0DER&pf_rd_r=MANXEWDTKDH2RD9Y3466
    https://www.amazon.com/Amazon-Fire-TV-Stick-Streaming-Media-Player/dp/B00GDQ0RMG/ref=zg_bs_electronics_home_2?pf_rd_p=2140225402&pf_rd_s=center-2&pf_rd_t=2101&pf_rd_i=home&pf_rd_m=ATVPDKIKX0DER&pf_rd_r=MANXEWDTKDH2RD9Y3466
    http://www.walmart.com/ip/All-the-Light-We-Cannot-See/26737727
    http://www.walmart.com/ip/Being-Mortal-Medicine-and-What-Matters-in-the-End/36958209
    http://www.walmart.com/ip/My-Brilliant-Friend-Book-One-Childhood-Adolescence/20527482
    http://www.walmart.com/ip/A-Game-of-Thrones/402949
    http://www.bbc.co.uk/capital/story/20160622-there-are-people-making-millions-from-your-pets-poo
    http://www.bbc.co.uk/earth/story/20160620-can-we-predict-the-time-of-our-death
    http://www.bbc.co.uk/news/uk-england-somerset-36596557

2. Run this command to choose one of the links above at random and make a request for that link through Squid. Leave this command running in a terminal so that a continual feed of data is generated as we work through the remainder of this tutorial.

    while sleep 2; do cat links.txt | shuf -n 1 | xargs -i squidclient -g 4 -v {}; done

3. The previous command is generating log records at `/var/log/squid/access.log`. Run the following command in another terminal to extract this data and publish it to Kafka. Again, leave this command running to generate that continuous feed of data. You will need to have two separate terminal sessions left running.


    tail -F /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKA_BROKER_URL --topic squid

4. Ensure that the parser topology for Squid continues to run based on the steps outlined in the previous tutorials.

Create an Index Template

To work with the Squid data in Kibana, we need to ensure that the data is landing in the search index with the correct data types. This can be achieved by defining an index template.

1. Run the following command to create an index template for Squid.

 curl -XPOST $ES_HOST:$ES_PORT/_template/squid_index -d '
{
"template": "squid_index*",
"mappings": {
"squid_doc": {
"_timestamp": {
"enabled": true
},
"properties": {
"timestamp": {
"type": "date",
"format": "epoch_millis"
},
"source:type": {
"type": "string",
"index": "not_analyzed"
},
"action": {
"type": "string",
"index": "not_analyzed"
},
"bytes": {
"type": "integer"
},
"code": {
"type": "string",
"index": "not_analyzed"
},
"domain_without_subdomains": {
"type": "string",
"index": "not_analyzed"
},
"full_hostname": {
"type": "string",
"index": "not_analyzed"
},
"elapsed": {
"type": "integer"
},
"method": {
"type": "string",
"index": "not_analyzed"
},
"ip_dst_addr": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}'

2. By default, Elasticsearch will attempt to analyze all fields of type string. This means that Elasticsearch will tokenize the string and perform additional processing to enable free-form text search. In many cases, and all cases for the Squid data, we want to treat each of the string fields as enumerations. This is why most fields in the index template are `not_analyzed`.

3. An index template will only apply for indices that are created after the template is created. Delete the existing Squid indices so that new ones can be generated with the index template.


    curl -XDELETE node1:9200/squid*

4. Wait for the Squid index to be re-created. This may take a minute or two based on how fast the Squid data is being consumed in your environment.


curl -XGET node1:9200/squid*
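
To see why `not_analyzed` matters for enumeration-style fields, here is a minimal Python sketch. The `crude_analyze` function is a hypothetical, simplified stand-in for an analyzer (lowercase, then split on non-alphanumeric characters); it does not reproduce Elasticsearch's actual tokenization rules. The sample values are Squid `action` values of the kind seen in this series.

```python
import re
from collections import Counter

def crude_analyze(value):
    """Hypothetical stand-in for an analyzer: lowercase, split on non-alphanumerics."""
    return [tok for tok in re.split(r"[^0-9a-z]+", value.lower()) if tok]

def term_counts(values, analyzed):
    """Count indexed terms, roughly as a 'Terms' aggregation would see them."""
    counts = Counter()
    for value in values:
        terms = crude_analyze(value) if analyzed else [value]
        counts.update(terms)
    return counts

actions = ["TCP_MISS", "TCP_MEM_HIT", "TCP_MISS"]
analyzed_counts = term_counts(actions, analyzed=True)
exact_counts = term_counts(actions, analyzed=False)
print(analyzed_counts)  # fragments such as "tcp" and "miss"
print(exact_counts)     # whole values such as "TCP_MISS"
```

With analysis enabled, a chart would be bucketed by fragments of the value rather than by the value itself, which is rarely what you want for fields like `action` or `method`.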

Configure the Squid Index in Kibana

Now that we have a Squid index with all of the right data types, we need to tell Kibana about this index.

Click on the image above to see each of these steps performed.

 

1. Log in to your Kibana user interface and then click on 'Settings', then 'Indices'.

2. A text field will prompt for the name of the index. Type `squid*` within the text field. Every hour or day, depending on the specific configuration, a new Squid index will be created. Using this pattern will match against all Squid indices for all time periods.

3. Click outside of that text box and wait for the 'Time-field name' input field to populate. Since there is only one timestamp in the index, this should default to a field called `timestamp`. If this does not happen simply choose the field `timestamp`.

4. Then click the 'Create' button.
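
The effect of the `squid*` pattern in the steps above can be sketched with a shell-style wildcard match. The index names below are hypothetical; the actual names depend on how your Metron installation names and rolls its indices, and Kibana's own matching is only approximated here by `fnmatch`.

```python
from fnmatch import fnmatch

# Hypothetical index names, one per time period.
indices = [
    "squid_index_2016.06.24.14",
    "squid_index_2016.06.24.15",
    "bro_index_2016.06.24.14",
]

# Keep only the indices that the pattern covers.
matched = [name for name in indices if fnmatch(name, "squid*")]
print(matched)
```

Because the pattern ends in a wildcard, indices created in later time periods are picked up without any further configuration.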

Review the Squid Data

Now that Kibana is aware of the new Squid index, let's take a look at the data.

Click on the image above to see each of these steps performed.

 

1. Click on `Discover` and then choose the newly created `squid*` index pattern.

2. By clicking any of the fields on the left menu, you can see a representation of the variety of data for that specific field.

3. Clicking on a specific record will show each field available in the data.

Save a Squid Search

Let's create a basic data table so that a user can inspect record-level details for Squid.  In Kibana, this is done by creating a 'Saved Search'.

 

Click on the image above to see each of these steps performed.

 

1. Click on `Discover` and then choose the newly created `squid*` index pattern.

2. In the 'Fields' panel on the left, choose which fields to include in the saved search.  Click the 'Add' button next to each field.

3. Click on the 'Save' icon near the top-right to save the search.

Visualize the Squid Data

After using the `Discover` panel to better understand the Squid data, let's create a few visualizations.

Click on the image above to see each of these steps performed.

 

1. Click on 'Visualize' in the top level menu.

2. Choose the 'Vertical bar chart' and when prompted to 'Select a search source' choose 'From a new search'. Choose the `squid*` index pattern.

3. Under 'Select bucket types' click the 'X-Axis' and for the 'Aggregation' type choose 'Terms'.

4. Under 'Field' choose the `domain_without_subdomains` field.

5. Click the 'Play' button to refresh the visualization.

6. Near the top-right side of the screen click on the 'Save' icon to save the visualization. Name it something appropriate. This will allow us to use the visualization in a dashboard later.

Customize the Dashboard

Click on the image above to see each of these steps performed.

 

1. Open the Metron Dashboard by clicking on 'Dashboard' in the top-level menu.

2. On the right, click the 'Add' button indicated by a plus sign.

3. Find the visualization that you would like to add.

4. Scroll to the bottom of the dashboard to find the visualization that was added. From here you can resize and move the visualization as needed.

5. Continue enhancing the dashboard by adding the 'Saved Search' that was previously created.

Summary

At this point you should be comfortable customizing a dashboard as you add new sources of telemetry to Metron. This article introduced Metron's default dashboard that is built upon Kibana 4. It covered the elements present in the dashboard and how you can extend the dashboard for your own purposes.

 

As you saw in part 2, we can use HBase to easily enrich data.  In that tutorial, you learned how to load data via a flat CSV file into HBase.  Some data, however, is not static, but rather comes in a constant stream.  For instance, user enrichment sources are often this way.  Capturing login events and associating them with source IPs is a good way to associate data coming across Metron with a user, which is a valuable piece of information.

For the purpose of demonstration, let's assume that we are ingesting a CSV file which indicates the username to IP association.  From there, we want to use this mapping from within the enrichment topology.  Because we are defining a streaming source, we will need to create a parser topology to handle the streaming data.

In order to do that, we will need to create a file in ${METRON_HOME}/config/zookeeper/parsers/user.json

{
"parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
,"sensorTopic":"user"
,"parserConfig":
{
"shew.table" : "enrichment"
,"shew.cf" : "t"
,"shew.keyColumns" : "ip"
,"shew.enrichmentType" : "user"
,"columns" : {
"user" : 0
,"ip" : 1
}
}
}

As you can see, we are using a stock CSVParser implemented in Metron and a writer to write out to HBase in the key/value format suitable for use in the enrichment topology.

We configure both the parser and the writer in the parserConfig section, where we set the table and column family.  We also specify which columns make up the key; in our case we want to look up based on the ip.  In addition, we specify which enrichment type to use in the enrichment topology (see part 2 for more about the enrichment type).  Finally, we configure the CSVParser with the structure of the CSV being ingested: the first column is "user" and the second column is "ip".

This fully defines our input structure and how that data can be used in enrichment.  We can now associate IP addresses with usernames.
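
As a rough illustration of what this configuration expresses, the sketch below maps one CSV line to a logical enrichment record. It only models the mapping (enrichment type, key column, remaining columns as the value). It does not reproduce the row-key encoding used in HBase, and leaving the key column out of the stored value is an assumption of this sketch. The function name `to_enrichment_record` is hypothetical.

```python
import csv
import io

# Values copied from the parserConfig above.
parser_config = {
    "shew.table": "enrichment",
    "shew.cf": "t",
    "shew.keyColumns": "ip",
    "shew.enrichmentType": "user",
    "columns": {"user": 0, "ip": 1},
}

def to_enrichment_record(line, config):
    """Turn one CSV line into a logical (type, indicator, value) record."""
    row = next(csv.reader(io.StringIO(line)))
    # Name each CSV column according to the "columns" mapping.
    fields = {name: row[index] for name, index in config["columns"].items()}
    # The key column becomes the lookup indicator (assumed removed from the value).
    indicator = fields.pop(config["shew.keyColumns"])
    return {
        "type": config["shew.enrichmentType"],
        "indicator": indicator,
        "value": fields,
    }

record = to_enrichment_record("mmiklavcic,192.168.138.158", parser_config)
print(record)
```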

We can start this on our cluster by pushing this config to zookeeper and then starting a parser topology by running

${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER --replication-factor 1 --partitions 1 --topic user
${METRON_HOME}/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER -i ${METRON_HOME}/config/zookeeper
${METRON_HOME}/bin/start_parser_topology.sh -s user -z $ZOOKEEPER


Now, for the purpose of demonstration, we can create a simple CSV file called user.csv that associates a set of users with IPs:

mmiklavcic,192.168.138.158

And we can push data to the kafka topic via

cat user.csv | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic user

After a few moments you should see a new enrichment type automatically added to the enrichment_list table

[root@node1: ~]
# echo "scan 'enrichment_list'" | hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.5.1175-1, r897822d4dd5956ca186974c10382e9094683fa29, Thu Jun 20 17:08:24 UTC 2019

scan 'enrichment_list'
ROW                                         COLUMN+CELL
 user                                       column=t:v, timestamp=1566598361319, value={}
 whois                                      column=t:v, timestamp=1566586822992, value={}
2 row(s) in 0.4410 seconds

From here we have data flowing into the HBase table, but we need to ensure that the enrichment topology can be used to enrich data flowing past.  We can do this by modifying one of the sensors to associate the ip_src_addr with the user enrichment.  For this demo, let's modify bro by editing ${METRON_HOME}/config/zookeeper/enrichments/bro.json like so

{
  "enrichment" : {
    "fieldMap": {
      "geo": ["ip_dst_addr", "ip_src_addr"],
      "host": ["host"],
      "stellar" : {
        "config" : {
          "user" : "ENRICHMENT_GET('user', ip_src_addr, 'enrichment', 't')"
        }
      }
    }
  },
  "threatIntel": {
    "fieldMap": {
      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
    },
    "fieldToTypeMap": {
      "ip_src_addr" : ["malicious_ip"],
      "ip_dst_addr" : ["malicious_ip"]
    }
  }
}

Now we can push this config to zookeeper and have it pick it up after some time

${METRON_HOME}/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER -i ${METRON_HOME}/config/zookeeper



In part 4, you learned how we can attach threat intelligence indicators to the messages that are passing through the enrichment Storm topology.  The problem, however, is that not all threat intelligence indicators are made equal.  Some require immediate response, whereas others can be dealt with or investigated as time and availability permits.  What we need is the ability to triage and rank threats by severity.

Now that we know what we should do, the next question is how to accomplish it; in other words, we must define what exactly we mean when we say "severity."  Metron implements this capability by letting you associate possibly complex conditions with numeric scores.  Then, for each message, the set of conditions is evaluated and the scores of the matching conditions are aggregated via a configurable aggregation function.  This aggregated score is added to the message in the threat.triage.score field.  Let's dig a bit deeper into this and provide an example.

Stellar Language

The heart of the problem is how one defines a "condition."  In Metron, we provide a custom domain specific language named "Stellar" for defining conditions.  The documentation can be found here - https://github.com/apache/incubator-metron/blob/master/metron-platform/metron-common/README.md

Consider, for example, the following JSON message:

{
    ...
  "src_ip_addr" : "192.168.0.1"
 ,"is_local" : true
    ...
}

Consider the query:

IN_SUBNET( src_ip_addr, '192.168.0.0/24') or src_ip_addr in [ '10.0.0.1', '10.0.0.2' ] or exists(is_local)

This evaluates to true precisely when one of the following is true for a message:

  • The value of the src_ip_addr field is in the 192.168.0.0/24 subnet
  • The value of the src_ip_addr field is 10.0.0.1 or 10.0.0.2
  • The field is_local exists
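
To make the semantics of the query concrete, here is a Python rendering of the same condition. It is not Stellar and does not call any Metron code; the function name `matches` is hypothetical, and treating a missing src_ip_addr as "not in the subnet" is an assumption of this sketch.

```python
import ipaddress

def matches(message):
    """Python rendering of the example condition; not Stellar itself."""
    src = message.get("src_ip_addr")
    # IN_SUBNET(src_ip_addr, '192.168.0.0/24')
    in_subnet = (
        src is not None
        and ipaddress.ip_address(src) in ipaddress.ip_network("192.168.0.0/24")
    )
    # src_ip_addr in [ '10.0.0.1', '10.0.0.2' ]
    in_list = src in ["10.0.0.1", "10.0.0.2"]
    # exists(is_local): only the presence of the field matters, not its value
    exists_is_local = "is_local" in message
    return in_subnet or in_list or exists_is_local

print(matches({"src_ip_addr": "192.168.0.1", "is_local": True}))
print(matches({"src_ip_addr": "10.0.0.2"}))
print(matches({"src_ip_addr": "8.8.8.8"}))
```

The first message satisfies the subnet and exists clauses, the second only the list clause, and the third none of them.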

Threat Triage Configuration

Now that we have the ability to define conditions, for each sensor we need to associate these conditions to scores.  Since this is a per-sensor configuration, this fits nicely within the sensor enrichment configuration held in zookeeper.  This configuration fits well within the threatIntel section of the configuration like so:

{
  ...,
  "threatIntel" : {
            ...,
           "triageConfig" : {
                     "riskLevelRules" : [
                               {
                                 "name" : "rule1",
                                 "comment" : "comment1",
                                 "rule" : "<...",
                                 "score" : 5,
                                 "reason" : "some reason"
                                },
                               {
                                 "name" : "rule2",
                                 "comment" : "comment2",
                                 "rule" : "<...",
                                 "score" : 10,
                                 "reason" : "some reason"
                                },
                                ...
                     ],
                     "aggregator" : "MAX"
           }
  }
}

riskLevelRules corresponds to the set of condition-to-score mappings that define the threat triage for this particular sensor. aggregator is an aggregation function that takes all non-zero scores representing the matching queries from riskLevelRules and aggregates them into a single score.  The currently supported aggregation functions are:

  • MAX : The max of all of the associated values for matching queries
  • MIN : The min of all of the associated values for matching queries
  • MEAN : The mean of all of the associated values for matching queries
  • POSITIVE_MEAN : The mean of the positive associated values for the matching queries.
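
The aggregation functions listed above can be sketched in a few lines of Python. This is an illustration of the arithmetic only, not Metron's implementation; the function name `aggregate` is hypothetical, and returning 0 when no rule matched is an assumption of this sketch.

```python
def aggregate(scores, aggregator):
    """Combine the scores of matching rules into a single triage score."""
    if not scores:
        return 0  # assumption: nothing matched, so there is nothing to score
    if aggregator == "MAX":
        return max(scores)
    if aggregator == "MIN":
        return min(scores)
    if aggregator == "MEAN":
        return sum(scores) / len(scores)
    if aggregator == "POSITIVE_MEAN":
        positive = [s for s in scores if s > 0]
        return sum(positive) / len(positive) if positive else 0
    raise ValueError(f"unsupported aggregator: {aggregator}")

# Two rules matched, with scores 5 and 10.
matching_scores = [5, 10]
for name in ("MAX", "MIN", "MEAN", "POSITIVE_MEAN"):
    print(name, aggregate(matching_scores, name))
```

For these two scores MAX gives 10, MIN gives 5, and both means give 7.5.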

Example

So, where we left off in part 4 was a working threat intelligence enrichment.  Now, let's see if we can triage those threats for the squid data flowing through.  In particular, let's triage the threat alerts for the squid sensor data higher under the following conditions:

  • If the threat intel enrichment type zeusList as defined in part 4 is alerted, then we want to consider that alert a score of 5
  • If the url is neither a .com nor a .net, then we want to consider that alert a score of 10

For each message we will assign the maximum score across all conditions as the triage score.  This translates into the following configuration:

{
  ...
  ,"threatIntel" : {
            ...,
           "triageConfig" : {
               "riskLevelRules" : [
                    {
                     "name" : "in_zeus",
                     "comment" : "Checks if this domain without subdomains matches against the zeus threat intel list",
                     "reason" : "FORMAT('%s exists in the Zeus threat intel list', domain_without_subdomains)",
                     "rule" : "exists(threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList)",
                     "score" : 5
                    },
                    {
                     "name" : "tld_check",
                     "comment" : "Applies a risk score based on the domain TLD",
                     "reason" : "FORMAT('%s does not end with com or net', domain_without_subdomains)",
                     "rule" : "not(ENDS_WITH(domain_without_subdomains, '.com') or ENDS_WITH(domain_without_subdomains, '.net'))",
                     "score" : 10
                    }
               ],
               "aggregator" : "MAX"
            }
      }
}
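
The behaviour of this configuration can be previewed with a small Python sketch that evaluates the two rules against simplified messages and takes the maximum score. The rule functions mirror the Stellar expressions above but are plain Python; the names `RULES` and `triage` are hypothetical. The sketch ignores any gating on is_alert that the real topology may apply, and returning None when no rule matches is an assumption.

```python
ZEUS = "threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList"

def in_zeus(msg):
    # Mirrors: exists(threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList)
    return ZEUS in msg

def tld_check(msg):
    # Mirrors: not(ENDS_WITH(domain, '.com') or ENDS_WITH(domain, '.net'))
    domain = msg["domain_without_subdomains"]
    return not (domain.endswith(".com") or domain.endswith(".net"))

# (name, condition, score) triples taken from the configuration above.
RULES = [("in_zeus", in_zeus, 5), ("tld_check", tld_check, 10)]

def triage(msg):
    """Return the MAX score of all matching rules, or None if none match."""
    scores = [score for _name, rule, score in RULES if rule(msg)]
    return max(scores) if scores else None

cnn = {"domain_without_subdomains": "cnn.com"}
webtahmin = {"domain_without_subdomains": "webtahmin.com", ZEUS: "alert"}
atmape = {"domain_without_subdomains": "atmape.ru", ZEUS: "alert"}

for msg in (cnn, webtahmin, atmape):
    print(msg["domain_without_subdomains"], triage(msg))
```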

In order to apply this triage configuration, we must modify the configuration for the squid sensor in the enrichment topology.  To do this, we should modify ${METRON_HOME}/config/zookeeper/enrichments/squid.json on node1.  However, since the configuration in zookeeper may be out of sync with the configuration on disk, we must make sure they are in sync by executing the following command:

$METRON_HOME/bin/zk_load_configs.sh -m PULL -z $ZOOKEEPER -f -o $METRON_HOME/config/zookeeper

We should ensure that the configuration for squid exists by running

cat $METRON_HOME/config/zookeeper/enrichments/squid.json

Now we can edit the configuration.  In $METRON_HOME/config/zookeeper/enrichments/squid.json edit the section titled riskLevelRules and add the two rules above to the array. Also, ensure that the aggregator field indicates MAX.

After modifying the configuration, we can push the configuration back to zookeeper and have the enrichment topology pick it up with live data via

$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper

Now we can reload the data from part 4 via

tail /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

If we then check the squid index using the Elasticsearch head plugin, we can see the threats triaged as we would expect:

Non-Threat Data

For URLs from cnn.com, we expect to see no threat alert, so no triage level is set.  Request cnn.com with the Squid client and pipe the resulting log entry into Kafka

squidclient http://www.cnn.com

tail /var/log/squid/access.log -n 1 | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

Notice the lack of a threat.triage.score field.

{
"action": "TCP_MISS",
"adapter.simplehbaseadapter.begin.ts": "1492109939268",
"adapter.simplehbaseadapter.end.ts": "1492109939280",
"adapter.threatinteladapter.begin.ts": "1492109939285",
"adapter.threatinteladapter.end.ts": "1492109939289",
"bytes": 128477,
"code": 200,
"domain_without_subdomains": "cnn.com",
"elapsed": 25,
"enrichmentjoinbolt.joiner.ts": "1492109939282",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.domain": "cnn.com",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.domain_created_timestamp": "748695600000",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.home_country": "US",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.owner": "Turner Broadcasting System, Inc.",
"enrichments.hbaseEnrichment.domain_without_subdomains.whois.registrar": "Domain Name Manager",
"enrichmentsplitterbolt.splitter.begin.ts": "1492109939265",
"enrichmentsplitterbolt.splitter.end.ts": "1492109939265",
"full_hostname": "www.cnn.com",
"guid": "bdf0d0de-3f6d-4479-848b-1c56e06050de",
"ip_dst_addr": "151.101.41.67",
"ip_src_addr": "::1",
"method": "GET",
"original_string": "1492109922.444 25 ::1 TCP_MISS/200 128477 GET http://www.cnn.com/ - DIRECT/151.101.41.67 text/html",
"source.type": "squid",
"threatinteljoinbolt.joiner.ts": "1492109939291",
"threatintelsplitterbolt.splitter.begin.ts": "1492109939283",
"threatintelsplitterbolt.splitter.end.ts": "1492109939283",
"timestamp": 1492109922444,
"url": "http://www.cnn.com/"
}

Threat Data from webtahmin.com has a triage level of 5

Because webtahmin.com is a malicious host from the zeusList threat intel feed but is a .com address, it's assigned a threat.triage.score of 5.

{
"action": "TCP_MISS",
"adapter.simplehbaseadapter.begin.ts": "1492109261268",
"adapter.simplehbaseadapter.end.ts": "1492109261273",
"adapter.threatinteladapter.begin.ts": "1492109261279",
"adapter.threatinteladapter.end.ts": "1492109261287",
"bytes": 69540,
"code": 200,
"domain_without_subdomains": "webtahmin.com",
"elapsed": 4288,
"enrichmentjoinbolt.joiner.ts": "1492109261274",
"enrichmentsplitterbolt.splitter.begin.ts": "1492109261266",
"enrichmentsplitterbolt.splitter.end.ts": "1492109261266",
"full_hostname": "webtahmin.com",
"guid": "cfb72fe1-376a-4850-b2b2-acd36a1f7bf7",
"ip_dst_addr": "185.59.28.14",
"ip_src_addr": "::1",
"is_alert": "true",
"method": "GET",
"original_string": "1492109249.738 4288 ::1 TCP_MISS/200 69540 GET http://webtahmin.com/ - DIRECT/185.59.28.14 text/html",
"source.type": "squid",
"threat.triage.rules.0.comment": "Checks if this domain without subdomains matches against the zeus threat intel list",
"threat.triage.rules.0.name": "in_zeus",
"threat.triage.rules.0.reason": "webtahmin.com exists in the Zeus threat intel list",
"threat.triage.rules.0.score": 5,
"threat.triage.score": 5.0,
"threatinteljoinbolt.joiner.ts": "1492109261293",
"threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList": "alert",
"threatintelsplitterbolt.splitter.begin.ts": "1492109261276",
"threatintelsplitterbolt.splitter.end.ts": "1492109261276",
"timestamp": 1492109249738,
"url": "http://webtahmin.com/"
}

Threat Data from atmape.ru has a triage level of 10

Because atmape.ru is both a malicious host from the zeusList threat intel feed and a non-.com, non-.net address, it's assigned a threat.triage.score of 10.

{
"action": "TCP_MEM_HIT",
"adapter.simplehbaseadapter.begin.ts": "1492108679325",
"adapter.simplehbaseadapter.end.ts": "1492108679329",
"adapter.threatinteladapter.begin.ts": "1492108679336",
"adapter.threatinteladapter.end.ts": "1492108679347",
"bytes": 3654,
"code": 200,
"domain_without_subdomains": "atmape.ru",
"elapsed": 0,
"enrichmentjoinbolt.joiner.ts": "1492108679331",
"enrichmentsplitterbolt.splitter.begin.ts": "1492108679324",
"enrichmentsplitterbolt.splitter.end.ts": "1492108679324",
"full_hostname": "www.atmape.ru",
"guid": "524aea4d-f04e-42f4-b5cf-33c8a8e1ae3b",
"ip_src_addr": "::1",
"is_alert": "true",
"method": "GET",
"original_string": "1492108654.717 0 ::1 TCP_MEM_HIT/200 3654 GET http://www.atmape.ru/ - NONE/- text/html",
"source.type": "squid",
"threat.triage.rules.0.comment": "Checks if this domain without subdomains matches against the zeus threat intel list",
"threat.triage.rules.0.name": "in_zeus",
"threat.triage.rules.0.reason": "atmape.ru exists in the Zeus threat intel list",
"threat.triage.rules.0.score": 5,
"threat.triage.rules.1.comment": "Applies a risk score based on the domain TLD",
"threat.triage.rules.1.name": "tld_check",
"threat.triage.rules.1.reason": "atmape.ru does not end with com or net",
"threat.triage.rules.1.score": 10,
"threat.triage.score": 10.0,
"threatinteljoinbolt.joiner.ts": "1492108679349",
"threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList": "alert",
"threatintelsplitterbolt.splitter.begin.ts": "1492108679334",
"threatintelsplitterbolt.splitter.end.ts": "1492108679334",
"timestamp": 1492108654717,
"url": "http://www.atmape.ru/"
}


Now that we know how to add telemetries and enrichments, as well as how to set up a test framework and troubleshoot them, let's move on to the last step of this blog series and talk about adding threat intelligence.  Metron is designed to work with STIX/TAXII threat feeds, but can also be bulk loaded with threat data from a CSV file.  In this example we will explore the CSV approach.  The same loader framework that is used for enrichment is used here for threat intelligence.  As with enrichments, we need to set up a data.csv file, the extractor config JSON, and the enrichment config JSON.

For this example we will be using a Zeus malware tracker list located here: https://zeustracker.abuse.ch/blocklist.php?download=domainblocklist 

Update 8/23/19 - The Zeus tracker list was discontinued on July 8, 2019.

For this example we will instead be using a Squid blacklist malware tracker list located here: https://www.squidblacklist.org/downloads/dg-malicious.acl

curl -o domainblocklist.txt https://www.squidblacklist.org/downloads/dg-malicious.acl

As with enrichment, we need to process this feed into a CSV file so we can bulk load it into HBase.  Here is a sample script for doing so:

cat domainblocklist.txt | grep -v "^#" | grep -v "^$" | grep -v "^https" | awk '{print $1",squidblacklist.org"}' > domainblocklist.csv

This produces our domainblocklist.csv, which looks as follows (let's focus on two specific domains from the list):

....

accounts-google.ru,squidblacklist.org

webtahmin.com,squidblacklist.org

.....
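
For readers less familiar with grep and awk, the filtering pipeline above can be sketched in Python. This is meant only to explain what each stage does, not to replace the shell command; the function name `feed_to_csv` is hypothetical, and every sample input line other than the two domains shown above is made up for illustration.

```python
def feed_to_csv(lines, source="squidblacklist.org"):
    """Mimic the pipeline: drop comments, empty lines and https lines,
    then emit the first whitespace-separated field plus the source."""
    rows = []
    for raw in lines:
        line = raw.rstrip("\n")
        # grep -v "^#" | grep -v "^$" | grep -v "^https"
        if line == "" or line.startswith("#") or line.startswith("https"):
            continue
        # awk '{print $1",squidblacklist.org"}'
        fields = line.split()
        first = fields[0] if fields else ""
        rows.append(f"{first},{source}")
    return rows

sample = [
    "# comment line\n",
    "\n",
    "accounts-google.ru\n",
    "https://example.invalid/x\n",
    "webtahmin.com\n",
]
print(feed_to_csv(sample))
```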

Now that we have the CSV of threat intel extracted we need to define our threat intel configs similarly to how we defined them for enrichment.  

Now let's define our threat intel enrichment config by placing the following in a file named threatintel_config_temp.json. Replace $ZOOKEEPER with your quorum:

{
  "zkQuorum" : "$ZOOKEEPER"
 ,"sensorToFieldList" : {
    "squid" : {
           "type" : "THREAT_INTEL"
          ,"fieldToEnrichmentTypes" : {
             "domain_without_subdomains" : [ "squidBlacklist" ]
          }
    }
  }
}

Again, we need to remove non-ASCII characters, so we run this:

iconv -c -f utf-8 -t ascii threatintel_config_temp.json -o threatintel_config.json
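
The effect of this iconv invocation (with -c, characters that cannot be represented in the target encoding are dropped) can be sketched in Python as follows. The input string is hypothetical: it contains a non-breaking space of the kind that often sneaks in when copying from a web page, and the quorum value is a made-up placeholder.

```python
def strip_non_ascii(text):
    """Drop every character that cannot be encoded as ASCII."""
    return text.encode("ascii", errors="ignore").decode("ascii")

# Hypothetical input with a non-breaking space (U+00A0) after the brace.
pasted = '{\u00a0"zkQuorum" : "node1:2181" }'
cleaned = strip_non_ascii(pasted)
print(cleaned)
```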

And now we define the extractor config and place it in a file named threatintel_extractor_config_temp.json:

{
  "config" : {
    "columns" : {
        "domain" : 0
        ,"source" : 1
    }
    ,"indicator_column" : "domain"
    ,"type" : "squidBlacklist"
    ,"separator" : ","
  }
  ,"extractor" : "CSV"
}

And to remove the non-ascii characters we run the following:

iconv -c -f utf-8 -t ascii threatintel_extractor_config_temp.json -o threatintel_extractor_config.json

Now we run the following command to bulk load the threat intel:

${METRON_HOME}/bin/flatfile_loader.sh -n threatintel_config.json -i domainblocklist.csv -t threatintel -c t -e threatintel_extractor_config.json

This command will modify the squid enrichment config in Zookeeper to include the threat intel enrichment as well as import the threat intel data to HBase to a table named "threatintel". There should be around 168k records added.

[root@node1: ~]
# echo "count 'threatintel'" | hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.5.1175-1, r897822d4dd5956ca186974c10382e9094683fa29, Thu Jun 20 17:08:24 UTC 2019

count 'threatintel'
Current count: 1000, row: \x01l\xA9M\xB4\x8F]p~\x9E\x9B\x0Ceg\xD5M\x00\x0EsquidBlacklist\x00\x15lp.autocleantools.com
Current count: 2000, row: \x02\xFB\x92\xBEC\x83G\xD7\x853\x02GX\xF9\xD7d\x00\x0EsquidBlacklist\x00\x09kraken.cc
Current count: 3000, row: \x04\x8A*\x83(\xF7P\xBD7Y\x13\xE6\xBD\xBA\xCA\xE4\x00\x0EsquidBlacklist\x00\x085inv.biz

...

Current count: 166000, row: \xFBqYw\x19\xF8>_P9US\xED\xAFW\xF1\x00\x0EsquidBlacklist\x00\x0Brosehill.hu
Current count: 167000, row: \xFC\xF8\xD3\x03\xA7\xCE\x1E\x086Sfd@Sw\x12\x00\x0EsquidBlacklist\x00\x14selfpackshipping.com
Current count: 168000, row: \xFEyE\xD1\x03gG\xF5\xE7T\x9B\xDD\x8F\xE1\xBB\xBB\x00\x0EsquidBlacklist\x00\x11timetodoright.org
168979 row(s) in 13.3170 seconds

168979

You should see a parser config that looks like the following:

[root@node1: ~]
# ${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c PARSER -n squid

PARSER Config: squid

{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "squid",
  "parserConfig": {
    "grokPath": "/patterns/squid",
    "patternLabel": "SQUID_DELIMITED",
    "timestampField": "timestamp"
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR"
     ,"output" : [ "full_hostname", "domain_without_subdomains" ]
     ,"config" : {
        "full_hostname" : "URL_TO_HOST(url)"
       ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
      }
    }
  ]
}

And an enrichment config that looks like this:

[root@node1: ~]
# ${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c ENRICHMENT -n squid

ENRICHMENT Config: squid

{
  "enrichment" : {
    "fieldMap" : {
      "hbaseEnrichment" : [ "domain_without_subdomains" ]
    },
    "fieldToTypeMap" : {
      "domain_without_subdomains" : [ "whois" ]
    },
    "config" : { }
  },
  "threatIntel" : {
    "fieldMap" : {
      "hbaseThreatIntel" : [ "domain_without_subdomains" ]
    },
    "fieldToTypeMap" : {
      "domain_without_subdomains" : [ "squidBlacklist" ]
    },
    "config" : { },
    "triageConfig" : {
      "riskLevelRules" : [ ],
      "aggregator" : "MAX",
      "aggregationConfig" : { }
    }
  },
  "configuration" : { }
}

We'll want to keep a current set of local configs to continue working from. To pull these modifications locally, execute the following:

${METRON_HOME}/bin/zk_load_configs.sh -m PULL -z $ZOOKEEPER -o ${METRON_HOME}/config/zookeeper -f

(Optional) Now let's drop the Elasticsearch squid indexes.

curl -XDELETE "http://${ELASTICSEARCH}:9200/squid*"

After dropping the indexes we re-ingest.  Let's trigger on two of the domains we ingested. Note that this list is constantly changing, so verify that these domains do in fact exist in domainblocklist.csv before running squidclient. If either one is not in the list, choose another domain:

squidclient http://kapriz-podolsk.ru
squidclient http://webtahmin.com
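Since the blocklist changes over time, it helps to confirm that the domains you plan to request are still listed. The sketch below is a hypothetical Python helper, not a Metron tool. It assumes the domain is in the first CSV column, as the extractor config describes, and the sample rows are made up for illustration.

```python
import csv
import io


def domains_in_blocklist(csv_text: str, candidates: list[str]) -> dict[str, bool]:
    """Report, for each candidate, whether it appears in the first CSV column."""
    listed = set()
    for row in csv.reader(io.StringIO(csv_text)):
        if row:  # skip blank lines
            listed.add(row[0].strip().lower())
    return {domain: domain.lower() in listed for domain in candidates}


if __name__ == "__main__":
    # Two made-up rows in the domain,source layout; "example-source" is a
    # placeholder, not real blocklist data.
    sample = "kapriz-podolsk.ru,example-source\nwebtahmin.com,example-source\n"
    print(domains_in_blocklist(sample, ["kapriz-podolsk.ru", "not-listed.example"]))
```

To check the real file, read it with `open("domainblocklist.csv").read()` and pass the text as the first argument.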

Push the new squid log entries into the squid Kafka topic:

tail -f /var/log/squid/access.log -n 2 | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

When the logs are ingested we get messages that have a hit against threat intel:

Notice a couple of characteristics about this message.  It has is_alert=true, which designates it as an alert message.  It also tells us which field received a hit against threat intel (url.zeusList).  Now that we have alerts coming through, we need to visualize them in Kibana.  First, we need to set up a pinned query to look for messages where is_alert=true:

And then once we point the alerts table to this pinned query it looks like this:


Now that we have created a new telemetry, we can see how to add new enrichments to it.  In this exercise we will add a whois enrichment to the Squid telemetry we set up in the previous entry.  Whois data is expensive, so we will not be providing it.  Instead, I wrote a basic whois scraper (out of scope for this exercise) that produces whois data in CSV format as follows:

google.com, "Google Inc.", "US", "Dns Admin",874306800000
work.net, "", "US", "PERFECT PRIVACY, LLC",788706000000
capitalone.com, "Capital One Services, Inc.", "US", "Domain Manager",795081600000
cisco.com, "Cisco Technology Inc.", "US", "Info Sec",547988400000
cnn.com, "Turner Broadcasting System, Inc.", "US", "Domain Name Manager",748695600000
news.com, "CBS Interactive Inc.", "US", "Domain Admin",833353200000
nba.com, "NBA Media Ventures, LLC", "US", "C/O Domain Administrator",786027600000
espn.com, "ESPN, Inc.", "US", "ESPN, Inc.",781268400000
pravda.com, "Internet Invest, Ltd. dba Imena.ua", "UA", "Whois privacy protection service",806583600000
hortonworks.com, "Hortonworks, Inc.", "US", "Domain Administrator",1303427404000
microsoft.com, "Microsoft Corporation", "US", "Domain Administrator",673156800000
yahoo.com, "Yahoo! Inc.", "US", "Domain Administrator",790416000000
rackspace.com, "Rackspace US, Inc.", "US", "Domain Admin",903092400000
1and1.co.uk, "1 & 1 Internet Ltd","UK", "Domain Admin",943315200000

Please cut and paste this data into a file called "whois_ref.csv" on your virtual machine.

The schema of this enrichment is domain|owner|registeredCountry|registrar|registeredTimestamp.  Make sure the CSV file does not end with an empty line, as that will result in a null pointer exception. The first thing we need to do is set up the enrichment source.  To do this, we first need to set up the extractor config as follows:

{
  "config" : {
    "columns" : {
        "domain" : 0
        ,"owner" : 1
        ,"home_country" : 2
        ,"registrar": 3
        ,"domain_created_timestamp": 4
    }
    ,"indicator_column" : "domain"
    ,"type" : "whois"
    ,"separator" : ","
  }
  ,"extractor" : "CSV"
}

Please cut and paste this config into a file called "extractor_config_temp.json" on the virtual machine.  Because copying and pasting from this blog will include some invisible non-ASCII characters, strip them out by running:

iconv -c -f utf-8 -t ascii extractor_config_temp.json -o extractor_config.json
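To make the column mapping concrete, here is a small Python sketch of what an extractor config like the one above describes: each CSV line is split, the `indicator_column` becomes the lookup key, and the remaining columns become the enrichment values. This is an illustration only and not Metron's implementation; the helper names are hypothetical, and how Metron itself treats quoted fields is not covered here. The second helper flags empty lines of the kind that the warning above is about.

```python
import csv

# Column positions copied from the extractor config above.
COLUMNS = {
    "domain": 0,
    "owner": 1,
    "home_country": 2,
    "registrar": 3,
    "domain_created_timestamp": 4,
}
INDICATOR_COLUMN = "domain"


def extract(line: str) -> tuple[str, dict[str, str]]:
    """Split one CSV line into (indicator, value fields) using COLUMNS."""
    # skipinitialspace lets a quoted field follow ", " as in the sample data.
    fields = next(csv.reader([line], skipinitialspace=True))
    record = {name: fields[index] for name, index in COLUMNS.items()}
    indicator = record.pop(INDICATOR_COLUMN)
    return indicator, record


def blank_line_numbers(text: str) -> list[int]:
    """Return the 1-based numbers of empty lines in the CSV text."""
    return [n for n, line in enumerate(text.splitlines(), start=1) if not line.strip()]


if __name__ == "__main__":
    sample = 'cisco.com, "Cisco Technology Inc.", "US", "Info Sec",547988400000'
    print(extract(sample))
```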

 

We also need another config to load the Zookeeper enrichment config. Be sure to replace the $ZOOKEEPER placeholder with your Zookeeper quorum list:

{
"zkQuorum" : "$ZOOKEEPER"
,"sensorToFieldList" : {
"squid" : {
"type" : "ENRICHMENT"
,"fieldToEnrichmentTypes" : {
"domain_without_subdomains" : [ "whois" ]
}
}
}
}

Please cut and paste this file into a file called "enrichment_config_temp.json" on the virtual machine.  Because copying and pasting from this blog will include some non-ascii invisible characters, to strip them out please run 

iconv -c -f utf-8 -t ascii enrichment_config_temp.json -o enrichment_config.json

This config means that the system will map the whois enrichment to the domain_without_subdomains field.  Then execute the following command:

${METRON_HOME}/bin/flatfile_loader.sh -n enrichment_config.json -i whois_ref.csv -t enrichment -c t -e extractor_config.json

After this, your enrichment data will be loaded into HBase and a Zookeeper mapping will be established.  The data will be populated into an HBase table called enrichment.  To verify that the data was properly loaded into HBase, run the following command:

echo "scan 'enrichment'" | hbase shell

Note, you should also see a separate HBase table, enrichment_list, automatically populated with a single new enrichment type named "whois."

[root@node1(127.0.0.1 192.168.66.121): ~]
# echo "scan 'enrichment_list'" | hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.5.1175-1, r897822d4dd5956ca186974c10382e9094683fa29, Thu Jun 20 17:08:24 UTC 2019

scan 'enrichment_list'
ROW                                         COLUMN+CELL
 whois                                      column=t:v, timestamp=1566586822992, value={}
1 row(s) in 0.4950 seconds

You should see the table bulk loaded with data from the CSV file.  Now check if Zookeeper enrichment tag was properly populated:

${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c ENRICHMENT -n squid

This spits out configs to standard out. We provided a sensor name arg, so you should see one named "squid."

If you want to start with a fresh index for squid, you can delete the existing index by doing the following:

curl -XDELETE "http://node1:9200/squid*"

Re-ingest the data (see previous blog post for more detail)

cat /var/log/squid/access.log | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

and the new messages should be automatically enriched. Using the ES Head browser plugin, the new message should look as follows:



Notice the enrichments here (whois.owner, whois.domain_created_timestamp, whois.registrar, whois.home_country) 




In this blog post we will walk through what it takes to set up a new telemetry source in Metron.  For this example we will set up a new sensor, capture the sensor logs, pipe the logs to Kafka, pick up the logs with a Metron parsing topology, parse them, and run them through the Metron stream processing pipeline.

Our example sensor will be a Squid Proxy.  Squid is a caching proxy for the Web supporting HTTP, HTTPS, FTP, and more.  Squid logs are simple to explain and easy to parse, and the velocity of traffic coming from Squid is representative of a typical network-based sensor.  Hence, we feel it's a good telemetry to use for this tutorial.

Step 1: Download the Metron source code

If you are not running Metron from the USB stick you need to download and build the code.   Please see here for full Metron installation and validation instructions.  Verify that the project has been built before creating the VM.  First, let's get Metron from Apache.

git clone https://github.com/apache/metron.git

git tag -l

Now you will see a list of Metron releases.  You will see major releases, minor releases, and release candidates.  Refer to the Metron website to find out which is the current stable release recommended for downloading.  Once you have selected the Metron release, run the following commands to check it out:

cd metron

git checkout tags/[MetronReleaseVersion]

Step 2: Build the Metron dev environment

Now that we have downloaded Metron and checked out the desired version, we need to set up our environment. There are a few choices as described here https://github.com/apache/metron/tree/master/metron-deployment. We'll choose Centos 6 for this example.

cd metron/metron-deployment/development/centos6

vagrant up

This will build Metron (without running the tests), package up relevant project artifacts as RPMs, setup and install Ambari to install and manage the single-node Hadoop cluster, and finally install Metron. Once the Vagrant command is finished, you should have a fully-running and self-contained virtual environment with Metron running inside of it.

TASK [deployment-report : debug] ***********************************************
ok: [node1] => {
    "success": [
        "Apache Metron deployed successfully",
        "   Ambari          @ http://node1:8080",
        "   Zookeeper       @ node1:2181",
        "   Kafka           @ node1:6667",
        "For additional information, see https://metron.apache.org/'"
    ]
}

PLAY RECAP *********************************************************************
node1                      : ok=152  changed=64   unreachable=0    failed=0

Step 3 : Installing a sample sensor

Log into the sensors node and install the squid sensor.  If you are on the local FullDev Vagrant development platform your VM will be called node1.  See https://github.com/apache/metron/tree/master/metron-deployment/development/centos6 for example. If you are in an AWS environment your sensor node will be tagged with the [sensors] tag.  You can look through the AWS console to find which node in your cluster has this tag.  For the Centos 6 local development environment, log in as follows with the password (in all lowercase) "vagrant":

ssh root@node1

Once you log into the sensor node you can install the Squid sensor.  

sudo yum install squid

sudo service squid start 

This will run through the install and the Squid sensor will be installed and started.  Now let's look at Squid logs.

sudo su -

cd /var/log/squid

ls 

You see that there are three types of logs available:

  • access.log
  • cache.log
  • squid.out

We are interested in access.log as that is the log that records the proxy usage.  We see that initially the log is empty.  Let's generate a few entries for the log.

squidclient "http://www.aliexpress.com/af/shoes.html?ltype=wholesale&d=y&origin=n&isViewCP=y&catId=0&initiative_id=SB_20160622082445&SearchText=shoes"
squidclient "http://www.help.1and1.co.uk/domains-c40986/transfer-domains-c79878"
squidclient "http://www.pravda.ru/science/"
squidclient "https://www.google.com/maps/place/Waterford,+WI/@42.7639877,-88.2867248,12z/data=!4m5!3m4!1s0x88059e67de9a3861:0x2d24f51aad34c80b!8m2!3d42.7630722!4d-88.2142563"
squidclient "http://www.brightsideofthesun.com/2016/6/25/12027078/anatomy-of-a-deal-phoenix-suns-pick-bender-chriss"
squidclient "https://www.microsoftstore.com/store/msusa/en_US/pdp/Microsoft-Band-2-Charging-Stand/productID.329506400"
squidclient "http://www.autonews.com/article/20151115/RETAIL04/311169971/toyota-fj-cruiser-is-scarce-hot-and-high-priced"
squidclient "https://tfl.gov.uk/plan-a-journey/"
squidclient "https://www.facebook.com/Africa-Bike-Week-1550200608567001/"
squidclient "http://www.ebay.com/itm/02-Infiniti-QX4-Rear-spoiler-Air-deflector-Nissan-Pathfinder-/172240020293?fits=Make%3AInfiniti%7CModel%3AQX4&hash=item281a4e2345:g:iMkAAOSwoBtW4Iwx&vxp=mtr"
squidclient "http://www.recruit.jp/corporate/english/company/index.html"
squidclient "http://www.lada.ru/en/cars/4x4/3dv/about.html"
squidclient "http://www.help.1and1.co.uk/domains-c40986/transfer-domains-c79878"
squidclient "http://www.aliexpress.com/af/shoes.html?ltype=wholesale&d=y&origin=n&isViewCP=y&catId=0&initiative_id=SB_20160622082445&SearchText=shoes"

In production environments you would configure your users' web browsers to point to the proxy server, but for the sake of simplicity in this tutorial we will use the client that is packaged with the Squid installation.  After we use the client to simulate proxy requests, the Squid log entries look as follows:

1467011157.401    415 127.0.0.1 TCP_MISS/200 337891 GET http://www.aliexpress.com/af/shoes.html? - DIRECT/207.109.73.154 text/html
1467011158.083    671 127.0.0.1 TCP_MISS/200 41846 GET http://www.help.1and1.co.uk/domains-c40986/transfer-domains-c79878 - DIRECT/212.227.34.3 text/html
1467011159.978    1893 127.0.0.1 TCP_MISS/200 153925 GET http://www.pravda.ru/science/ - DIRECT/185.103.135.90 text/html
1467011160.044    58 127.0.0.1 TCP_MISS/302 1471 GET https://www.google.com/maps/place/Waterford,+WI/@42.7639877,-88.2867248,12z/data=cdcd/var/log/squidm5squidclienthttp://www.aliexpress.com/af/shoes.html? - DIRECT/172.217.3.164 text/html
1467011160.145    155 127.0.0.1 TCP_MISS/200 133234 GET http://www.brightsideofthesun.com/2016/6/25/12027078/anatomy-of-a-deal-phoenix-suns-pick-bender-chriss - DIRECT/151.101.41.52 text/html
1467011161.224    1073 127.0.0.1 TCP_MISS/200 141323 GET https://www.microsoftstore.com/store/msusa/en_US/pdp/Microsoft-Band-2-Charging-Stand/productID.329506400 - DIRECT/2.19.142.162 text/html
1467011161.491    262 127.0.0.1 TCP_MISS/302 1955 GET http://www.autonews.com/article/20151115/RETAIL04/311169971/toyota-fj-cruiser-is-scarce-hot-and-high-priced - DIRECT/54.88.37.253 text/html
1467011162.627    1133 127.0.0.1 TCP_MISS/200 88544 GET https://tfl.gov.uk/plan-a-journey/ - DIRECT/54.171.145.187 text/html
1467011163.515    879 127.0.0.1 TCP_MISS/200 461930 GET https://www.facebook.com/Africa-Bike-Week-1550200608567001/ - DIRECT/69.171.230.68 text/html
1467011164.286    749 127.0.0.1 TCP_MISS/200 190407 GET http://www.ebay.com/itm/02-Infiniti-QX4-Rear-spoiler-Air-deflector-Nissan-Pathfinder-/172240020293? - DIRECT/23.74.62.44 text/html
1467011164.447    128 127.0.0.1 TCP_MISS/404 12920 GET http://www.recruit.jp/corporate/english/company/index.html - DIRECT/23.74.66.205 text/html
1467011166.125    1659 127.0.0.1 TCP_MISS/200 69469 GET http://www.lada.ru/en/cars/4x4/3dv/about.html - DIRECT/195.144.198.77 text/html
1467011166.543    401 127.0.0.1 TCP_MISS/200 41846 GET http://www.help.1and1.co.uk/domains-c40986/transfer-domains-c79878 - DIRECT/212.227.34.3 text/html
1467011168.519    445 127.0.0.1 TCP_MISS/200 336155 GET http://www.aliexpress.com/af/shoes.html? - DIRECT/207.109.73.154 text/html

The format of the log is:

timestamp | time elapsed | remotehost | code/status | bytes | method | URL | rfc931 | peerstatus/peerhost | type
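A log line in this format can be pulled apart with a regular expression. The Python sketch below is only an approximation for experimenting with the sample lines; it is not the parser Metron uses. The group names `timestamp`, `elapsed`, `ip_src_addr`, `action`, `code`, `bytes`, `method`, `url` and `ip_dst_addr` follow the field names used by the Grok pattern later in this tutorial, while `rfc931`, `peerstatus` and `type` are names chosen here from the format description.

```python
import re

# One named group per column of the format line above. Columns are separated
# by a single space, except for the padding after the timestamp.
SQUID_LINE = re.compile(
    r"^(?P<timestamp>\d+\.\d+)\s+"
    r"(?P<elapsed>\d+) "
    r"(?P<ip_src_addr>\S+) "
    r"(?P<action>\w+)/(?P<code>\d+) "
    r"(?P<bytes>\d+) "
    r"(?P<method>\w+) "
    r"(?P<url>\S+) "
    r"(?P<rfc931>\S+) "
    r"(?P<peerstatus>\w+)/(?P<ip_dst_addr>\S+) "
    r"(?P<type>\S+)$"
)


def parse_squid_line(line: str):
    """Return a dict of fields for one access.log line, or None if it does not match."""
    match = SQUID_LINE.match(line.strip())
    return match.groupdict() if match else None


if __name__ == "__main__":
    sample = (
        "1467011159.978    1893 127.0.0.1 TCP_MISS/200 153925 GET "
        "http://www.pravda.ru/science/ - DIRECT/185.103.135.90 text/html"
    )
    print(parse_squid_line(sample))
```

All values come back as strings; converting the timestamp or byte count to numbers is left to the caller.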

Now that we have the sensor set up and generating logs we need to figure out how to pipe these logs to a Kafka topic.  To do so the first thing we need to do is set up a new Kafka topic for Squid.

Step 4 : Define Environment Variables 

If you are using the quick-dev image your links are:

Ambari: http://node1:8080/

Storm: http://node1:8744/index.html

Now let's set up the following environment variables on node1 to make it easier to navigate and carry over the commands from full-dev to an AWS or bare metal deployment.

source /etc/default/metron


export HDP_HOME="/usr/hdp/current"

Note: It's worth checking the values of ZOOKEEPER and BROKERLIST before continuing. You should supply a comma-delimited list of host:port items for the ZOOKEEPER and BROKERLIST variables if you are running in an environment with multiple hosts for Zookeeper and the Kafka brokers.

Step 5 : Create Kafka topics and ingest sample data 

${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic squid --partitions 1 --replication-factor 1

${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list

The preceding commands set up a new Kafka topic for squid and list the existing topics so you can confirm it was created.  Now let's test how we can pipe the Squid logs to Kafka:

cat /var/log/squid/access.log | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

${HDP_HOME}/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $BROKERLIST --topic squid --from-beginning

Note: The following steps for manually creating the Grok expression, copying the pattern to HDFS, and creating the parser and indexing JSON configs for the sensor are no longer necessary in full dev. The files are installed by default and you can simply start the squid topology as described below to achieve the end result of these steps.


This should ingest our Squid logs into Kafka.  Now we are ready to tackle the Metron parsing topology setup.  The first thing we need to do is decide whether we will be using the Java-based parser or a Grok-based parser for the new telemetry.  In this example we will be using the Grok parser.  The Grok parser is perfect for structured or semi-structured logs that are well understood (check) and telemetries with lower volumes of traffic (check).  The next thing we need to do is define the Grok expression for our log.  Refer to the Grok documentation for additional details.  In our case the pattern is:

SQUID_DELIMITED %{NUMBER:timestamp}[^0-9]*%{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}[^0-9]*(%{IP:ip_dst_addr})?

Notice that I apply the UNWANTED tag for any part of the message that I don't want included in my resulting JSON structure.  Finally, notice that I applied the naming convention to the IPV4 field by referencing the following list of field conventions.  The last thing I need to do is to validate my Grok pattern to make sure it's valid. For our test we will be using a free Grok validator called Grok Constructor.  A validated Grok expression should look like this:



Now that the Grok pattern has been defined we need to save it and move it to HDFS.  Existing Grok parsers that ship with Metron are staged under /apps/metron/patterns/

First we do a directory listing to see which patterns are available with the platform

# hdfs dfs -ls /apps/metron/patterns/
Found 7 items
-rwxr-xr-x   1 metron hdfs      13748 2019-08-21 20:37 /apps/metron/patterns/asa
-rwxr-xr-x   1 metron hdfs       5202 2019-08-21 20:37 /apps/metron/patterns/common
-rwxr-xr-x   1 metron hdfs        524 2019-08-21 20:37 /apps/metron/patterns/fireeye
-rwxr-xr-x   1 metron hdfs       2551 2019-08-21 20:37 /apps/metron/patterns/sourcefire
-rwxr-xr-x   1 metron hdfs        180 2019-08-21 20:37 /apps/metron/patterns/squid
-rwxr-xr-x   1 metron hdfs       2220 2019-08-21 20:37 /apps/metron/patterns/websphere
-rwxr-xr-x   1 metron hdfs        879 2019-08-21 20:37 /apps/metron/patterns/yaf

Now we need to move our new Squid pattern into the same directory.  Create a file from the Grok pattern above:

touch /tmp/squid

vi /tmp/squid

Then move it to HDFS:

su - hdfs

**If the pattern already exists and you need to replace it, also run hdfs dfs -rm /apps/metron/patterns/squid

hdfs dfs -put /tmp/squid /apps/metron/patterns/

exit

Now that the Grok pattern is staged in HDFS we need to define a parser configuration for the Metron Parsing Topology.  The configurations are kept in Zookeeper so the sensor configuration must be uploaded there after it has been created.  A Grok parser configuration follows this format:

{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "sensor name",
  "parserConfig": {
    "grokPath": "grok pattern",
    "patternLabel": "grok label",
    ... other optional fields
  }
}

There is a pre-packaged Squid Grok parser configuration file at ${METRON_HOME}/config/zookeeper/parsers/squid.json with the following contents:

{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "squid",
  "parserConfig": {
    "grokPath": "/patterns/squid",
    "patternLabel": "SQUID_DELIMITED",
    "timestampField": "timestamp"
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR"
     ,"output" : [ "full_hostname", "domain_without_subdomains" ]
     ,"config" : {
        "full_hostname" : "URL_TO_HOST(url)"
       ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
      }
    }
  ]
}

 

Notice the use of the fieldTransformations in the parser configuration.  Our Grok Parser is set up to extract the URL, but really we want just the domain or even the domain without subdomains.  To do this, we can use the Metron Transformation Language field transformation.  The Metron Transformation Language is a Domain Specific Language which allows users to define extra transformations to be done on the messages flowing through the topology.  It supports a wide range of common network and string related functions as well as function composition and list operations.  In our case, we extract the hostname from the URL via the URL_TO_HOST function and remove the subdomains with DOMAIN_REMOVE_SUBDOMAINS, thereby adding two new fields, "full_hostname" and "domain_without_subdomains", to each message.
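To get a feel for what these two transformations produce, here is a rough Python approximation. It is not the Stellar implementation: the function names are hypothetical, and the `MULTI_LABEL_SUFFIXES` table is a tiny hand-written stand-in for a real public suffix list, covering only suffixes that occur in the sample URLs of this tutorial.

```python
from urllib.parse import urlparse

# Assumption for illustration: a real implementation would consult a full
# public suffix list rather than this two-entry table.
MULTI_LABEL_SUFFIXES = {"co.uk", "gov.uk"}


def url_to_host(url: str) -> str:
    """Return the host portion of a URL, or an empty string if there is none."""
    return urlparse(url).hostname or ""


def remove_subdomains(hostname: str) -> str:
    """Keep the registrable domain: one label plus its (possibly two-label) suffix."""
    labels = hostname.split(".")
    suffix_len = 2 if ".".join(labels[-2:]) in MULTI_LABEL_SUFFIXES else 1
    return ".".join(labels[-(suffix_len + 1):])


if __name__ == "__main__":
    url = "http://www.help.1and1.co.uk/domains-c40986/transfer-domains-c79878"
    full_hostname = url_to_host(url)
    print(full_hostname, remove_subdomains(full_hostname))
```

The two-step shape mirrors the config: the second function takes the output of the first, just as DOMAIN_REMOVE_SUBDOMAINS takes full_hostname.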

We can also set up index types and batch sizing. Add the following lines to a file named ${METRON_HOME}/config/zookeeper/indexing/squid.json

{
  "hdfs" : {
    "index": "squid",
    "batchSize": 5,
    "enabled" : true
  },
  "elasticsearch" : {
    "index": "squid",
    "batchSize": 5,
    "enabled" : true
  },
  "solr" : {
    "index": "squid",
    "batchSize": 5,
    "enabled" : true
  }
}

Another thing we can do is validate our messages.  Let's say we wanted to make sure that source IPs and destination IPs are valid.  The validators are global, so we set them up in the global JSON and push them into Zookeeper.  To do so, perform the following commands:

vi ${METRON_HOME}/config/zookeeper/global.json

and update the json to contain at least the following:

{
  "es.clustername": "metron",
  "es.ip": "node1:9300",
  "es.date.format": "yyyy.MM.dd.HH",
  "parser.error.topic": "indexing",
  "fieldValidations" : [
    {
      "input" : [ "ip_src_addr", "ip_dst_addr" ],
      "validation" : "IP",
      "config" : {
        "type" : "IPV4"
      }
    }
  ]
}
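As a rough picture of what an IPV4 validation on ip_src_addr and ip_dst_addr checks, here is a Python sketch. It illustrates the idea and is not Metron's validator; the function names are hypothetical, and skipping fields that are absent from a message is an assumption made here, not documented behavior.

```python
import ipaddress


def is_ipv4(value: object) -> bool:
    """True only for strings in dotted-quad IPv4 form."""
    if not isinstance(value, str):
        return False
    try:
        ipaddress.IPv4Address(value)
    except ipaddress.AddressValueError:
        return False
    return True


def invalid_fields(message: dict, fields=("ip_src_addr", "ip_dst_addr")) -> list:
    """List the checked fields that are present but not valid IPv4 addresses."""
    # Assumption: a field missing from the message is not treated as invalid.
    return [name for name in fields if name in message and not is_ipv4(message[name])]


if __name__ == "__main__":
    message = {"ip_src_addr": "127.0.0.1", "ip_dst_addr": "207.109.73.154"}
    print(invalid_fields(message))
```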

A script is provided to upload configurations to Zookeeper. Upload the configs with the PUSH option.

${METRON_HOME}/bin/zk_load_configs.sh -i ${METRON_HOME}/config/zookeeper -m PUSH -z $ZOOKEEPER

And we can verify our configurations have been uploaded by using the DUMP command.

${METRON_HOME}/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER
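A malformed file in the config directory is easier to catch before a PUSH than after it. The following Python sketch walks a directory and reports every .json file that does not parse. It is a convenience check written for this tutorial, not a Metron utility, and `find_invalid_json` is a hypothetical name.

```python
import json
from pathlib import Path


def find_invalid_json(config_dir: str) -> dict[str, str]:
    """Map each *.json file under config_dir that fails to parse to its error text."""
    problems = {}
    for path in sorted(Path(config_dir).rglob("*.json")):
        try:
            json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            problems[str(path)] = str(err)
    return problems
```

Calling it with the path that ${METRON_HOME}/config/zookeeper expands to on your machine returns an empty dict when every file parses.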

Now, install an Elasticsearch template for your new sensor so that we can effectively query results in the Metron Alerts UI.

Note: This is a new step that is necessary as of the meta alerts feature and Elasticsearch 5.6.2 upgrade.

Run the following commands from the CLI.

curl -XPUT 'http://node1:9200/_template/squid_index' -d '
{
  "template": "squid_index*",
  "mappings": {
    "squid_doc": {
      "dynamic_templates": [
      {
        "geo_location_point": {
          "match": "enrichments:geo:*:location_point",
          "match_mapping_type": "*",
          "mapping": {
            "type": "geo_point"
          }
        }
      },
      {
        "geo_country": {
          "match": "enrichments:geo:*:country",
          "match_mapping_type": "*",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "geo_city": {
          "match": "enrichments:geo:*:city",
          "match_mapping_type": "*",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "geo_location_id": {
          "match": "enrichments:geo:*:locID",
          "match_mapping_type": "*",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "geo_dma_code": {
          "match": "enrichments:geo:*:dmaCode",
          "match_mapping_type": "*",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "geo_postal_code": {
          "match": "enrichments:geo:*:postalCode",
          "match_mapping_type": "*",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "geo_latitude": {
          "match": "enrichments:geo:*:latitude",
          "match_mapping_type": "*",
          "mapping": {
            "type": "float"
          }
        }
      },
      {
        "geo_longitude": {
          "match": "enrichments:geo:*:longitude",
          "match_mapping_type": "*",
          "mapping": {
            "type": "float"
          }
        }
      },
      {
        "timestamps": {
          "match": "*:ts",
          "match_mapping_type": "*",
          "mapping": {
            "type": "date",
            "format": "epoch_millis"
          }
        }
      },
      {
        "threat_triage_score": {
          "mapping": {
            "type": "float"
          },
          "match": "threat:triage:*score",
          "match_mapping_type": "*"
        }
      },
      {
        "threat_triage_reason": {
          "mapping": {
            "type": "text",
            "fielddata": "true"
          },
          "match": "threat:triage:rules:*:reason",
          "match_mapping_type": "*"
        }
      },
      {
        "threat_triage_name": {
          "mapping": {
            "type": "text",
            "fielddata": "true"
          },
          "match": "threat:triage:rules:*:name",
          "match_mapping_type": "*"
        }
      }
      ],
      "properties": {
        "timestamp": {
          "type": "date",
          "format": "epoch_millis"
        },
        "source:type": {
          "type": "keyword"
        },
        "ip_dst_addr": {
          "type": "ip"
        },
        "ip_dst_port": {
          "type": "integer"
        },
        "ip_src_addr": {
          "type": "ip"
        },
        "ip_src_port": {
          "type": "integer"
        },
        "alert": {
          "type": "nested"
        },
        "metron_alert" : {
         "type" : "nested"
        },
        "guid": {
          "type": "keyword"
        }
      }
    }
  }
}
'
# Verify the template installs as expected 
curl -XGET 'http://node1:9200/_template/squid_index?pretty'

This template accomplishes two things:

  1. Sets up default mappings for metron-specific types, e.g. timestamps.
  2. Sets up types for properties that will come from the parsed data, e.g. ip_src_addr.
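The dynamic templates in the first item apply to fields that are not listed under "properties": Elasticsearch checks a new field name against each "match" pattern in order and uses the first one that fits. The Python sketch below imitates that lookup with simple wildcard matching. It is a simplification that ignores "match_mapping_type", and the field names in the usage example are illustrative.

```python
from fnmatch import fnmatchcase

# (match pattern, mapped type) pairs, in the order they appear in the template.
DYNAMIC_TEMPLATES = [
    ("enrichments:geo:*:location_point", "geo_point"),
    ("enrichments:geo:*:country", "keyword"),
    ("enrichments:geo:*:city", "keyword"),
    ("enrichments:geo:*:locID", "keyword"),
    ("enrichments:geo:*:dmaCode", "keyword"),
    ("enrichments:geo:*:postalCode", "keyword"),
    ("enrichments:geo:*:latitude", "float"),
    ("enrichments:geo:*:longitude", "float"),
    ("*:ts", "date"),
    ("threat:triage:*score", "float"),
    ("threat:triage:rules:*:reason", "text"),
    ("threat:triage:rules:*:name", "text"),
]


def mapped_type(field_name: str):
    """Return the type of the first template whose pattern matches, else None."""
    for pattern, es_type in DYNAMIC_TEMPLATES:
        if fnmatchcase(field_name, pattern):
            return es_type
    return None


if __name__ == "__main__":
    for name in ("enrichments:geo:ip_dst_addr:latitude", "threat:triage:score", "url"):
        print(name, "->", mapped_type(name))
```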

If you're using the Full dev environment, you might want to stop some of the other parsers to free up resources.

for parser in bro__snort__yaf profiler pcap batch_indexing; do storm kill $parser; done

Now start the new squid parser topology:

${METRON_HOME}/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s squid

Navigate to the squid parser topology in the Storm UI at http://node1:8744/index.html and verify the topology is up with no errors:



Now that we have a new running squid parser topology, generate some data to parse by running this command several times:

sudo tail /var/log/squid/access.log | ${HDP_HOME}/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic squid

Refresh the Storm UI and it should report data being parsed:

Then navigate to Elasticsearch at http://node1:9200/_cat/indices?v and verify that a squid index has been created:

health status index                     pri rep docs.count docs.deleted store.size pri.store.size
yellow open   yaf_index_2016.04.25.15     5   1       5485            0        4mb            4mb 
yellow open   snort_index_2016.04.26.12   5   1      24452            0     14.4mb         14.4mb 
yellow open   bro_index_2016.04.25.16     5   1       1295            0      1.9mb          1.9mb
yellow open   squid_index_2016.04.26.13   5   1          1            0      7.3kb          7.3kb 
yellow open   yaf_index_2016.04.25.17     5   1      30750            0     17.4mb         17.4mb 
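The suffix on each index name in this listing has the shape of the es.date.format value (yyyy.MM.dd.HH) from global.json. The Python sketch below builds a name of that shape, which can be handy when you want to query one specific hourly index. It assumes the `<sensor>_index_<date>` layout seen in the listing; which clock and time zone Metron uses for the suffix is not stated here, so treat the input time as an assumption.

```python
from datetime import datetime, timezone


def index_name(sensor: str, when: datetime) -> str:
    """Build '<sensor>_index_<yyyy.MM.dd.HH>' for the given time."""
    # %Y.%m.%d.%H is the strftime equivalent of the Java pattern yyyy.MM.dd.HH.
    return f"{sensor}_index_{when.strftime('%Y.%m.%d.%H')}"


if __name__ == "__main__":
    print(index_name("squid", datetime(2016, 4, 26, 13, tzinfo=timezone.utc)))
```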


In order to verify that the messages were indexed correctly, you need a tool for exploring the data in Elasticsearch.

The Elasticsearch Head plugin is no longer available post 5.x. You have 3 options now:

  1. curl + REST API from the command line
  2. Google Chrome Head plugin
  3. The Kibana UI - see details here


And navigate to one of the above-mentioned tools for data exploration.

There you will see parsed message + performance timestamps.  We will discuss the performance timestamps in another blog entry.

Now let's see how we create a Kibana dashboard to visualize data in Metron.  First click on Visualize, select a squid index, and add the fields you want to display:




Then click on save to save the query and import it into the main Metron dashboard:




We are getting closer to releasing the first Beta Apache build of Metron.  Please help us by validating our build.

The code is staged at http://home.apache.org/~jsirota/metron/Metron_0.1BETA_RC/RC_7/

The following are instructions for verifying the build.

Step 1 – Build Metron

cd incubator-metron/
mvn apache-rat:check && cd metron-streaming && mvn clean integration-test && cd ..

Verify that all tests are passing

Step 2 – Deploy metron as a single VM via vagrant and ansible

cd deployment/vagrant/singlenode-vagrant
vagrant plugin install vagrant-hostmanager
vagrant up

For a more complete set of instructions refer to:
https://github.com/apache/incubator-metron/tree/master/deployment

Verify metron is working:
- Check Ambari to make sure all the services are up by going to ambari in a browser at http://node1:8080
- Check Storm to make sure all the topologies are up
      From Ambari navigate to Storm -> Quick Links -> Storm UI
- Check that the enrichment topology has emitted some data (could take a few minutes to show
up in the Storm UI)
- Check indexes to make sure indexing is done correctly and data is visualized in Kibana in
a browser at http://node1:5000
- Check that some data is written into HDFS for at least one of the data sources
      Look in HDFS under /apps/metron/enrichment/indexed/yaf_doc|bro_doc|snort_doc
      This can be done from the browser by going to http://node1:50070/explorer.html#/apps/metron/enrichment/indexed

Step 3 (optional) – Verify AWS Multi-Node Deploy with Ansible
cd deployment/amazon-ec2
ansible-playbook -i ec2.py playbook.yml

For a more complete set of instructions refer to:
https://github.com/apache/incubator-metron/tree/master/deployment

To verify the working build go through the same verifications as in Step 2, but on AWS.  Reference
playbook.yml for location of the services.
Ambari-master contains Ambari, web contains Kibana and sensors.