Source code for asyncdnspy.dns_raw_message

#!/usr/bin/env python

from enum import Enum
from random import randrange

from asyncdnspy.dnspy_enum import RecordType, SocketType


class Flag(Enum):
    """DNS header flag bits that can be requested for a raw message."""

    # Authoritative Answer.
    aa = 1
    # Authentic Data.
    ad = 2
    # Checking Disabled.
    cd = 3
    # Recursion Available.
    ra = 4
    # Recursion Desired.
    rd = 5
    # TrunCation.
    tc = 6
class State(Enum):
    """Build stages of a raw DNS message, in the order they are reached."""

    # Nothing has been written to the buffer yet.
    initial = 1
    # The 16-bit message id has been written.
    have_messageid = 2
    # The two flag bytes have been written.
    have_flags = 3
    # The four section counts have been written.
    have_header = 4
    # The question section has been written.
    have_questions = 5
    # The answer section has been written.
    have_answers = 6
class DNSRawMessage(object):
    """Class that creates DNS raw message as byte array.

    The message is assembled into an internal list of byte values. A small
    state machine (see ``State``) makes sure the sections are appended in
    wire order: message id, flags, section counts, question.
    """

    def __init__(self):
        """
        Inits DNSRawMessage instance.

        >>> from asyncdnspy.dns_raw_message import DNSRawMessage
        >>> dns_raw_message = DNSRawMessage()
        """
        # Byte values (ints 0..255) of the message built so far.
        self.__buf = []
        # Which section was appended last; guards the private builders.
        self.__state = State.initial

    def query(self, domain: str, record_type: RecordType,
              socket_type: SocketType = SocketType.udp):
        """
        Creates a raw dns message query.

        >>> from asyncdnspy.dns_raw_message import DNSRawMessage
        >>> from asyncdnspy.dnspy_enum import RecordType
        >>> dns_raw_message = DNSRawMessage()
        >>> data = dns_raw_message.query('google.com', RecordType.a)

        Every call builds a fresh message (with a new random message id),
        so one instance can safely be reused for several queries.

        :param domain: Domain to query its DNS details.
        :type domain: str
        :param record_type: DNS record type.
        :type record_type: RecordType
        :param socket_type: Socket type. For anything other than
            ``SocketType.udp`` the message is prefixed with its length as a
            16-bit big-endian integer.
        :type socket_type: SocketType
        :raises ValueError: If a label of ``domain`` is longer than 63 bytes.
        :rtype: bytes.
        """
        # Start from a clean slate. Without this, a second call would skip
        # every state-guarded step below and return the previous query's
        # bytes, even for a different domain.
        self.__buf = []
        self.__state = State.initial

        self.__message_id()
        self.__add_flags(qr=0, opcode=0, rcode=0, flags=[Flag.rd])
        self.__header(question_count=1, answer_count=0,
                      authority_count=0, additional_count=0)
        self.__question(name=domain, type16=record_type, class16=1)

        if socket_type == SocketType.udp:
            return bytes(self.__buf)

        # Non-UDP transport: prepend the two-byte message length.
        length_of_message = len(self.__buf)
        length_prefix = [(length_of_message >> 8) & 0xFF,
                         length_of_message & 0xFF]
        return bytes(length_prefix + self.__buf)

    def __message_id(self):
        """Appends random message id to buffer."""
        if self.__state == State.initial:
            # NOTE(review): randrange is not a cryptographic source; consider
            # the ``secrets`` module if id unpredictability matters here.
            random_message_id = randrange(65536)
            self.__push16(random_message_id)
            self.__state = State.have_messageid

    def __add_flags(self, qr, opcode, rcode, flags: [Flag]):
        """
        Adds DNS flags to dns message query.

        The flags occupy two bytes. First byte: QR (bit 7), opcode
        (bits 6-3), AA (bit 2), TC (bit 1), RD (bit 0). Second byte:
        RA (bit 7), AD (bit 5), CD (bit 4), rcode (bits 3-0).

        :param qr: DNS query or not.
        :type qr: A single bit field.
        :param opcode: Op code.
        :type opcode: A four bit field.
        :param rcode: Response code.
        :type rcode: A four bit field.
        :param flags: DNS flags.
        :type flags: List of Flag.
        """
        if self.__state == State.have_messageid:
            flags0 = 0
            flags1 = 0
            if qr != 0:
                flags0 |= 0x80
            # Masking makes the zero case a no-op, so no guard is needed.
            flags0 |= (opcode & 0x0F) << 3
            for flag in flags:
                if flag == Flag.aa:
                    flags0 |= (1 << 2)
                elif flag == Flag.tc:
                    # TC has its own bit; it must not be folded into RD.
                    flags0 |= (1 << 1)
                elif flag == Flag.rd:
                    flags0 |= (1 << 0)
                elif flag == Flag.ra:
                    flags1 |= (1 << 7)
                elif flag == Flag.ad:
                    flags1 |= (1 << 5)
                elif flag == Flag.cd:
                    flags1 |= (1 << 4)
            flags1 |= (rcode & 0x0F)
            self.__buf.append(flags0)
            self.__buf.append(flags1)
            self.__state = State.have_flags

    def __header(self, question_count, answer_count, authority_count,
                 additional_count):
        """
        Adds DNS header details to dns message query.

        :param question_count: Question count.
        :type question_count: A 16-bit unsigned integer.
        :param answer_count: Answer count.
        :type answer_count: A 16-bit unsigned integer.
        :param authority_count: Authority count.
        :type authority_count: A 16-bit unsigned integer.
        :param additional_count: Additional count.
        :type additional_count: A 16-bit unsigned integer.
        """
        if self.__state == State.have_flags:
            self.__push16(question_count)
            self.__push16(answer_count)
            self.__push16(authority_count)
            self.__push16(additional_count)
            self.__state = State.have_header

    def __question(self, name: str, type16, class16):
        """
        Adds question to dns message query.

        :param name: Domain name.
        :type name: str.
        :param type16: The type of the requested resource record(s); its
            ``value`` attribute is written as a 16-bit unsigned integer.
        :type type16: Enum member with an integer ``value``.
        :param class16: The class of the requested resource record(s).
        :type class16: A 16-bit unsigned integer.
        :raises ValueError: If a label of ``name`` is longer than 63 bytes.
        """
        if self.__state == State.have_header:
            self.__domain(name)
            self.__push16(type16.value)
            self.__push16(class16)
            self.__state = State.have_questions

    def __push16(self, ui16):
        """
        Adds 16-bit unsigned integer to dns message query (big-endian).

        :param ui16: Value.
        :type ui16: A 16-bit unsigned integer.
        """
        self.__buf.append((ui16 >> 8) & 0xFF)
        self.__buf.append(ui16 & 0xFF)

    def __push32(self, ui32):
        """
        Adds 32-bit unsigned integer to dns message query (big-endian).

        :param ui32: Value.
        :type ui32: A 32-bit unsigned integer.
        """
        for shift in (24, 16, 8, 0):
            self.__buf.append((ui32 >> shift) & 0xFF)

    def __push8(self, ui8):
        """
        Adds 8-bit unsigned integer to dns message query.

        :param ui8: Value.
        :type ui8: A 8-bit unsigned integer.
        """
        self.__buf.append(ui8)

    def __domain(self, domain: str):
        """
        Adds domain to dns message query.

        The name is split on ``.`` and each label is written as one length
        byte followed by the label's UTF-8 bytes. A terminating zero byte is
        appended unless the last label is already empty (e.g. the name ends
        with a dot), in which case that empty label is the terminator.

        :param domain: Domain.
        :type domain: str.
        :raises ValueError: If a label is longer than 63 bytes.
        """
        encoded = b""
        for label in domain.split("."):
            # Encode first: the length byte must count bytes, not
            # characters, or non-ASCII labels get a wrong prefix.
            # NOTE(review): no IDNA/punycode conversion is applied here.
            encoded = label.encode()
            if len(encoded) > 63:
                # Larger values would set the top two bits of the length
                # byte, which do not denote a plain label length.
                raise ValueError(
                    "DNS label {!r} is {} bytes long; the maximum is 63"
                    .format(label, len(encoded)))
            self.__push8(len(encoded))
            self.__buf.extend(encoded)
        if encoded:
            self.__push8(0)