Writing a Simple Telnet Honeypot in Python

11 minute read

Introduction

A honeypot is a device that poses as a vulnerable system, making it an irresistible target for an attacker. But in reality, the honeypot is a trap that can be used to detect an attacker. This makes honeypots an extremely valuable tool for detecting when your network has been compromised since attackers will often try to pivot from one machine to another within a network.

Honeypots also have uses in malware research and threat intelligence because they can be used to capture malicious software or gather information about attackers while posing as a vulnerable system.

My First Honeypot

One Friday night in 2017 while I was on a bus headed home from university, I popped open my slider phone, ssh’d into a VPS I was using as a game server and began writing some Python code. This was shortly after the source code for Mirai was leaked to the public and incredible numbers of poorly-secured internet of things devices were being infected and used in some of the most high-profile DDoS attacks ever seen. I was curious about how this piece of malware behaved, so I cobbled together a Python program that posed as a vulnerable IoT device with telnet accessible to the internet. Within seconds, I had infected devices trying to connect to my honeypot. After some refinement, I was able to successfully identify a command and control server (which was promptly reported to the ISP serving it).

This was a lot of fun, but the honeypot was very much a cobbled-together mess and I recently thought it might be fun to revisit it in a sort-of tutorial format.

The Telnet Server

In order to write a Telnet honeypot, you must first write a Telnet server. Telnet is a relatively simple protocol defined in RFC854. Telnet allows both data and commands to be sent over the same TCP socket, but for the purposes of this simple server we will just ignore the commands for now. Most automated scanners don’t send telnet commands, anyways.

The server will consist of three classes: A server class that handles incoming connections, a client connection class that handles individual clients and a class for logging activity. Create a new Python file called telnetsvr.py and we will begin adding code to it:

import socket
import queue
import sys
import time
import datetime
import csv
import os

class TelnetServer:

    def __init__(self, port, host="0.0.0.0", logdir=None):
        self.host = host
        self.port = port
        #Create a socket and bind to the host
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((host, port))
        self.logdir = logdir
    
    def listen(self, count=10):
        self.socket.listen(count)

    def close(self):
        self.socket.close()

    def accept(self):
        conn, addr = self.socket.accept()
        print(addr)
        log = None
        if self.logdir:
            timefmt = str(int(time.time()))
            ipfmt = addr[0].replace(".", "-").replace(":", "-")
            log = os.path.join(self.logdir, timefmt + "-" + ipfmt + ".csv")
        return TelnetServerClient(conn, addr, logfile=log)
    

This class is responsible for setting up a TCP socket on the host and listening for incoming connections. Our main loop will eventually instantiate this class, call listen() and then call accept() in a loop in order accept incoming telnet connections. When an telnet client connects to the server, the accept() function in TelnetServer returns a TelnetServerClient object. Let’s create that next.

First, add the following constants to the top of telnetsvr.py:

CR_NULL = "\x0d\x00"
CR_LF = "\x0d\x0a"

Next, Create the TelnetServerClient class:

class TelnetServerClient:

    def __init__(self, conn, addr, logfile=None):
        self.conn = conn
        self.addr = addr
        self.logfile = logfile
        self.log = None
        if logfile:
            self.log = TelnetLogger(addr, logfile)

    def recv(self, count):
        r = self.conn.recv(count)
        #check if connection closed here
        if len(r) == 0:
            try:
                self.close()
            finally:
                raise ConnectionError(\
                'Connection closed by peer'\
                )
        return r
        
    def send(self, msg):
        if self.log:
            self.log.logSend(msg)
        try:
            self.conn.sendall(bytes(msg, encoding="utf-8"))
        except OSError as e:
            print(e)
            #If the socket is closed, and we try writing to
            #it, the OS raises the BAD_FD (Error 9) error.
            raise ConnectionError('Connection closed.')
            
    def recvstr(self, count):
        s = self.recv(count).decode('utf-8', errors='ignore')
        #Delete CTRL-C
        #s.replace("\x06", "")
        if self.log:
            self.log.logRecv(s)
        print(s)
        return s
            

This class is initialized by the TelnetServer class and has functions to send to or receive from the socket. The recvstr() function is a convenience function that automatically converts the received data to a string. This is a bit of a hack and might break if the remote client sends a telnet command, but we will ignore this for now in the interests of keeping this server simple. Both of these functions automatically perform some logging (which we will add later) if a log object was specified.

This is good, but it would be nice if we could send or receive one line at a time from the client since commands are typically line terminated. In order to do that, we will need a buffer to store the data we’ve received. We’ll also add a queue since it’s a convenient way of storing lines.

Add the following to the __init__() function of TelnetServerClient:

self.buf = ''
self.line_queue = queue.Queue()

Next add a function to TelnetServerClient to receive a data from the connected client into the buffer we just created:

def recvIntoBuffer(self):
    self.buf += self.recvstr(1024)

Now for the complicated part: Parsing the lines. Add the following functions to TelnetServerClient:

