You have certainly heard many times about using machine learning to separate spam from ham. Do you want a glimpse of how it works?

We chose to stick to the Python standard library as much as possible, but there are great libraries out there to accomplish everything done here.

Having said that, time to get our hands dirty reinventing some wheels!

The data set

The Apache SpamAssassin Project maintains a nice collection of old e-mail messages that we can use. The messages date from the early 2000s, and scammers are probably way smarter now, so please don’t use this in any production environment :)

Download the data set using the following code:

from os import makedirs, path, rename, rmdir
from tarfile import open as open_tar
from urllib import request, parse


def download_corpus(dataset_dir: str = 'data'):
    base_url = 'https://spamassassin.apache.org'
    corpus_path = 'old/publiccorpus'
    files = {
        '20021010_easy_ham.tar.bz2': 'ham',
        '20021010_hard_ham.tar.bz2': 'ham',
        '20021010_spam.tar.bz2': 'spam',
        '20030228_easy_ham.tar.bz2': 'ham',
        '20030228_easy_ham_2.tar.bz2': 'ham',
        '20030228_hard_ham.tar.bz2': 'ham',
        '20030228_spam.tar.bz2': 'spam',
        '20030228_spam_2.tar.bz2': 'spam',
        '20050311_spam_2.tar.bz2': 'spam' 
    }
    
    downloads_dir = path.join(dataset_dir, 'downloads')
    ham_dir = path.join(dataset_dir, 'ham')
    spam_dir = path.join(dataset_dir, 'spam')


    makedirs(downloads_dir, exist_ok=True)
    makedirs(ham_dir, exist_ok=True)
    makedirs(spam_dir, exist_ok=True)
    
    for file, spam_or_ham in files.items():
        # download file
        url = parse.urljoin(base_url, f'{corpus_path}/{file}')
        tar_filename = path.join(downloads_dir, file)
        request.urlretrieve(url, tar_filename)
        
        # list e-mails in compressed file
        emails = []
        with open_tar(tar_filename) as tar:
            tar.extractall(path=downloads_dir)
            for tarinfo in tar:
                if len(tarinfo.name.split('/')) > 1:
                    emails.append(tarinfo.name)
        
        # move e-mails to ham or spam dir
        for email in emails:
            directory, filename = email.split('/')
            directory = path.join(downloads_dir, directory)
            rename(path.join(directory, filename),
                   path.join(dataset_dir, spam_or_ham, filename))
        
        rmdir(directory)


download_corpus()

We have a corpus of 6952 hams and 2399 spams.

from glob import glob
from os import path

ham_dir = path.join('data', 'ham')
spam_dir = path.join('data', 'spam')

print('hams:', len(glob(f'{ham_dir}/*')))  # hams: 6952
print('spams:', len(glob(f'{spam_dir}/*')))  # spams: 2399

Parsing messages

If you open any of these individual files, you will see they are very hard to read. This is because they are in MIME format (https://en.wikipedia.org/wiki/MIME). Python has a standard library, email, that helps us extract only the parts we care about, namely the subject and the body.
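To give an idea, here is a minimal sketch of how the standard email module exposes the subject and the body (the raw message below is made up, not one from the corpus):

from email import message_from_string

raw = 'Subject: hello there\n\nJust a plain text body.'  # made-up message
message = message_from_string(raw)

print(message['Subject'])     # hello there
print(message.get_payload())  # Just a plain text body.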

Let’s create a class to represent a message. It will hold the subject and the body, and expose a property that returns a clean string containing only letters.

from re import sub


class SimpleEmail:
    def __init__(self, subject: str, body: str):
        self.subject = subject
        self.body = body
    
    @property
    def clean(self):
        sanitizer = '[^A-Za-z]+'
        clean = sub(sanitizer, ' ', f'{self.subject} {self.body}')
        clean = clean.lower()
        return sub(r'\s+', ' ', clean)
    
    def __str__(self):
        subject = f'subject: {self.subject}'
        body_first_line = self.body.split('\n')[0]
        body = f'body: {body_first_line}...'
        return f'{subject}\n{body}'

    def __repr__(self):
        return self.__str__()

When we first started, we thought these messages would be too heavy to load into memory all at once, so we built this iterator for reading messages from a directory. In the end, the messages are not that heavy…

from email import message_from_file
from glob import glob


class EmailIterator:
    def __init__(self, directory: str):
        self._files = glob(f'{directory}/*')
        self._pos = 0
    
    def __iter__(self):
        self._pos = -1
        return self
    
    def __next__(self):
        if self._pos < len(self._files) - 1:
            self._pos += 1
            return self.parse_email(self._files[self._pos])
        raise StopIteration()
    
    @staticmethod
    def parse_email(filename: str) -> SimpleEmail:
        with open(filename,
                  encoding='utf-8',
                  errors='replace') as fp:
            message = message_from_file(fp)
        
        subject = None
        for item in message.raw_items():
            if item[0] == 'Subject':
                subject = item[1]
        
        if message.is_multipart():
            body = []
            for b in message.get_payload():
                body.append(str(b))
            body = '\n'.join(body)
        else:
            body = message.get_payload()
        
        return SimpleEmail(subject, body)

Pre-processing

We can load everything into memory; we will be fine :)

import numpy as np


ham_emails = EmailIterator('data/ham')
spam_emails = EmailIterator('data/spam')

hams = np.array([email.clean for email in ham_emails])
spams = np.array([email.clean for email in spam_emails])

Since we have an unbalanced data set (6952 hams and 2399 spams), we need to take care when splitting it into training and test sets. We can use Scikit-Learn’s StratifiedShuffleSplit for that. It will make sure that the proportion of hams and spams is the same in both the training and test sets.

from sklearn.model_selection import StratifiedShuffleSplit


split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

emails = np.concatenate((hams, spams))
labels = np.concatenate((np.zeros(hams.size), np.ones(spams.size)))

for train_index, test_index in split.split(emails, labels):
    emails_train, labels_train = \
        emails[train_index], labels[train_index]
    emails_test, labels_test = \
        emails[test_index], labels[test_index]

Using the messages in the training set, we build a dictionary with the occurrences of each word across all messages.

from collections import defaultdict


dictionary = defaultdict(int)

for email in emails_train:
    for word in email.split(' '):
        dictionary[word] += 1

And then we select only the top 1000 most frequent ones (you can experiment with varying this number). Also, notice that we are ignoring single letters (len(word) > 1).

top = 1000
descending_dictionary = sorted(dictionary.items(),
                               key=lambda v: v[1],
                               reverse=True)
dictionary = [
    word for (word, occur) in descending_dictionary
    if len(word) > 1
][:top]

The idea now is to encode each message (subject + body) into an array where each position indicates how many times the corresponding dictionary word appears in it. For instance, if our dictionary were only:

["yes", "no", "have", "write", "script", "myself", "to"]

And a certain message were “I would prefer not to have to write a script myself”, it would be encoded as:

[0, 0, 1, 1, 1, 1, 2]

Which means:

[
    0,  # zero occurrence(s) of word yes
    0,  # zero occurrence(s) of word no
    1,  # one  occurrence(s) of word have
    1,  # one  occurrence(s) of word write
    1,  # one  occurrence(s) of word script
    1,  # one  occurrence(s) of word myself
    2   # two  occurrence(s) of word to
]

You could also use a binary encoding: 0 for “does not occur” and 1 for “occurs”. The following function encodes a given message using the approach just described.

def encode_email(email: str,
                 dictionary_: np.ndarray,
                 binary: bool = False) -> np.ndarray:
    encoded = np.zeros(dictionary_.size)
    words = email.split(' ')
    
    for word in words:
        index = np.where(dictionary_ == word)[0]
        if index.size == 1:  # we ignore unknown words
            if binary:
                encoded[index[0]] = 1
            else:
                encoded[index[0]] += 1
    return encoded
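As a quick sanity check, the toy example from above can be reproduced with this function (a small sketch; toy_dictionary is just an illustrative array, not the real dictionary built earlier):

toy_dictionary = np.array(['yes', 'no', 'have', 'write', 'script', 'myself', 'to'])
toy_message = 'i would prefer not to have to write a script myself'

print(encode_email(toy_message, toy_dictionary))
# [0. 0. 1. 1. 1. 1. 2.]

print(encode_email(toy_message, toy_dictionary, binary=True))
# [0. 0. 1. 1. 1. 1. 1.]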

And then we encode our messages.

from functools import partial


dictionary = np.array(dictionary)
_encode_email = partial(encode_email, dictionary_=dictionary)

encoded_train = np.array(list(map(_encode_email, emails_train)))
encoded_test = np.array(list(map(_encode_email, emails_test)))

Training a model

Think about when you read an e-mail yourself and try to judge whether it is spam or not. A good approach is to search for certain words or combinations of words that you have previously seen in spam e-mails. Words are just words, valid entities of a given language; there is nothing suspicious about them on their own. It is seeing some of them in a certain combination, in a certain context, written in a certain grammatical style, that makes them suspicious.

A good model for this task of comparing a message against other messages is K-Nearest Neighbors (KNN). It is a good first guess because it does not fit a mathematical expression; it is instance based, which means it uses a measure of distance to compare a new message with all the known messages and flags it as spam or ham according to how similar it is to its closest “neighbours”.
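To make “measure of distance” concrete: with the encoding above, every message is just a vector of word counts, and KNeighborsClassifier defaults to the Minkowski metric with p=2, i.e. the Euclidean distance between those vectors. A tiny sketch with two made-up encoded messages:

a = np.array([0, 0, 1, 1, 1, 1, 2])
b = np.array([0, 1, 1, 0, 1, 1, 2])

distance = np.sqrt(np.sum((a - b) ** 2))  # Euclidean distance
print(distance)  # ~1.414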

Let’s give it a shot.

from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score
)
from sklearn.model_selection import cross_val_predict
from sklearn.neighbors import KNeighborsClassifier


knn_clf = KNeighborsClassifier()
labels_pred = cross_val_predict(knn_clf,
                                encoded_train,
                                labels_train,
                                cv=5)

print('accuracy:', accuracy_score(labels_train, labels_pred))
# accuracy: 0.9648395721925134

print('precision:', precision_score(labels_train, labels_pred))
# precision: 0.9630872483221476

print('recall:', recall_score(labels_train, labels_pred))
# recall: 0.897342365815529

print('f1:', f1_score(labels_train, labels_pred))
# f1: 0.9290531427029943

That is cool, right? A recall of about 89% just out of the box. Well, this also means that about 11% of the spams are escaping our detection, and with a precision of 96%, about 4% of the messages we flag as spam are actually valid. We can try hyperparameter tuning to improve this.
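To see these errors as raw counts instead of ratios, we can also look at the confusion matrix (a quick sketch, reusing labels_train and labels_pred from the cross-validation above):

from sklearn.metrics import confusion_matrix

# rows are true classes (ham, spam), columns are predicted classes
tn, fp, fn, tp = confusion_matrix(labels_train, labels_pred).ravel()
print('hams flagged as spam (false positives):', fp)
print('spams that escaped (false negatives):', fn)
# precision = tp / (tp + fp), recall = tp / (tp + fn)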

In our experience, companies would rather flag a valid message as spam than let a spam message reach someone’s inbox. For that reason, we will focus on improving recall here.

Warning: The following code might take an hour or two to run, depending on your machine.

from sklearn.model_selection import GridSearchCV

params_grid = [{
    'n_neighbors': [2, 5, 10],
    'weights': ['uniform', 'distance'],
    'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'],
    'metric': ['minkowski'],
    'metric_params': [{'p': 2}, {'p': 3}, {'p': 4}]
}]

search = GridSearchCV(knn_clf, params_grid, n_jobs=6,
                      scoring='recall', cv=5, verbose=1)
search.fit(encoded_train, labels_train)

The best parameters using this grid search are:

{
    'algorithm': 'ball_tree',
    'metric': 'minkowski',
    'metric_params': {'p': 2},
    'n_neighbors': 2,
    'weights': 'distance'
}
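These come straight from the fitted search object; you can also inspect them yourself:

print(search.best_params_)  # should match the dictionary above
print(search.best_score_)   # best mean cross-validated recall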

Using the best estimator:

labels_pred = cross_val_predict(search.best_estimator_,
                                encoded_train,
                                labels_train,
                                cv=5)


print('accuracy:', accuracy_score(labels_train, labels_pred))
# accuracy: 0.9740641711229947

print('precision:', precision_score(labels_train, labels_pred))
# precision: 0.9551451187335093

print('recall:', recall_score(labels_train, labels_pred))
# recall: 0.9431995831162063

print('f1:', f1_score(labels_train, labels_pred))
# f1: 0.9491347666491873

The precision decreased a little bit (from 96% to 95%), but the recall improved a lot (from 89% to 94%).

Let’s now see how it performs in the test set.

knn_clf = KNeighborsClassifier(algorithm='ball_tree',
                               n_neighbors=2,
                               weights='distance')
knn_clf.fit(encoded_train, labels_train)
labels_pred = knn_clf.predict(encoded_test)

print('accuracy:', accuracy_score(labels_test, labels_pred))
# accuracy: 0.982896846606093

print('precision:', precision_score(labels_test, labels_pred))
# precision: 0.9666666666666667

print('recall:', recall_score(labels_test, labels_pred))
# recall: 0.9666666666666667

print('f1:', f1_score(labels_test, labels_pred))
# f1: 0.9666666666666667

Nice, right? It seems to be generalizing quite well.

Automating previous steps

First, we need an easy way to encode messages, and also a way to vary some encoding parameters as hyperparameters if we want to. For that, let’s create a MessageEncoder transformer.

from collections import defaultdict
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class MessageEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, binary: bool = False, top: int = 1000):
        self.dictionary_ = None
        self.binary = binary
        self.top = top
    
    def fit(self, X, y=None):
        dictionary = defaultdict(int)
        
        for email in X:
            for word in email.split(' '):
                dictionary[word] += 1
        
        descending_dictionary = sorted(dictionary.items(),
                                       key=lambda v: v[1],
                                       reverse=True)
        
        self.dictionary_ = np.array([
            word for (word, occur) in descending_dictionary
            if len(word) > 1
        ][:self.top])
        
        return self
    
    def transform(self, X):
        return np.array(list(map(self.encode_message, X)))
        
    def encode_message(self, message: str):
        encoded = np.zeros(self.dictionary_.size)
        words = message.split(' ')
        
        for word in words:
            index = np.where(self.dictionary_ == word)[0]
            if index.size == 1:  # we ignore unknown words
                if self.binary:
                    encoded[index[0]] = 1
                else:
                    encoded[index[0]] += 1
        return encoded

Cool, now we can use it like:

encoder = MessageEncoder()
encoder.fit(emails_train)
encoded_emails_train = encoder.transform(emails_train)

Or we can use it directly in a pipeline:

from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ('encode_messages', MessageEncoder()),
    ('knn_clf', KNeighborsClassifier()),
])

This makes it possible to run grid search and cross-validation on the whole pipeline.

Warning: The following code might take four to five hours to run depending on your machine.

from sklearn.model_selection import GridSearchCV

params_grid = [{
    'encode_messages__binary': [True, False],
    'encode_messages__top': [500, 1000],
    'knn_clf__n_neighbors': [2, 5, 10],
    'knn_clf__weights': ['uniform', 'distance'],
    'knn_clf__algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'],
}]

pipe_search = GridSearchCV(pipeline, params_grid, n_jobs=1,
                           scoring='recall', cv=5, verbose=2)
pipe_search.fit(emails_train, labels_train)

And the best combination found is:

{
    'encode_messages__binary': True,
    'encode_messages__top': 1000,
    'knn_clf__algorithm': 'auto',
    'knn_clf__n_neighbors': 2,
    'knn_clf__weights': 'distance'
}

The notable difference from before is the binary parameter of the MessageEncoder: it seems that flagging the presence or absence of a word works better than counting occurrences.

Finally, we can check whether we are still in good shape:

from sklearn.metrics import classification_report

labels_test_pred = pipe_search.best_estimator_.predict(emails_test)
print(classification_report(labels_test,
                            labels_test_pred,
                            target_names=['ham', 'spam'],
                            digits=4))

              precision    recall  f1-score   support

         ham     0.9877    0.9799    0.9838      1391
        spam     0.9430    0.9646    0.9537       480

    accuracy                         0.9759      1871
   macro avg     0.9653    0.9722    0.9687      1871
weighted avg     0.9762    0.9759    0.9760      1871
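As a last illustration, classifying a brand-new message with the fitted pipeline could look something like this (a sketch; the message below is made up):

new_email = SimpleEmail(subject='You won a free prize',
                        body='Click here to claim your reward now')
prediction = pipe_search.best_estimator_.predict([new_email.clean])
print('spam' if prediction[0] == 1 else 'ham')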

Conclusion

Although this is not a production-ready model, it is nice that we can achieve such a good recall without much effort.

Thanks for reading this far; I hope you enjoyed having a few code samples you can copy and paste :)

Please let me know if something is not clear.