Source code for asyncdnspy.dns_message_decoder

#!/usr/bin/env python

import sys
import array

from asyncdnspy.dns_message_header import Header
from asyncdnspy.dns_message_question import Question
from asyncdnspy.error.asyncdnspy_error import AsyncDNSPyError
from asyncdnspy.dnspy_enum import SocketType, RecordType
from asyncdnspy.dns_message_resourcerecord import ResourceRecord
from asyncdnspy.dns_message import DNSMessage

from typing import List


class DNSMessageDecoder(object):
    """Decoder class that decodes raw bytes to meaningful dns message."""

    socket_type = SocketType.udp
    # Added to every compression-pointer target before it is used as an index
    # into the received message. Nothing in this class changes it from 0.
    offset = 0

    @staticmethod
    def decode(buffer: List, received_data: List, record_type: RecordType,
               socket: SocketType = SocketType.udp):
        """
        Decodes dns message response into a readable dns message.

        Errors raised by the individual section readers are printed and the
        partially filled message is returned.

        :param buffer: Raw byte list of dns message query.
        :type buffer: list
        :param received_data: Raw byte list of dns message response.
        :type received_data: list
        :param record_type: DNS message record type (currently unused).
        :type record_type: RecordType
        :param socket: DNS client socket type (currently unused, see note).
        :type socket: SocketType
        :rtype: DNSMessage, or None when an input is empty or the header
            could not be decoded
        """
        if not buffer or not received_data:
            return None

        # NOTE(review): the previous code copied `socket` into a local and set
        # a local `offset = 2` for TCP; neither local was ever read, so the
        # socket type has never influenced decoding. That behaviour is kept.
        # Confirm whether TCP payloads reach this method with their 2-byte
        # length prefix before wiring `socket` into DNSMessageDecoder.offset.
        received_data = list(received_data)
        dns_message = DNSMessage()

        try:
            dns_message.header = DNSMessageDecoder.read_header(received_data)
        except AsyncDNSPyError as error:
            print(error.message)
        if not dns_message.header:
            return None

        try:
            question_count = int(dns_message.header.question_count)
            dns_message.questions = DNSMessageDecoder.read_questions(
                received_data, question_count)
        except AsyncDNSPyError as error:
            print(error.message)

        answer_count = dns_message.header.answer_count
        if answer_count == 0:
            return dns_message

        try:
            dns_message.answers = DNSMessageDecoder.read_resource_records(
                buffer, received_data, answer_count)
        except AsyncDNSPyError as error:
            print(error.message)
        return dns_message

    @staticmethod
    def read_header(bytes: List):
        """
        Reads byte list and creates a high level dns message header.

        The first twelve bytes are read as six big-endian 16-bit fields.

        :param bytes: Raw byte list of dns message.
        :type bytes: list
        :raises AsyncDNSPyError: when fewer than 12 bytes are available.
        :rtype: Header
        """
        if len(bytes) < 12:
            raise AsyncDNSPyError('Error occured on decoding dns message header '
                                  + sys._getframe(1).f_code.co_name)
        header = Header()
        header.message_id = DNSMessageDecoder.ui16(bytes[0:2])
        header.flags = DNSMessageDecoder.ui16(bytes[2:4])
        header.question_count = DNSMessageDecoder.ui16(bytes[4:6])
        header.answer_count = DNSMessageDecoder.ui16(bytes[6:8])
        header.authority_count = DNSMessageDecoder.ui16(bytes[8:10])
        header.additional_count = DNSMessageDecoder.ui16(bytes[10:12])
        return header

    @staticmethod
    def read_questions(bytes: List, count: int):
        """
        Reads byte list and creates a high level dns message questions list.

        Only the first question (starting at index 12) is decoded, as before.
        The name may now consist of any number of labels; previously exactly
        two labels were assumed.

        :param bytes: Raw byte list of dns message.
        :type bytes: list
        :param count: Question count.
        :type count: int
        :raises AsyncDNSPyError: when the data is too short, ``count`` is 0 or
            the question name is malformed.
        :rtype: list (list of question/s)
        """
        if len(bytes) < 16:
            raise AsyncDNSPyError('Not enough data to decode questions '
                                  + sys._getframe(1).f_code.co_name)
        if count == 0:
            raise AsyncDNSPyError('No questions found on dns response '
                                  + sys._getframe(1).f_code.co_name)

        # Walk the labels to find where the fixed-size part of the question
        # (type + class, 4 bytes) really starts.
        labels, fields_start = DNSMessageDecoder._read_labels(bytes, 12)
        if len(bytes) < fields_start + 4:
            raise AsyncDNSPyError('Not enough data to decode questions '
                                  + sys._getframe(1).f_code.co_name)

        name = ''
        try:
            name = DNSMessageDecoder._labels_to_str(labels)
        except AsyncDNSPyError as error:
            print(error.message)

        question = Question()
        question.name = name
        question.question_type = DNSMessageDecoder.ui16(
            bytes[fields_start:fields_start + 2])
        question.class_type = DNSMessageDecoder.ui16(
            bytes[fields_start + 2:fields_start + 4])
        return [question]

    @staticmethod
    def read_resource_records(buffer: List, received_data: List, count: int):
        """
        Reads byte list and create resource records.

        Records are parsed one after another starting at ``len(buffer)``, i.e.
        the response is assumed to repeat the query bytes before its first
        answer. Each record's extent is taken from its own data length field.
        Parsing stops early, returning what was decoded so far, when the
        remaining data is too short for another record.

        :param buffer: Raw byte list of sent dns message.
        :type buffer: list
        :param received_data: Raw byte list of sent dns message response.
        :type received_data: list
        :param count: Number of resource records to read.
        :type count: int
        :raises AsyncDNSPyError: when there is no data after the query part or
            ``count`` is 0.
        :rtype: list (list of resource record/s)
        """
        position = len(buffer)
        total = len(received_data)
        if total <= position:
            raise AsyncDNSPyError('Not enough data to read resource records '
                                  + sys._getframe(1).f_code.co_name)
        if count == 0:
            raise AsyncDNSPyError('No resource records '
                                  + sys._getframe(1).f_code.co_name)

        records = []
        for _ in range(int(count)):
            try:
                labels, position = DNSMessageDecoder._read_labels(
                    received_data, position)
            except AsyncDNSPyError as error:
                print(error.message)
                break

            # type (2) + class (2) + ttl (4) + data length (2)
            data_start = position + 10
            if data_start > total:
                break

            name = ''
            try:
                name = DNSMessageDecoder._labels_to_str(labels)
            except AsyncDNSPyError as error:
                print(error.message)

            type16 = DNSMessageDecoder.ui16(received_data[position:position + 2])
            class16 = DNSMessageDecoder.ui16(
                received_data[position + 2:position + 4])
            ttl = DNSMessageDecoder.ui32(received_data[position + 4:position + 8])
            record_data_length = DNSMessageDecoder.ui16(
                received_data[position + 8:data_start])

            data_end = data_start + int(record_data_length)
            if data_end > total:
                break

            resource_record = ResourceRecord()
            resource_record.name = name
            resource_record.record_type = type16
            resource_record.class_type = class16
            resource_record.ttl = ttl
            resource_record.record_data_length = record_data_length
            resource_record.record_data = list(received_data[data_start:data_end])
            records.append(resource_record)
            position = data_end
        return records

    @staticmethod
    def __split_into_answers(bytes: List):
        """
        Split bytes into answers.

        A new answer is started at every ``192, 12`` byte pair found after
        index 1; that pair becomes the first two bytes of the new answer.

        Retained for backward compatibility only. ``read_resource_records``
        no longer uses it, because the same byte pair can also occur inside
        record data and would split a record in the wrong place.

        :param bytes: Raw byte list of the answer section.
        :type bytes: list
        :rtype: list of lists
        """
        answers = []
        answer = []
        for index, value in enumerate(bytes):
            answer.append(value)
            if index > 1 and answer[-2:] == [192, 12]:
                del answer[-2:]
                answers.append(answer)
                # Start a fresh list so stored answers are never aliased.
                answer = [192, 12]
        answers.append(answer)
        return answers

    @staticmethod
    def _read_labels(message: List, start: int):
        """
        Collects the labels of the domain name that starts at ``start``.

        A length byte greater than 63 is treated as a compression pointer: its
        low 6 bits and the following byte form the target index, to which
        ``DNSMessageDecoder.offset`` is added.

        :param message: Raw byte list containing the name.
        :type message: list
        :param start: Index of the first length byte of the name.
        :type start: int
        :raises AsyncDNSPyError: when the name runs past the end of the data
            or the pointers never terminate.
        :rtype: tuple (list of bytearray labels, index just past the name as
            it is stored at ``start``)
        """
        size = len(message)
        labels = []
        position = start
        resume_at = None
        jumps = 0
        while True:
            if position >= size:
                raise AsyncDNSPyError('Not enough data to decode name '
                                      + sys._getframe(1).f_code.co_name)
            length = int(message[position])
            if length == 0:
                position += 1
                break
            if length > 63:
                if position + 1 >= size:
                    raise AsyncDNSPyError('Not enough data to decode name '
                                          + sys._getframe(1).f_code.co_name)
                # The name as stored ends right after the first pointer.
                if resume_at is None:
                    resume_at = position + 2
                jumps += 1
                if jumps > size:
                    raise AsyncDNSPyError('Compression loop while decoding name '
                                          + sys._getframe(1).f_code.co_name)
                target = ((length & 0x3F) << 8) | int(message[position + 1])
                position = target + DNSMessageDecoder.offset
                continue
            label_end = position + 1 + length
            if label_end > size:
                raise AsyncDNSPyError('Not enough data to decode name '
                                      + sys._getframe(1).f_code.co_name)
            labels.append(bytearray(message[position + 1:label_end]))
            position = label_end
        return labels, (position if resume_at is None else resume_at)

    @staticmethod
    def _labels_to_str(labels: List):
        """
        Joins raw labels with '.' and decodes the result as utf-8.

        :param labels: Labels as returned by ``_read_labels``.
        :type labels: list
        :raises AsyncDNSPyError: when the bytes are not valid utf-8.
        :rtype: str
        """
        try:
            return b'.'.join(labels).decode('utf-8')
        except UnicodeDecodeError as error:
            raise AsyncDNSPyError('Name is not valid utf-8 '
                                  + sys._getframe(1).f_code.co_name) from error

    @staticmethod
    def decode_name(bytes: List, name_start_index: int, name_length: int):
        """
        Decodes list of unsigned 8-bit values to domain str.

        ``name_length`` is the length of the first label, whose characters
        start at ``name_start_index``; the remaining labels are read from the
        length bytes that follow it, so names with any number of labels are
        joined with '.'.

        :param bytes: Raw byte list containing the name.
        :type bytes: list
        :param name_start_index: Starting index of name.
        :type name_start_index: int
        :param name_length: Length of the first label.
        :type name_length: int
        :raises AsyncDNSPyError: when the data ends before the name does or the
            name is not valid utf-8.
        :rtype: str
        """
        if len(bytes) < name_start_index:
            raise AsyncDNSPyError('Not enough data to decode name '
                                  + sys._getframe(1).f_code.co_name)
        first_length = int(name_length)
        if first_length == 0:
            return ''
        first_end = name_start_index + first_length
        if first_end > len(bytes):
            raise AsyncDNSPyError('Not enough data to decode name '
                                  + sys._getframe(1).f_code.co_name)
        # bytearray holds unsigned values; the former signed 'b' array raised
        # OverflowError for any byte above 127.
        labels = [bytearray(bytes[name_start_index:first_end])]
        remaining, _ = DNSMessageDecoder._read_labels(bytes, first_end)
        labels.extend(remaining)
        return DNSMessageDecoder._labels_to_str(labels)

    @staticmethod
    def ui16(bytes: List):
        """
        Creates unsigned 16 bit int from the first two items (big-endian).

        :param bytes: List of raw bytes.
        :type bytes: list
        :rtype: unsigned 16 bit int
        """
        return int(bytes[0] << 8) + bytes[1]

    @staticmethod
    def ui32(bytes: List):
        """
        Creates unsigned 32 bit int from the first four items (big-endian).

        :param bytes: List of raw bytes.
        :type bytes: list
        :rtype: unsigned 32 bit int
        """
        return (int(bytes[0] << 24) + int(bytes[1] << 16)
                + int(bytes[2] << 8) + bytes[3])

    @staticmethod
    def string(bytes: List):
        """
        Creates str from bytes by decoding them as utf-8.

        :param bytes: Raw bytes.
        :type bytes: bytes
        :rtype: str
        """
        return bytes.decode('utf-8')