'''
Returns a 2-tupple. The first element is a list of 2-tupples 
containing lines split by CR_NULL/CR_LF and the line ending. 
The second element contains any leftover string
'''
def _telnetSplit(self, s):
    #Not enough characters for a CR_LF sequence
    if len(s) < 2:
        return [],s

    if s == CR_NULL or s == CR_LF:
        return [('',s)],''
    
    splits = []
    split_start = 0
    
    i = 0
    while i < len(s) - 2:
        if s[i:i+2] == CR_NULL:
            splits.append((s[split_start:i], s[i:i+2]))
            split_start = i+2
            i = i + 1
        elif s[i:i+2] == CR_LF:
            splits.append((s[split_start:i], s[i:i+2]))
            split_start = i+2
            i = i + 1
        i += 1
    leftover = s[split_start:len(s)]

    #check if leftover ends with a line ending
    if leftover.endswith(CR_LF) or leftover.endswith(CR_NULL):
        splits.append((leftover[:len(leftover)-2],leftover[len(leftover)-2:]))
        leftover = ""
    
    return (splits, leftover)

'''
readline() reads a line from the socket.
Returns a 2-tupple containing the line and the line ending.
If no line is available, this function blocks.
'''
def readLine(self):
    while True:
        #first try to read the split
        #TELNET uses CR NULL terminaton when just a newline 
        #character is desired.
        #These must be translated into separate lines
        #print("RL")
        splits,leftovers = self._telnetSplit(self.buf)
        #print(splits)
        #print(leftovers)
        if len(splits) > 0:
            for split in splits:
                self.line_queue.put(split)
            self.buf = leftovers

        try:
            return self.line_queue.get_nowait()
        except queue.Empty:
            pass
            #print("q empty")

        #print("recv")
        #If there's no split, try reading from the socket again
        self.recvIntoBuffer()
            

readLine() is a function that reads one line sent by the remote system from the queue if a line is available and _splitLines() is a function that is used to tokenize lines from a string, such as the buffer we created earlier.

If you’re familiar with Python, you might be tempted to simply use line.split('\n') to separate the buffer into individual lines. There’s a small complication we deal with in _splitLines(), though! In the telnet protocol, lines may be either CR-LF or CR-NULL terminated and we must consider both cases. Additionally, we may wish to know the termination type as each of these have different meanings. So, we check for both of those cases in the while loop of this function in order to account for this. When a line is found, it is appended to a list along with the line termination. When this while loop finishes executing, any leftover data in the buffer is returned along with the list of lines.

Now reading a line is fairly simple. readLine() processes any data in the buffer, adds any lines it finds to the queue then attempts to read from the queue. If there is nothing in the queue to return, it reads data from the socket into the buffer and tries again. The effect of this is that this function blocks until there’s a line available to be read.

Next, let’s deal with sending a line and closing the socket. Add the following to TelnetServerClient:

def sendLine(self, msg):
    self.send(msg+CR_LF)

def close(self):
    #There may be data left in the buffer when this is called, so you may
    #wish to deal with that here instead of simply closing it.
    self.conn.close()
    if self.log:
        self.log.close()

This is fairly straightforward, sendLine() simply appends a line terminator to a string and sends it over the socket. close() closes the connection and log file if a log object was defined.

Finally, let’s add some logging capabilities to telnetsvr.py by adding this TelnetLogger class:

 
class TelnetLogger:

    def __init__(self, addr, filename):
        self.filename = filename
        self.file = open(filename, 'w')
        self.writer = csv.writer(self.file)
        self.logConnect(addr[0])
        self.file.flush()
        self.closed = False

    def logConnect(self, ip):
        str_time = str(time.time())
        try:
            self.writer.writerow(["OPEN", str_time, ip])
        except ValueError:
            #csvwriter rases this on an attempt to write to a closed file
            pass

    def logRecv(self, msg):
        str_time = str(time.time())
        try:
            self.writer.writerow(["RECV", str_time, msg.encode('unicode_escape')])
            self.flush()
        except ValueError:
            pass

    def logSend(self, msg):
        str_time = str(time.time())
        try:
            self.writer.writerow(["SEND", str_time, msg.encode('unicode_escape')])
            self.flush()
        except ValueError:
            pass

    def close(self):
        self.closed = True
        self.flush()
        self.file.close()

    def flush(self):
        try:
            self.file.flush()
        except ValueError:
            pass
            
 

This class makes use of the built in csvwriter module, which is a simple and convenient log file format. You could certainly make some improvements here like log to a database, but I decided to keep things fairly simple with one file per connection. The try/except blocks around the writer are there to catch attempts to write to a closed log. These writes are simply ignored.

Main loop

Now that we have the telnet server taken care of, we need some code to make use of it. let’s begin by making a new file called honey.py and begin with a little bit of code to start and test the server:

import telnetsvr
 
ts = telnetsvr.TelnetServer(7000, logdir="log")
ts.listen()
while True:
    client = ts.accept()
    client.sendLine("Hello, World!")
    client.close()
 

