immersinn-ds

Fri 21 October 2016

Frequent Pattern Mining - Apriori Pt. 01

Posted by TRII in frequent-pattern-mining-course   

Introduction / Overview

Continuing with the Coursera Courses Theme, this post will kick off a series of posts related to the Pattern Discovery in Data Mining Course. As one might expect, the course covers introductory topics related to finding interesting patterns in data. "Pattern" is a fairly loaded term, so for now we'll leave the definition and exploration of the general field to the course and other resources.

For the first article in the series, we look into the Apriori Principle (Algorithm) and how it can be used to find frequent patterns in a dataset. The bare-bones explanation of Apriori is that a frequent pattern cannot contain any subset that is not itself frequent in the dataset. This principle really is as intuitive as it seems! Why, then, is it useful?

In [1]:
import os
import sys
import pickle
In [2]:
import numpy
import scipy
import pandas
In [3]:
from collections import defaultdict
from nltk import FreqDist
In [4]:
%matplotlib inline

import matplotlib as mpl
import matplotlib.pyplot as plt

import seaborn as sns
sns.set(style="whitegrid", color_codes=True)
In [5]:
rnd = numpy.random.RandomState(8675309)

General Problem Overview

First, what is a pattern in the sense we are using? Many types of patterns manifest themselves, and the general practice of finding them is referred to as Pattern Recognition. Our particular use case revolves around co-occurring items in itemsets, or Association Rule Learning.

The typical example is to imagine a shopper buying a set of items at a grocery store. In this case, things like milk, cheese, bread, eggs, and peanut butter are all items. For a particular transaction -- that is, a particular shopping visit -- an individual purchases some subset of the items available in a grocery store. If we were to take all transactions that occurred in a week or month or year, we could count up how many transactions contained each grocery store item. This count or frequency would give us the support of each item (the support can refer to both the raw count of occurrences and the fraction of transactions containing an item).

While determining the most frequent items may be of interest, we would also likely be interested in which items were often purchased together. In other words, which itemsets are frequent in the dataset? However, given all of the items in a grocery store, there are a humongous number of possible item combinations that could occur in any given transaction. How do we efficiently sort through all of these patterns and count only the ones we are interested in?

This is where the Apriori Principle comes in. Starting with itemsets containing only a single item, we determine the items' respective supports. If the support for an item meets the given threshold, then the item is deemed a Frequent-1 itemset. For instance, if we say that all items occurring in more than 100 total transactions are frequent, and milk occurs in 2200 transactions, but sardines only shows up in 30 transactions, then we say that "milk" is a frequent itemset, but "sardines" is not.

Once we have the Frequent-1 itemsets, we can intelligently construct the potential Frequent-2 candidates using Apriori. Since we know that no Frequent-2 itemset can contain an itemset that is not a Frequent-1 itemset, the only possible candidates for Frequent-2 itemsets are the pairwise combinations of all Frequent-1 itemsets. We can therefore avoid counting the supports for a large number of patterns that cannot be frequent in the dataset! We can continue this process for finding Frequent-3 itemsets and beyond.
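To make the pruning concrete, here is a small toy sketch (the transactions and min_sup value are invented for illustration and have nothing to do with the dataset used below):

from collections import Counter
from itertools import combinations

# Hypothetical grocery transactions and support threshold, for illustration only
transactions = [{"milk", "bread", "eggs"},
                {"milk", "bread"},
                {"milk", "cheese"},
                {"bread", "eggs"},
                {"milk", "bread", "peanut butter"}]
min_sup = 2

# Support of each single item = number of transactions containing it
item_counts = Counter(item for t in transactions for item in t)
frequent_1 = sorted(item for (item, count) in item_counts.items() if count >= min_sup)
# frequent_1 -> ['bread', 'eggs', 'milk']; 'cheese' and 'peanut butter' are pruned

# Apriori: only pairs of Frequent-1 items can be Frequent-2 candidates,
# so we check 3 candidate pairs instead of all 10 possible pairs of the 5 items
frequent_2_candidates = [set(pair) for pair in combinations(frequent_1, 2)]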

In practice, while Apriori does allow us to prune a significant number of options, for any moderately sized dataset, a large number of potential candidates still exists. While methods exist for making the process a bit more efficient, for now we look to Parallel Processing to speed up the support counting.

Additionally, while many variations exist with regard to applying Apriori and pattern finding in general, we only look at using the basic principle and a straight-forward implementation of the process. Later posts will examine various other avenues for finding frequent patterns and actually doing things with those patterns.

Data Overview

For the core set of the Pattern Discovery articles, we will utilize the Talking Data dataset from a past Kaggle competition. The goal of the competition was to use the collection of installed and active applications on an individual's smart phone in order to determine the user's demographics (Male v. Female and age group).

The dataset contains a large number of readings from users' phones. Each reading specifies the set of installed and active applications on the user's phone. Location data exists as well, but we are not currently interested in that data. From the full set of readings, we can determine the set of installed and active applications for each user in the dataset.

Readings are timestamped, and several readings exist for each user in the dataset. This provides us with each user's application history over time. While later articles will hopefully focus on this aspect of the data, for now we simply collect all readings for a user in order to define each user's transaction.

Specifically, we only look at those applications flagged as "active" for a particular user for at least one reading associated with that user. This reduces the dataset quite a bit, while also ensuring that the patterns found are "interesting" in the sense that they provide pertinent information about application use and not just application presence. The set of applications that are "active" for at least one reading associated with a user are that user's transaction. We will mine patterns from the set of user transactions in the dataset.

Much of the preprocessing on the original Talking Data dataset was performed previously in an older post (which apparently is no longer hosted by Google; need to fix that), so that will not be covered here. We do cover transitioning the output of that post to a transaction table, though.

Goals for Current Dataset

To summarize, the goal of this post is to generate all of the pieces required for extracting frequent patterns across user transactions generated from the Talking Data dataset.

Constructing the Transaction DataFrame from the Talking Data dataset involves merging all rows in the dataset -- each an (AppID, is_installed, is_active, DeviceID) tuple -- corresponding to a single User / Device ID. Application IDs in the resulting transaction dataset correspond to the union of installed Applications across all sample points in the data for the specified User.

For the current iteration of the Frequent Pattern Mining Problem, we treat each of these Application ID sets as a Transaction in our dataset, with one Transaction per User. The goal is to mine all frequent patterns across this set of transactions. This involves utilizing the Apriori Algorithm in a very straight-forward manner.

This post develops the tools required to accomplish the goal. The next post will carry out the full implementation of the method on the dataset and review the findings.

Load & Preprocess Data

Here we load the existing dataset from previous work and further process it into the form desired for pattern mining.

In [6]:
base = "../../../Analytics/Kaggle/TalkingData Moble User Demographics/"
user_apps_data_file = base + "data/proc/user_apps_train_df.pkl"
user_info_file = base + "data/gender_age_train.csv"
In [7]:
user_info = pandas.read_csv(user_info_file)
with open(user_apps_data_file, 'rb') as f:
    user_apps = pickle.load(f)
In [8]:
user_info.head()
Out[8]:
device_id gender age group
0 -8076087639492063270 M 35 M32-38
1 -2897161552818060146 M 35 M32-38
2 -8260683887967679142 M 35 M32-38
3 -4938849341048082022 M 30 M29-31
4 245133531816851882 M 30 M29-31
In [9]:
user_apps.head()
Out[9]:
app_id is_installed is_active device_id
0 NaN NaN NaN -9215638741604613652
1 NaN NaN NaN -9212424538588397592
2 4.460015e+18 1.0 0.0 -9212424538588397592
3 8.693964e+18 1.0 1.0 -9212424538588397592
4 -1.254571e+18 1.0 1.0 -9212424538588397592
In [10]:
user_apps.shape
Out[10]:
(1171795, 4)

Restrict Data

Clean up Empty Rows, Remove Rows that are not "active"

For a previous iteration, a "dummy row" was introduced for each User to ensure that every User occurring in the original dataset had an entry in the User-Application dataset, regardless of whether the User was ever recorded as having an application installed or active. Here we remove those rows, as they will not contribute to the transaction dataset.

Additionally, the dataset is restricted to only those rows for which the corresponding application is recorded as "active". We have decided to define patterns of interest in the dataset as patterns corresponding to Active applications, not Installed applications. Restricting the problem in this manner has the added benefit of reducing the number of rows in the dataset, and thus reducing the overall transaction dataset. This dataset reduction will have an impact on the overall computation time.

In [11]:
# Remove all NaN rows (added to force a user to exist in the data)
user_apps = user_apps.dropna()

# Keep only rows pertaining to "active" apps (example of "Constraint Based Pattern Mining" --> Data Constraints)
user_apps = user_apps[user_apps.is_active==1]
In [12]:
user_apps.shape
Out[12]:
(349505, 4)
In [13]:
# Number of unique users
len(user_apps.device_id.unique())
Out[13]:
23247

Remove all rows corresponding to items that are active fewer than min_sup times

For this step, we perform a sort of pre-Apriori Apriori step. Instead of building a transaction for each User that contains all installed Applications in the dataset corresponding to that User, we preemptively remove Applications that are not frequent in the dataset and do not include them in Users' transactions.

In doing this, we reduce the size of many User transaction sets. Reducing transaction sizes means that the time spent looking for itemsets within transactions is also reduced. This reduction is achieved in every iteration over the dataset. Removing non-frequent-1 itemsets, then, has a compound reduction effect.

If complete transaction sets were desired at the end of the process in order to conduct more complex analyses, the removed itemsets (Applications in this case) could be re-added to the appropriate Users' transactions after frequent pattern mining was complete.

In [14]:
min_sup = 5
In [15]:
# Usage count per app
app_counts = user_apps.groupby('app_id').agg({'is_active' : sum})
In [16]:
active_apps = set(app_counts.index[app_counts.is_active >= min_sup])
In [17]:
user_apps = user_apps[user_apps.apply(lambda x: x['app_id'] in active_apps, axis=1)]
In [18]:
user_apps.shape
Out[18]:
(342708, 4)

Construct Transaction DataFrame

Constructing the Transaction DataFrame involves merging all rows in the "unrolled" dataset corresponding to a single User / Device ID. Application IDs in the resulting set correspond to the union of installed Applications across all sample points in the data for the specified User.

Analyzing the total number of installed, active applications for each User, we notice that most Users only have a handful of applications active across the sample points. Most Users have fewer than 30 active applications overall, while some "super Users" have upwards of 100 active applications.

The "Super Users" seem a bit exceptional in comparison to most Users. Thus, we restrict the dataset to include only those users with less than 40 unique active applications in their corresponding transaction. This corresponds to about 95% of the total User base in the current dataset.

In [19]:
# Generate the application list, hash table
apps = list(user_apps.app_id.unique())
apps_hash = {v : str(i) for (i,v) in enumerate(apps)}
In [20]:
# Construct the user : apps (transaction : items) table
# Replace actual application ids with the hash key
trans_list = []
for user,group in user_apps.groupby(['device_id']):
    curr_apps = set(group['app_id'])
    curr_apps = set([apps_hash[a] for a in curr_apps])
    trans_list.append({'user' : user,
                       'apps' : curr_apps})
trans_list = pandas.DataFrame(trans_list)
trans_list['length'] = trans_list.apply(lambda x: len(x['apps']), axis=1)
In [21]:
trans_list.head()
Out[21]:
apps user length
0 {1592, 1962, 14, 2430, 1060, 777, 595, 417, 19... -9222956879900151005 55
1 {40, 390, 105, 18, 336, 266, 214, 464, 17, 210... -9221026417907250887 29
2 {83, 98, 38, 1420, 70} -9220830859283101130 5
3 {70, 118, 156, 2616} -9220061629197656378 4
4 {2499, 283, 144, 108, 149, 106, 147, 208, 1442... -9218960997324667698 20
In [22]:
g = sns.distplot(trans_list.length, kde=False);
g.figure.set_size_inches(12,8);
plt.title("Distribution of Total Applications for Users", size=14);
plt.xlabel('Unique Applications for User', size=12);
plt.ylabel('Count', size=12);
In [23]:
pandas.DataFrame(trans_list.length.describe(percentiles=[0.1, 0.2, 0.5, 0.8, 0.9, 0.95, 0.99])).transpose()
Out[23]:
count mean std min 10% 20% 50% 80% 90% 95% 99% max
length 23235.0 14.743318 12.91813 1.0 2.0 3.0 11.0 24.0 32.0 39.0 58.0 144.0
In [24]:
trans_list = trans_list[trans_list.length <= 40]

Take a Random Sample, Save for Later

Even with the restrictions made up to this point, there remain about 22K transactions. With this many transactions to check, the full dataset takes awhile to process. Thus, for the purposes of this article, and general testing, development and comparison, we will obtain a random subset of the data.

This subset will be used in subsequent posts as well, as we construct various parts of the method. At the end of the process, we will apply the full method to the complete dataset from the previous step.

In [25]:
trans_list.shape
Out[25]:
(22174, 3)
In [26]:
trans_list = trans_list.sample(n=7500, replace=False, random_state=rnd)
In [27]:
trans_list.shape
Out[27]:
(7500, 3)
In [28]:
cd Dropbox/Analytics/Frequent\ Pattern\ Mining
[Errno 2] No such file or directory: 'Dropbox/Analytics/Frequent Pattern Mining'
/home/immersinn/Dropbox/Analytics/Frequent Pattern Mining
In [29]:
with open('transaction_list_subset.pkl', 'wb') as f:
    pickle.dump(trans_list, f)
In [30]:
with open('application_dictionary.pkl', 'wb') as f:
    pickle.dump(apps_hash, f)

Implementation: Part I

Background

Overview of the Apriori property (downward closure) and how it can be utilized for searching for patterns:

The crux of the Apriori methodology is that any pattern $B$ with $A \subseteq B$ cannot be frequent if $A$ is observed in the dataset fewer than min_sup times. Working from singletons (individual applications in our example) upwards, then, we only construct candidate patterns from smaller patterns that have been observed to be frequent in the dataset.

While this is certainly useful in pattern detection, we will see that the number of candidate patterns generated remains extremely large.

To review, the steps in the iterative process are:

  1. Generate new candidates for Frequent-k patterns from Frequent-(k-1) patterns using the Apriori property
  2. Find the supports for each of the Frequent-k candidate patterns by iterating over the transaction dataset
  3. Determine which of the candidates have supports that meet the min_sup threshold; these are the Frequent-k itemsets

Steps 1-3 are repeated until no Frequent-k itemsets are found.
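As a rough, self-contained sketch of that loop (a simplified serial illustration only; candidate generation here is the naive "all k-combinations of frequent items" variant, whereas the implementation developed below counts supports in parallel and, for larger k, prunes candidates more carefully):

from itertools import combinations

def apriori_sketch(transactions, min_sup):
    # Simplified illustration of the iterative process described above
    frequent = {}
    k = 1
    # Initial candidates: all single items observed in the transactions
    candidates = list({frozenset([item]) for t in transactions for item in t})
    while candidates:
        # Step 2: count supports by iterating over the transaction dataset
        counts = {c: sum(1 for t in transactions if c <= t) for c in candidates}
        # Step 3: keep only candidates meeting the min_sup threshold
        frequent_k = {c: n for (c, n) in counts.items() if n >= min_sup}
        if not frequent_k:
            break
        frequent[k] = frequent_k
        # Step 1: build Frequent-(k+1) candidates from items appearing in Frequent-k itemsets
        items = sorted({i for itemset in frequent_k for i in itemset})
        k += 1
        candidates = [frozenset(combo) for combo in combinations(items, k)]
    return frequent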

Start Finding Frequent Itemsets: Breakdown

Find Frequent-1 Itemsets

The first thing to do is find all single items in the transactions that are frequent. These will compose the Frequent-1 itemsets. Simply counting the number of transactions that each App appears in accomplishes this task.

To formalize, we first generate the set of candidate Frequent-1 patterns, which consists of all the App IDs from the hash table constructed previously to create the Transaction Table. We then count the number of transactions each of these candidates occurs in. Finally, the counts are compared to the $min\_sup$ value, and any itemset occurring at least $min\_sup$ times in the dataset is added to the Frequent-1 list.

Three functions (shown below) are used for this task. Given a transaction and the set of candidates, the functions check the transaction for all candidates and record which candidate patterns were observed within the transaction. These are general functions and are applicable to patterns of any length.

In [31]:
def updateCounts(transaction, patterns, curr_counts, ret=True):
    new_items = checkMatches(transaction, patterns)
    curr_counts.update(new_items)
    if ret:
        return(curr_counts)

def patternInTransaction(transaction, pattern):
    return(pattern.intersection(transaction) == pattern)
       
def checkMatches(transaction, patterns):
    new_items = [pattern for pattern in patterns \
                 if patternInTransaction(transaction,  pattern)]
    new_items = ['-'.join([i for i in sorted(ni)]) for ni in new_items]
    return(new_items)
In [32]:
candidates = [set([a]) for a in sorted(apps_hash.values())]
print(len(candidates))
2761
In [33]:
%%time

counts = FreqDist()
_ = trans_list.apply(lambda x: updateCounts(x['apps'],
                                            candidates,
                                            counts,
                                            ret=False),
                    axis=1)
CPU times: user 5.97 s, sys: 0 ns, total: 5.97 s
Wall time: 5.97 s

Using the DataFrame's "apply" method, the updateCounts function can be applied to each row in the DataFrame in order to update the "counts" frequency distribution.

After obtaining the counts for each item, we determine which items are Frequent-1 patterns by comparing the counts to the $min\_sup$ value. We observe that about 1400 of the roughly 2700 total candidates were found to be frequent with a minimum support of 5.

In [34]:
# Set Minimum Support Value
min_sup = 5
In [35]:
frequent = dict()
frequent[1] = [(k,v) for (k,v) in counts.items() if v >= min_sup]
In [36]:
len(frequent[1])
Out[36]:
1471

Generate Frequent-2 Candidates

Now we need to generate the candidates for Frequent-2 patterns. Starting from the Frequent-1 itemsets, this is straightforward: the Apriori property tells us that every pairwise combination of Frequent-1 itemsets is a Frequent-2 candidate. The itertools module can be used to easily generate these candidates.

In [37]:
import itertools
In [38]:
new_cands = [set(combo) for combo in itertools.combinations([k for (k,v) in frequent[1]], 2)]
In [39]:
len(new_cands)
Out[39]:
1081185
In [40]:
# Number of set check matches we need to do
freq_2_ops = len(new_cands) * trans_list.shape[0]
print(freq_2_ops)
8108887500
In [41]:
freq_1_ops = len(candidates) * trans_list.shape[0]
print(freq_1_ops)
20707500
In [42]:
freq_2_ops / freq_1_ops * 5.25 / 60
Out[42]:
34.26428377399493

One may begin to suspect a slight issue with this approach. The approximately 1400 Frequent-1 itemsets result in about 1 million candidates for Frequent-2 itemsets, which implies that we will be performing about 8 billion set intersections and comparisons to obtain the Frequent-2 itemsets and generate the Frequent-3 candidates.

For comparison, note that we performed only about 20 million operations to determine the Frequent-1 patterns, which took about 5 seconds. Thus, by rough estimates, counting supports for all of the Frequent-2 candidates would take about half an hour on our random data subset. While this is not a particularly long time in comparison to some processes, it is significantly longer than the sub-six-seconds observed for counting supports for single items.

In the next subsection, we look into using the ipyparallel module for using parallel processing to reduce the processing time.

Using ipyparallel for Speedup

The task at hand is ripe for speed-up via parallel processing. Each transaction (and each candidate pattern) is independent of all the others. Thus, by splitting the transactions into parts, pattern supports can be counted for each part independently and finally merged when all parts have been checked.

Below we focus on partitioning the transactions across cores in two ways. The first method involves splitting the data at every row -- every transaction -- and the second involves chunking the DataFrame into several large sections, each of which is sent off for processing. The two methods are then compared in terms of speed.

Non-Parallel

For a baseline measure, each method is first performed sans parallelization. Two additional functions are introduced as wrappers around the pattern-checking functions introduced earlier.

One function takes a row object and the candidate patterns as arguments, and returns the counts for each candidate pattern found in the row's transaction.

The second function -- "section_loop" -- takes a section of the transaction list and the candidate patterns as arguments. As it checks each row in the section for the given set of candidate patterns, the function updates the total candidate pattern counts for the section. After all rows in the section have been checked, the total recorded counts are returned.

In [43]:
def row_loop(row, patterns):
    counts = FreqDist()
    transaction = row[1]['apps']
    counts = updateCounts(transaction, patterns, counts)
    return(counts)


def section_loop(section, patterns):

    counts = FreqDist()
    # Use the patterns argument (rather than the global candidates list)
    _ = section.apply(lambda x: updateCounts(x['apps'],
                                             patterns,
                                             counts,
                                             ret=False),
                        axis=1)
    return(counts)

Each row as separate item

In [44]:
%%time

results_1a = []
for row in trans_list.iterrows():
    # Process each row (transaction) sequentially
    results_1a.append(row_loop(row, candidates))
CPU times: user 7.16 s, sys: 44 ms, total: 7.2 s
Wall time: 7.2 s
In [45]:
new_counts_1a = FreqDist()
for result in results_1a:
    new_counts_1a.update(result)

Sections of the DataFrame as Separate Items

In [46]:
%%time
results_2a = []
n_sections = 8
len_section = trans_list.shape[0] // n_sections
start_cuts = [0 + len_section*i for i in range(n_sections)]

for i, ind in enumerate(start_cuts):
    if i==(n_sections-1):
        section = trans_list[ind:]
    else:
        section = trans_list[ind:start_cuts[i+1]]
        
    results_2a.append(section_loop(section, candidates))
CPU times: user 5.94 s, sys: 4 ms, total: 5.94 s
Wall time: 5.94 s
In [47]:
new_counts_2a = FreqDist()
for result in results_2a:
    new_counts_2a.update(result)
In [48]:
new_counts_1a == new_counts_2a
Out[48]:
True

As a sanity check, we ensure that both methods return the same supports for all candidate patterns in the dataset.

The "row loop" method takes over a second longer (about 20% longer) than the "section loop" method, probably because the latter method is able to make use of the built-in DataFrames "apply" class method, which has been optimized for these kinds of operations. Note that the "section loop" method is comparable in performance to the straight-forward DataFrame "apply" method used originally.

Parallel

Updating the Loop Functions

For parallel processing, some modifications must be made to the two functions introduced above. Each spawned process will be created in its own environment. Thus, the processes will not have any knowledge of content contained within this notebook that is not explicitly passed to the process.

In order for the processes to have access to the original pattern-checking functions, those functions need to be passed to them somehow. The most straight-forward way to do this is to embed the functions within the overall loop functions. Additionally, we need to explicitly import any external modules used within any of the methods; only the FreqDist class from the nltk module is needed, so we import it inside each function.

An alternative method allowed by ipyparallel is to import the necessary modules, functions, and methods into the engines' namespaces using views.
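For reference, a minimal sketch of that alternative (assuming the client and engines set up in the next subsection, and using a DirectView rather than the load-balanced view employed below):

import ipyparallel as ipp

rc = ipp.Client()    # assumes ipcluster has already been started
dview = rc[:]        # a DirectView over all available engines

# Run an import statement on every engine so FreqDist exists in each engine's namespace
dview.execute("from nltk import FreqDist", block=True)

# Push data (e.g., the candidate patterns) directly into each engine's namespace
dview.push({'candidates': candidates}, block=True)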

In [49]:
def row_loop_parallel(row, patterns):
    from nltk import FreqDist
    
    def updateCounts(transaction, patterns, curr_counts):
        new_items = checkMatches(transaction, patterns)
        curr_counts.update(new_items)
        return(curr_counts)

    def patternInTransaction(transaction, pattern):
        return(pattern.intersection(transaction) == pattern)

    def checkMatches(transaction, patterns):
        new_items = [pattern for pattern in patterns \
                     if patternInTransaction(transaction,  pattern)]
        new_items = ['-'.join([i for i in sorted(ni)]) for ni in new_items]
        return(new_items)


    counts = FreqDist()
    transaction = row[1]['apps']
    counts = updateCounts(transaction, patterns, counts)
    return(counts)


def section_loop_parallel(section, patterns):
    from nltk import FreqDist
    
    def updateCounts(transaction, patterns, curr_counts):
        new_items = checkMatches(transaction, patterns)
        curr_counts.update(new_items)
        return(curr_counts)

    def patternInTransaction(transaction, pattern):
        return(pattern.intersection(transaction) == pattern)

    def checkMatches(transaction, patterns):
        new_items = [pattern for pattern in patterns \
                     if patternInTransaction(transaction,  pattern)]
        new_items = ['-'.join([i for i in sorted(ni)]) for ni in new_items]
        return(new_items)
    
    
    counts = FreqDist()
    _ = section.apply(lambda x: updateCounts(x['apps'],
                                             patterns,
                                             counts),
                        axis=1)
    return(counts)

Initializing the Client

To utilize ipyparallel, we need to initialize a client that allows us to tap into the multiple cores on the system. Prior to doing this, however, we need to start ipcluster on the machine. This can be done from a terminal (e.g., with something like ipcluster start -n 8). While IPython allows issuing shell commands from the notebook, in this case the window running the activation command is consumed with running the task. Thus, running the command in an IPython cell would result in a cell that never "completes" the process.

As a note, various Magic Commands also allow running commands, cells, and / or blocks of cells using the parallel engines. One such example, taken from the ipyparallel help docs, is shown below. We choose to explicitly invoke the engines via a client instead.

In [77]:
%%px --targets ::2
        print('I am even')
[stdout:0] I am even
[stdout:2] I am even
[stdout:4] I am even
[stdout:6] I am even
In [50]:
import ipyparallel as ipp
In [52]:
rc = ipp.Client()
print(rc.ids)
view = rc.load_balanced_view()
[0, 1, 2, 3, 4, 5, 6, 7]

After initializing a client object with the default profile, we create a view instance for the client. In this case, we choose the LoadBalancedView option (as opposed to the DirectView option), which handles balancing the load across the engines for us. That is, tasks beyond the number of engines are divvied up to engines in an "intelligent" manner.

The integers printed correspond to the engine ids accessible from the client. In this case, we see that 8 engines are available, each corresponding to a "core".
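As a small, purely illustrative aside (the squaring task below is hypothetical and unrelated to the pattern mining), both view types can map a function over a sequence; the DirectView partitions the work across engines up front, while the LoadBalancedView hands each task to whichever engine frees up next:

# Toy comparison of the two view types, using the client created above
dview = rc[:]                                      # DirectView over all engines
direct_result = dview.map_sync(lambda x: x ** 2, range(16))
balanced_result = view.map_sync(lambda x: x ** 2, range(16))
direct_result == balanced_result                   # same results, different scheduling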

Each row as separate item

In [53]:
%%time

async_results = []
for row in trans_list.iterrows():
    # This line submits the tasks for parallel computation.
    ar = view.apply_async(row_loop_parallel, row, candidates)
    async_results.append(ar)

rc.wait(async_results)  # Wait until all tasks are done.
CPU times: user 1min 1s, sys: 2.72 s, total: 1min 4s
Wall time: 1min 7s
In [54]:
# Collect all of the results from the separate processes
results_1b = [ar.get() for ar in async_results]
In [55]:
new_counts_1b = FreqDist()
for result in results_1b:
    new_counts_1b.update(result)
In [56]:
new_counts_1a == new_counts_1b
Out[56]:
True

Sections of the DataFrame as Separate Items

In [57]:
%%time

async_results = []
n_sections = 8
len_section = trans_list.shape[0] // n_sections
start_cuts = [0 + len_section*i for i in range(n_sections)]

for i, ind in enumerate(start_cuts):
    if i==(n_sections-1):
        section = trans_list[ind:]
    else:
        section = trans_list[ind:start_cuts[i+1]]
        
    ar = view.apply_async(section_loop_parallel, section, candidates)
    async_results.append(ar)

rc.wait(async_results)  # Wait until all tasks are done.
CPU times: user 112 ms, sys: 8 ms, total: 120 ms
Wall time: 1.57 s
In [58]:
# Collect all of the results from the separate processes
results_2b = [ar.get() for ar in async_results]
In [59]:
new_counts_2b = FreqDist()
for result in results_2b:
    new_counts_2b.update(result)
In [60]:
new_counts_2a == new_counts_2b
Out[60]:
True

Running the methods again in parallel mode yields some fairly surprising results. The first thing that should stand out is how much longer the "row loop" method took than both the non-parallel versions and the "section loop" parallel code. The parallel "row loop" method takes upward of a minute, or about 9 times longer than the original methods. Let's consider this for a moment.

Each time a new operation starts on an engine, the environment needs to be prepared. This includes passing any data and functions required for the task over to the engine. In the "row loop" case, this happens for every row in the transaction DataFrame, or 7500 times. Only after the environment is prepped can the engine perform the actual computational task -- here, looking for the candidate patterns within the single transaction it was given. After the engine processes the transaction, it returns what was found. Then a new transaction gets passed along with the candidate data and functions, and so on.

For the "section loop" method, however, each engine only has to do the "setup" step once. Then, it iterates over the rows in the section it was given and returns the result. Thus, the “section loop” method has significantly less overhead costs than the “row loop” method.

While the "section loop" method performs the fastest so far, it is not as fast as we would expect given that it uses 8 processes instead of just 1. The method performs only about 4 times faster than the single process methods, which suggests that setup time may still be having an effect on total processing time and / or we're hitting the lower bound of the benefits gained by splitting a task across multiple cores.

Note: we do not expect 8X speedup by going from 1 to 8 cores, as other factors are at play; however, only a 4X speedup seems to imply we have room for improvement.

So, the question remains, are overhead costs associated with “spinning up” engines limiting the potential speedup from parallel processing? What happens if we only use 6 processes? Or 4, or 2?

Comparing Performance for Varying n

While the "timeit" cell or line magic works well for testing the time it takes to run a single line or cell, the goal here is to capture the results for variations in order to compare the results. To loop over values of "n", we will use the timeit library explicitly. The results are shown below.

In [61]:
def parallel_sections_wrapper(n):
    
    async_results = []
    n_sections = n
    len_section = trans_list.shape[0] // n_sections
    start_cuts = [0 + len_section*i for i in range(n_sections)]

    for i, ind in enumerate(start_cuts):
        if i==(n_sections-1):
            section = trans_list[ind:]
        else:
            section = trans_list[ind:start_cuts[i+1]]

        ar = view.apply_async(section_loop_parallel, section, candidates)
        async_results.append(ar)

    rc.wait(async_results)  # Wait until all tasks are done.
In [62]:
import timeit
In [63]:
times = dict()
for n in range(1,9):
    time = timeit.repeat('parallel_sections_wrapper(' + str(n) + ')', 
                         setup = "from __main__ import parallel_sections_wrapper, trans_list, candidates",
                         number=1)
    times[n] = time
In [64]:
times_min = pandas.DataFrame([{'n_engines': k,
                               'min_time' : min(v)} \
                              for (k,v) in times.items()])
In [65]:
g = plt.scatter(x='n_engines', y='min_time', s=40, linewidths=1, data=times_min);
g.figure.set_size_inches(12, 8);
plt.title("Min Computation Time Across Number of Processes", size=14);
plt.xlabel('Number of Processes', size=12);
plt.ylabel('Minimum Time (sec)', size=12);
plt.ylim([0,6]);

We observe that the minimum run times bottom out after 4 cores. All minimum times recorded for 4 - 8 engines are about the same (i.e. about 1.5 seconds). Thus, it seems that we are hitting a trade-off between setup time and reduced computation time for tasks of the current size.

Why do we use the minimum value recorded for a given value of $n$ rather than the mean? From the Python timeit documentation:

Note: It’s tempting to calculate mean and standard deviation from the result vector and report these. However, this is not very useful. In a typical case, the lowest value gives a lower bound for how fast your machine can run the given code snippet; higher values in the result vector are typically not caused by variability in Python’s speed, but by other processes interfering with your timing accuracy. So the min() of the result is probably the only number you should be interested in. After that, you should look at the entire vector and apply common sense rather than statistics.

The time differences between 4 and 8 engines are not significant enough to compel us to use only 4 engines, especially given that larger datasets will likely have comparable setup times but significantly more computation. For the remainder of these exercises, then, we will continue to use 8 engines unless evidence arises that convinces us to do otherwise.

Finding Frequent-2 Itemsets

With our parallel method in hand, we now progress to counting the supports for all Frequent-2 candidates. Note that this process takes a bit over 9 minutes. The non-parallel version was estimated to take around 30 minutes, and a speedup factor of about 4 was observed when counting the supports for Frequent-1 candidates. Thus, a bit over 9 minutes is in line with what we expect, though perhaps slightly slower. This is likely due to the slightly more complicated patterns being searched for in the transactions.

In [66]:
%%time

async_results = []
n_sections = 8
len_section = trans_list.shape[0] // n_sections
start_cuts = [0 + len_section*i for i in range(n_sections)]

for i, ind in enumerate(start_cuts):
    
    if i==(n_sections-1):
        section = trans_list[ind:]
    else:
        section = trans_list[ind:start_cuts[i+1]]
        
    ar = view.apply_async(section_loop_parallel, section, new_cands)
    async_results.append(ar)

rc.wait(async_results)  # Wait until all tasks are done.
CPU times: user 26.1 s, sys: 1.04 s, total: 27.1 s
Wall time: 9min 11s
In [67]:
# Collect all of the results from the separate processes
results_freq2 = [ar.get() for ar in async_results]
In [68]:
counts_freq2 = FreqDist()
for result in results_freq2:
    counts_freq2.update(result)

Filtering out items that did not occur a sufficient number of times in the transaction dataset, we see that about 32,000 itemsets are in the Frequent-2 patterns. This constitutes only 3% of the total candidates, but is still about a factor of 20 larger than the Frequent-1 itemsets!

In order to move forward with the pattern mining process, we now need to devise a general method for constructing Frequent-$k$ itemset candidates from the Frequent-($k-1$) itemsets.

In [69]:
frequent[2] = [(k,v) for (k,v) in counts_freq2.items() if v >= min_sup]
In [70]:
print("A total of {0} Frequent-2 candidates were found to be frequent.".format(len(frequent[2])))
A total of 31692 Frequent-2 candidates were found to be frequent.
In [71]:
print("The Frequent-2 patterns represent only {0:0.1f}% of the total candidates.".format(len(frequent[2]) / len(new_cands) * 100))
The Frequent-2 patterns represent only 2.9% of the total candidates.

Generating Frequent-3 (and beyond) Candidate Itemsets

In this section, we build a general function for generating new Frequent-k candidates from Frequent-(k-1) itemsets. An outline of the algorithm is as follows:

  1. Collect all itemsets starting with the same k-2 items
    • That is, collect all itemsets sharing every item except the last one (i.e., a common "head" of length k-2)
  2. Pairwise join all itemsets that share the same "head" from Step 1
    • In practice, this means, for each "head" from Step 1:
      1. Generate all of the 2-item combinations of the last items associated with the head to create a set of "tails"
      2. Append each new tail in turn to the head to generate a candidate
  3. Remove new candidates whose last k-1 items do not form a Frequent-(k-1) itemset

Our implementation utilizes default dictionaries for the "head" lookup hash table. We also generate an additional dictionary to make Step 3 easier.

The "tail lookup" hash table's keys are Frequent-1 itemsets that appear as the last item of a Frequent-(k-1) itemset. Because of how we order items in itemsets and how new candidates are generated, very Frequent-k candidate item generated must end with an item that is the last item in a Frequent-(k-1) itemset. Associated with each of these keys is the set -- a Python set object -- of all Frequent-(k-1) itemsets for with the item is the last item. When performing Step 3 for a particular candidate, we find the set corresponding to the last item in the candidate, and check if the tail of the candidate (the itemset corresponding to the last (k-1) items in the candidate) is a member of this set. If it is, then the new candidate is valid in terms of the Apriori Property. Otherwise, it is discarded because a subset of the candidate is not frequent in the transaction dataset.

In [72]:
def generateCandidates(frequent_k1_items):
    
    # Map items in "freq" with all but last item matching
    # to the same list in the hash table
    matchup_dict = defaultdict(list)
    endwith_dict = defaultdict(set)

    for item in frequent_k1_items:
        n = item.split('-')
        key = '-'.join(n[:-1])
        matchup_dict[key].append(n[-1])
        endwith_dict[n[-1]].update([item])
        
    # For each key in the hash list, construct new candidates
    # by iterating overall pairwise combination of the corresponding
    # values.  Merge each pairwise combo with the key to create 
    # a new candidate.  Check if the pattern consisting of the
    # last k-1 items in the candidate itemset constitute a
    # frequent pattern in the k-1 Frequent itemsets.  If yes,
    # add as a candidate pattern.
    ncs = []
    for k,v in matchup_dict.items():
        for tail in itertools.combinations(v, 2):
            last = tail[-1]
            tail = '-'.join(tail)
            cand = '-'.join([k, tail])
            test = '-'.join(cand.split('-')[1:])
            if test in endwith_dict[last]:
                ncs.append(cand)
                
    return(ncs)
In [73]:
f2_items = [k for k,v in frequent[2]]
In [74]:
new_cands_3 = generateCandidates(f2_items)
In [75]:
len(new_cands_3)
Out[75]:
387748
In [76]:
len(new_cands)
Out[76]:
1081185

We observe that the pool of candidates for Frequent-3 patterns is significantly smaller than the pool of candidates for the Frequent-2 itemsets. Given previous performance, we would expect that calculating the supports for these candidates would take about 2 minutes. We will leave this task -- and the task of completing the full process on the dataset -- to the next article.

Summary

In this notebook, we covered some of the preliminaries for carrying out Frequent Pattern Mining (FPM) using only the Apriori Principle.

We first introduced the Apriori Principle that was utilized throughout the post, and outlined how the principle would be used for finding patterns in the transactions.

Next, an overview of the dataset we will be using for the remainder of the FPM examples was provided. The Kaggle Talking Data dataset was restricted to only active applications that occurred at least $min\_sup$ times in the reduced dataset. Then, we covered converting the dataset to a transaction list so that patterns could be mined.

Then we began writing the necessary functions for counting supports for itemsets and creating new candidate itemsets based on the Apriori Principle. After finding the Frequent-1 itemsets and generating the candidate patterns for Frequent-2 itemsets, we realized that the support counting problem had grown significantly.

Due to the large number of candidate patterns generated, parallel processing was investigated. From our trials, we observed that parallelizing by splitting the transaction list into sections and passing each section to an engine for candidate pattern counting showed the most significant speedup. Additionally, we decided that using 8 engines would likely provide the largest benefit moving forward.

Finally, we created a general function for generating Frequent-k itemset candidates from Frequent-(k-1) itemsets. This was the final step needed for implementing the full Frequent Pattern Mining process.

In the next section, we clean up and merge the code from above in order to implement the full method on the dataset and look into the patterns we discover.