From DNS Packet Capture to analysis in Kibana

UPDATE June 2015: Forget this post, just head for the Beats component for ElasticSearch. Beats is based on PacketBeat (the same people). That said, I haven't used it yet.

If you're trying to get analytics on DNS traffic on a busy or potentially overloaded DNS server, then you really don't want to enable query logging. You'd be better off getting data from a traffic capture. If you're capturing this on the DNS server, ensure the capture file doesn't flood the disk or degrade performance overmuch (here I'm capturing it on a separate partition, and running it at a reduced priority).

# nice tcpdump -p -nn -i eth0 -s0 -w /spare/dns.pcap port domain
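
If you're worried about the capture filling the disk, tcpdump can also rotate its output files for you; something like the following (sizes are illustrative) keeps at most ten files of roughly 100 MB each:

# nice tcpdump -p -nn -i eth0 -s0 -C 100 -W 10 -w /spare/dns.pcap port domain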

Great, so now you've got a lot of packets (let's say at least a million, which is a reasonably short capture). Despite being short, that is still a massive pain to work with in Wireshark, and Wireshark is not the best tool for faceting the message stream so you can look for patterns (eg. to find relationships between requests and failures). An ELK stack is pretty great for that, so that's my (rather well fitted) hammer.

But an ELK stack (or rather, Logstash -- the 'L' in ELK) doesn't readily ingest traffic captures... and if it did, I wouldn't want every conceivable field in a traffic capture going into Elasticsearch -- it's good, but come on, be reasonable.

Wireshark (or its command-line partner, tshark) can read the packet capture, and emit it in various formats, including XML (but not yet JSON, at least not in my installed version). Actually, for XML, it has two vocabularies it can use; one for summary data (PSML -- Packet Summary Markup Language), and one containing all the packet details (PDML -- Packet Details Markup Language). Converting PCAP to PDML is a true explosion of data (about 1000 times, IIRC), so best to avoid writing it to disk if you can, and instead feed it into some processing pipeline. With Wireshark/tshark, I know what I'm getting, and get some added value (eg. calculated response time).

Another option is Python's scapy library, which is a packet dissector -- I've not used it, and I don't know how useful it would be for DNS traffic, so I won't comment on it here.

Let's see how to do the PCAP to PDML translation with tshark:

$ /usr/sbin/tshark -r /tmp/dns.pcap -T pdml | head -100 | less
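
To give you an idea of what we'll be parsing, each packet in the PDML output looks roughly like the fragment below (heavily abbreviated, with invented values); the script that follows only cares about packet boundaries and the name, show, showname and value attributes of each field element:

<packet>
  <proto name="geninfo">
    <field name="timestamp" show="Apr 10, 2015 10:43:57.196630000" value="1428662637.196630000"/>
  </proto>
  <proto name="dns">
    <field name="dns.id" showname="Transaction ID: 0x85ab" show="0x85ab" value="85ab"/>
    <field name="dns.qry.name" showname="Name: foo.bar.example.com" show="foo.bar.example.com"/>
  </proto>
</packet>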

Okay, easy bit done. Now we need to take the very large XML document, use some XML streaming processor to get the bits we want, and emit them as JSON, which can then be sent to a Logstash instance, which will operate lightly on it and pass it to Elasticsearch; from there we can make some Kibana dashboards. But we're getting ahead of ourselves; time for a bit of Python.

$ cat dns-pdml-to-json
#!/usr/bin/python2.6

import sys
import xml.sax
import json
from datetime import datetime
import locale
import string

# -------------------------------------------------------------
class PDML_Handler(xml.sax.ContentHandler):
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)  # super constructor

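        # Fields to pull out of each PDML packet, paired with how to
        # interpret them.  A name ending in '.' is a prefix match
        # (see match_prefix below); anything else must match exactly.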
        self.wanted = [
                ( 'frame.number',     'decimal'   ),
                ( 'frame.protocols',  'string'    ),
                ( 'ip.src',           'string'    ),
                ( 'ip.dst',           'string'    ),
                ( 'udp.srcport',      'decimal'   ),
                ( 'udp.dstport',      'decimal'   ),
                ( 'dns.time',         'float'     ),
                ( 'dns.id',           'hex'       ),
                ( 'dns.response_in',  'decimal'   ),
                ( 'dns.response_to',  'decimal'   ),
                ( 'dns.flags.z',      'ignore'    ),
                ( 'dns.flags.',       'decimal'   ),
                ( 'dns.count.',       'decimal'   ),
                ( 'timestamp',        'timestamp' ),
                ( 'dns.qry.name',     'dnsname'   ),
                ( 'dns.qry.type',     'dnstypeorclass' ),
                ( 'dns.qry.class',    'dnstypeorclass' )]

        self.packet = {}
        self.packet_count = 0

    def init_packet(self):
        """Start collecting a new """
        self.packet = {}

        if (self.packet_count % 100 == 0):
            sys.stderr.write('\rPacket %s ...' % (locale.format('%d',self.packet_count,1)));

        self.packet_count += 1

    def flush_packet(self):
        """Write out the collected parts of the packet as JSON upon
"""
        out = {}

        for (k,v) in self.packet.items():
            if k == 'timestamp':
                out['@timestamp'] = v
            else:
                out[k.replace('.','_')] = v

        print json.dumps(out)

    def parse_decimal(self, name, attrs):
        self.packet[name] = int(attrs['show'], 10)

    def parse_hex(self, name, attrs):
        self.packet[name] = int(attrs['show'], 16)

    def parse_float(self, name, attrs):
        #
        # There is the very real possibility that this could be reported as being negative.
        # This is the case when the request is outside of the capture, but a later request
        # has the same transaction ID [and other things?]
        #
        # In that case, it is better not to include it if it is found to be negative.
        #
        tmp = float(attrs['show'])
        if tmp >= 0.0:
            self.packet[name] = tmp

    def parse_string(self, name, attrs):
        self.packet[name] = attrs['show']

    def parse_dnsname(self, name, attrs):
        self.packet[name] = attrs['show']
        labels = attrs['show'].split('.')
        for nlabels in range(1,4):
            self.packet['%s_%dtld' % (name, nlabels)] = \
                    string.join(labels[-nlabels:], '.')

    def parse_dnstypeorclass(self, name, attrs):
        self.packet[name] = attrs['showname'].split(' ')[1]

    def parse_timestamp(self, name, attrs):
        self.packet[name] = datetime.fromtimestamp(float(attrs['value'])).isoformat()

    def match_prefix(self, name, prefix):
        if prefix.endswith('.'):
            return name[:len(prefix)] == prefix
        else:
            return False

    def startElement(self, name, attrs):
        """Handle the start of an element and do normal processing."""

        if name == 'packet':
            self.init_packet()

        if name == 'field':
            field_name = attrs['name']

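            # Work out whether this is a field we want: either an exact
            # name match, or a match against one of the prefix entries
            # (those ending in '.').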
            for (name_or_prefix,handler) in self.wanted:
                if field_name == name_or_prefix:
                    pass
                elif self.match_prefix(field_name, name_or_prefix):
                    pass
                else:
                    continue

                if handler == 'decimal':
                    self.parse_decimal(field_name, attrs);
                elif handler == 'hex':
                    self.parse_hex(field_name, attrs);
                elif handler == 'float':
                    self.parse_float(field_name, attrs);
                elif handler == 'string':
                    self.parse_string(field_name, attrs);
                elif handler == 'dnsname':
                    self.parse_dnsname(field_name, attrs);
                elif handler == 'dnstypeorclass':
                    self.parse_dnstypeorclass(field_name, attrs);
                elif handler == 'timestamp':
                    self.parse_timestamp(field_name, attrs);
                elif handler == 'ignore':
                    # for excluding something before a prefix-match
                    pass
                else:
                    raise "Missing handler clause for " + handler

    def endElement(self, name):
        """Handle the end of the element."""

        if name == 'field':
            return

        elif name == 'packet':
            self.flush_packet()
            return

        elif name == 'pdml':
            sys.stderr.write('\rProcessed a total of %s packets\n' % (locale.format('%d',self.packet_count,1)));
            return

# MAIN --------------------------------------------------------

locale.setlocale(locale.LC_ALL, '')
handler = PDML_Handler()

xml.sax.parse(sys.stdin, handler)


TODO: Put this up on my GitHub account

Right, so now we should be able to get some JSON (one JSON document per line of output -- so 'json_lines' in Logstash terms). The output below has been cleaned up and anonymised.

$ /usr/sbin/tshark -r /tmp/dns.pcap -T pdml \
>  | ~/tmp/dns-pdml-to-json | head -1 | python -mjson.tool
{
    "@timestamp": "2015-04-10T10:43:57.196630",
    "dns_count_add_rr": 0,
    "dns_count_answers": 0,
    "dns_count_auth_rr": 0,
    "dns_count_queries": 1,
    "dns_flags_checkdisable": 0,
    "dns_flags_opcode": 0,
    "dns_flags_recdesired": 1,
    "dns_flags_response": 0,
    "dns_flags_truncated": 0,
    "dns_flags_z": 0,
    "dns_id": 34219,
    "dns_qry_class": "IN",
    "dns_qry_name": "foo.bar.example.com",
    "dns_qry_name_1tld": "com",
    "dns_qry_name_2tld": "example.com",
    "dns_qry_name_3tld": "bar.example.com",
    "dns_qry_type": "A",
    "frame_number": 1,
    "frame_protocols": "eth:ip:udp:dns",
    "ip_dst": "1.1.1.1",
    "ip_src": "2.2.2.2",
    "udp_dstport": 53,
    "udp_srcport": 64891
}

Note the following:

  • This is a request (dns_flags_response == 0) and therefore does not have the response time (dns_time attribute)
  • The contents of the responses are not captured anyway (they are multivalued and I don't have a particular use for them at present). The request stream is fairly intact, and I've even broken it down into cascading parent domains.
  • There are strings, and numbers (integers mostly, but dns_time is a floating-point number)
  • Although the output from dns-pdml-to-json is a single JSON document (representing a single packet) per line, python -mjson.tool pretty-prints a single JSON document so it's easier to read.

The output is already completely suitable for Elasticsearch, but to stream it into Elasticsearch you really want to use something like Logstash, which takes care of the chunking necessary for using the Bulk API over HTTP. It's much easier to throw JSON lines at something than it is to take bunches of JSON lines, turn them into Bulk API commands, and batch them in via cURL.
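
For comparison, this is roughly the shape of the Bulk API payload that Logstash builds up and POSTs on your behalf (the index name, document type and field values here are invented for illustration):

POST /_bulk
{"index":{"_index":"logstash-2015.04.10","_type":"logs"}}
{"@timestamp":"2015-04-10T10:43:57.196630","dns_qry_name":"foo.bar.example.com","dns_qry_type":"A"}
{"index":{"_index":"logstash-2015.04.10","_type":"logs"}}
{"@timestamp":"2015-04-10T10:43:58.002113","dns_qry_name":"baz.example.net","dns_qry_type":"AAAA"}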

So you'll need Logstash. Here's a very simple logstash.conf just for this purpose. In this example, I've also added a GeoIP lookup, just to give a taste of what Logstash can add in terms of value for analytics. The slowest part of the processing chain is the Python script (at the moment, I'm making it correct, not making it fast); the addition of the GeoIP processing didn't slow things down noticeably.


input {
  stdin {
    codec => "json_lines"
  }
}

filter {
  geoip {
    source => "ip_src"
    target => "ip_src_geoip"
    fields => "continent_code"
  }
  geoip {
    source => "ip_dst"
    target => "ip_dst_geoip"
    fields => "continent_code"
  }
}

output {
  elasticsearch { host => localhost }
  #stdout { codec => rubydebug }
}


Let's put this all together. Make sure that Elasticsearch is running, and you may like to delete any previous logstash-* index if you don't care to keep it. The command below deletes such indexes (I'm running this on my local workstation; you probably wouldn't want to do this on a production cluster!!!). On a production cluster, I'd have Logstash put the documents into a different index, but that's outside the scope of this post.

$ curl -XDELETE 'localhost:9200/logstash-*?pretty'; \
> /usr/sbin/tshark -r /tmp/dns.pcap -T pdml \
>  | ~/tmp/dns-pdml-to-json \
>  | ~/tmp/logstash-1.4.2/bin/logstash -f ~/tmp/logstash-1.4.2/logstash.conf
{
  "acknowledged" : true
}                      (this is from the curl command)
Packet 33,000 ...      (this will tick away.... I captured ~1.5M packets)


Okay, now for some Kibana 4 exploration... I should say that I'm new to Kibana 4 (as are most other people), and parts of this would be better back in Kibana 3 (largely due to Kibana 4 not having the 'other' bucket available for Top-N type of queries).

I'm not going to explain how to create a Kibana version 4 dashboard, or how to make a dashboard for this particular use; my use of it is quite early at this stage. But I will briefly describe what is present on this dashboard:

  • I've turned off legends for all of the visualisations on this dashboard to reduce clutter (you'd have to hover over things).
  • I deliberately presented you with boring data.
  • This is all from the point of view of one [recursive] DNS server.
  • Response times (dns_time) are shown broken down into percentiles, both for responses from the [recursive] server and for responses to the [recursing] server (top two charts on the left); a rough sketch of the underlying aggregation follows this list.
  • In order to explore the slow response-time space, the bottom-left pie chart has the top few source IPs in the inner ring, breaking out into the 2nd-level domains. That chart operates only on responses that took longer than 5 seconds, so be careful to look at the numbers in the hover-over view.
  • A similar breakdown is shown on the right-most chart, showing the breakdown of labels (eg. com, example.com, foo.example.com). This operates on all responses. It would be very useful if it had the 'other' bucket, because otherwise you can't really tell how significant the data you are looking at is. Be careful of the inferences you make here.
  • The middle charts show various histograms meant to try and find correlations. There are examples of looking at the SERVFAIL responses (note: dropped packets cannot be shown); requests to a particular domain; number of distinct source IPs (Kibana 4 wins here); and a breakdown of request types (A versus AAAA, versus TXT, etc.) over time.
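
For the curious, the percentile charts boil down to something like an Elasticsearch percentiles aggregation over dns_time. The query below is only an illustrative sketch of what Kibana is asking Elasticsearch for (the aggregation name is invented):

$ curl -s 'localhost:9200/logstash-*/_search?pretty' -d '{
>   "size": 0,
>   "aggs": {
>     "response_time": { "percentiles": { "field": "dns_time" } }
>   }
> }'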



Enjoy, and may your enhanced understanding bring forth joyful resolution!

Improvements


  • Try to use the 'fields' output format instead (a rough sketch of converting its CSV output into JSON lines follows this list).
    /usr/sbin/tshark -r /tmp/dns.pcap -n -T fields -E separator=, -E quote=d -e frame.time_epoch -e frame.number -e frame.protocols -e ip.src -e ip.dst -e udp.srcport -e udp.dstport -e tcp.srcport -e tcp.dstport -e dns.time -e dns.id -e dns.response_in -e dns.response_to -e dns.flags. -e dns.qry.name -e dns.qry.type -e dns.qry.class
  • Put this into github
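
Here is a minimal sketch of what the converter could look like with the 'fields' output; it assumes the columns arrive in the same order as the -e options above, and it leaves every value as a string (no type coercion, @timestamp handling or TLD breakdown, unlike dns-pdml-to-json):

#!/usr/bin/python2.6
import csv
import json
import sys

# Must match the order of the -e options given to tshark.
FIELDS = ['frame.time_epoch', 'frame.number', 'frame.protocols',
          'ip.src', 'ip.dst', 'udp.srcport', 'udp.dstport',
          'tcp.srcport', 'tcp.dstport', 'dns.time', 'dns.id',
          'dns.response_in', 'dns.response_to', 'dns.flags',
          'dns.qry.name', 'dns.qry.type', 'dns.qry.class']

for row in csv.reader(sys.stdin):
    # Drop empty columns (eg. tcp.* for a UDP packet) and emit one
    # JSON document per line, with dots converted to underscores.
    doc = dict((name.replace('.', '_'), value)
               for (name, value) in zip(FIELDS, row) if value != '')
    print json.dumps(doc)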