Run this code and try connecting to the server using telnet. On most Linux systems, you can do this with:

$ telnet localhost 7000

If everything works, you should see “Hello, World!” printed to the terminal.

Making it useful

We can now accept incoming connections, but can’t do much else beyond that. To fix that, let’s create a fake login prompt and a fake shell. Add the following to honey.py:

'''
FakeGetty
Fake login prompt
'''
class FakeGetty:
    def __init__(self, client):
        self.client = client

    def motd(self):
        self.client.send("\r\nYOUR SYSTEM NAME HERE\r\n\r\n")
        self.client.send("YOUR ORGANIZATION NAME HERE\r\n")
        self.client.send("UNAUTHORIZED ACCESS \
        PROHIBITED\r\n\r\n\r\n")

    def run(self):
        self.motd()
        while True:
            self.client.send("Login: ")
            user, ending = client.readLine()
            self.client.send("Password: ")
            pw, ending = client.readLine()
            if user == "root" and pw == 'root':
                return True
            else:
                self.client.sendLine("Login incorrect.")   

'''
FakeShell
Approximtes the behaviour of a real shell
'''
class FakeShell:

    def __init__(self, client):
        self.client = client
        self.fs = FakeFS.FakeFS("fsconfig.json")

    def motd(self):
        self.client.send("\r\n\r\nsh 1993-02-04\
        \r\nSAMPLE TEXT\r\n")

    def prompt(self):
        self.client.send("# ")

    '''
    runCommand(self, cmdline)
    Parses a single statement from the shell and runs the 
    appropriate command
    '''
    def runCommand(self, cmdline):
        args = cmdline.strip().split(" ")
        
        if len(args) > 0:
            cmd = args[0]
        else:
            return
        #print(args)
        if cmd == "exit":
            client.close()
        elif cmd == "sh":
            self.motd()
        elif cmd == "":
            return
        else:
            self.client.sendLine(cmd + ": not found")

    '''
    run()
    Main loop of the shell. First presents the user with a 
    login prompt via FakeGetty and then presents the user 
    with a shell.
    '''
    def run(self):
        fg = FakeGetty(client)
        fg.run()
        self.motd()
        while True:
            self.prompt()
            line, ending = client.readLine()
            print("LINE: " + line)
            #Split commands at semicolons. This is not perfect,
            #but is good enough to approximate shell behavior
            statements = line.split(";")
            for statement in statements:
                    self.runCommand(statement)

So what’s going on here? FakeGetty is instantiated by FakeShell, prints a banner and then prompts the client for a user name and password. If the user name and password are both “root”, FakeShell presents the client with, well, a fake shell.

FakeShell needs to be instantiated with a TelnetServerClient, so we shall do so in our main loop after a client connects. Let’s update the main loop in honey.py to properly instantiate the FakeShell:

ts = telnetsvr.TelnetServer(7000, logdir="log")
ts.listen()
while True: 
    client = ts.accept()
    fs = FakeShell(client)
    fs.run()
    client.close()

You should now be able to run honey.py and you will be prompted with a password when you connect with telnet. After “logging in” you should be greeted with a shell prompt. Of course, it still doesn’t do much beside allow you to exit or print the banner again.

Adding a program

Let’s extend the functionality of the honeypot by adding a program that emulates the functionality of a common *nix utility, echo.

Add the following class to honey.py:

'''
    Fake echo command
    Simply echoes the arguments passed to it back to the user
'''
class FakeEcho:
    def __init__(self, client, cmdline):
        self.client = client
        self.cmdline = cmdline

    def run(self):
        self.client.sendLine("".join(self.cmdline))

FakeEcho simply echos back the arguments that were passed to it. These arguments are passed in the form of an array of strings, much in the way they would in a C program. "".join(self.cmdline) simply concatenates all of these arguments into a string, which isn’t quite how echo works, but this works sufficiently well for illustrative purposes.

Now let’s make it possible to run FakeEcho from our FakeShell. Update the if statement in runCommand() to the following:

if cmd == "exit":
    client.close()
elif cmd == "sh":
    self.motd()
elif cmd == "echo":
    fe = FakeEcho(self.client, args[1:])
    fe.run()
elif cmd == "":
    return
else:
    self.client.sendLine(cmd + ": not found")

You should now be able to log in to the honeypot and run the echo command from the shell.

Conclusion

Thank you for reading. I hope you found this post helpful, or at least interesting. Keep in mind that the honeypot presented here isn’t particularly robust and does have a few flaws such as a lack of an upper bound on the size of the receive buffer, so I would suggest being careful if you do decide to expose it to the internet.

Now that you have a basic telnet honeypot together though, you might try extending it with more commands, implementing a fake filesystem or making the server threaded so that it can service multiple client connections as well.

If you’re really feeling up for a challenge, you might extend the server and shell to emulate process signaling or try implementing pipes.

Updated: