Random Forest Builder

From Control Systems Technology Group
Jump to navigation Jump to search

import csv
import re
from csv import reader
from itertools import chain
from math import sqrt
from random import randrange
from random import seed

def load_csv(filename):
    """Read a CSV file into a list of rows, skipping empty rows.

    Args:
        filename: Path of the file to read.

    Returns:
        A list with one list of string fields for every non-empty row.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation recommends for files passed to csv.reader.
    with open(filename, 'r', newline='') as file:
        return [row for row in reader(file) if row]

# Makes an array of the data. Each row is a point in time and
# each column is a channel, except for the last column, which contains
# the desired output.

def make_data(dataname, startname, labelname):
    """Build the labelled data matrix from the three input files.

    Every row of the result is one point in time and every column is one
    channel, except for the last column, which holds the desired output
    for that row.  A desired output of 0 means "unknown": it is used for
    test trials and for the rows that precede the first trial.

    Args:
        dataname: File with one row per line, values separated by
            whitespace.
        startname: File with the start row of each trial, one per line.
        labelname: File with the label of each trial, one per line.  A
            line matching ``NaN`` followed by whitespace marks a test trial.

    Returns:
        The data as a list of rows of floats, each with its label appended.
    """
    raw_data = load_csv(dataname)
    raw_starts = load_csv(startname)
    raw_labels = load_csv(labelname)
    numtrials = len(raw_labels)
    regex = r"NaN\s+"

    # Convert the data: the whole line arrives as a single CSV field.
    data = [[float(value) for value in row[0].split()] for row in raw_data]

    # Convert start times and labels; for labels, 0 indicates a test trial.
    starttimes = [int(raw_starts[i][0]) for i in range(numtrials)]
    labels = [
        0 if re.search(regex, raw_labels[i][0]) else int(raw_labels[i][0])
        for i in range(numtrials)
    ]

    # Add the labels to the data matrix.  Rows from one trial start up to
    # the next trial start carry the label of the trial that started them.
    # (The previous code used the label of the *following* trial and had no
    # body for the rows after the last start, leaving them unlabelled.)
    previous_start, previous_label = 0, 0
    for start, label in zip(starttimes, labels):
        for row in data[previous_start:start]:
            row.append(previous_label)
        previous_start, previous_label = start, label
    # Rows after the last trial start belong to the last trial.
    for row in data[previous_start:]:
        row.append(previous_label)
    return data
# Delete the rows with an unknown desired output

def delete_test_trials(data):
    """Return the rows of ``data`` whose last column is not 0.

    The input list is left untouched; the returned list shares its row
    objects with ``data``.
    """
    return [row for row in data if row[-1] != 0]
# Make a smaller set without replacement

def smaller_set(data, n_rows):
    """Draw ``n_rows`` random rows from ``data`` without replacement.

    Args:
        data: List of rows to draw from; it is not modified.
        n_rows: Number of rows wanted.

    Returns:
        A new list of at most ``n_rows`` distinct rows of ``data``.  When
        ``data`` holds fewer than ``n_rows`` rows, all of them are returned
        (in random order).
    """
    # Work on a real copy: popping from an alias would empty the caller's list.
    data_copy = list(data)
    new_data = []
    # Each drawn row is removed from the pool so it cannot be drawn twice;
    # stop early once the pool is exhausted.
    while len(new_data) < n_rows and data_copy:
        index = randrange(0, len(data_copy))
        new_data.append(data_copy.pop(index))
    return new_data
# Convert string column to float

def str_column_to_float(dataset, column):
    """Convert one column of ``dataset`` to floats, in place.

    Args:
        dataset: List of rows (lists); modified in place.
        column: Index of the column to convert.

    String values are stripped of surrounding whitespace first.  Values
    that are already numeric are converted directly, so the function can
    also be applied to rows whose values were converted earlier.
    """
    for row in dataset:
        value = row[column]
        if isinstance(value, str):
            value = value.strip()
        row[column] = float(value)

# Convert string column to integer

def str_column_to_int(dataset, column):
    """Replace the values of one column by integer codes, in place.

    Args:
        dataset: List of rows (lists); modified in place.
        column: Index of the column to encode.

    Returns:
        The dict mapping each original value to its integer code.

    Codes are assigned in order of first appearance in the column, so the
    mapping is the same on every run (iterating a set of strings is not).
    """
    class_values = [row[column] for row in dataset]
    # dict.fromkeys removes duplicates while keeping first-seen order.
    lookup = {value: i for i, value in enumerate(dict.fromkeys(class_values))}
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

# Split a dataset into k folds

def cross_validation_split(dataset, n_folds):
    """Split ``dataset`` into ``n_folds`` random folds of equal size.

    Args:
        dataset: List of rows; it is not modified.
        n_folds: Number of folds to create.

    Returns:
        A list of ``n_folds`` lists of rows.  Every fold holds
        ``len(dataset) // n_folds`` rows drawn without replacement; rows
        left over when the division is not exact are in no fold.
    """
    dataset_copy = list(dataset)
    fold_size = len(dataset) // n_folds
    dataset_split = []
    for _ in range(n_folds):
        # Popping removes the row from the pool, so folds never overlap.
        fold = [
            dataset_copy.pop(randrange(len(dataset_copy)))
            for _ in range(fold_size)
        ]
        dataset_split.append(fold)
    return dataset_split

# Calculate accuracy percentage

def accuracy_metric(actual, predicted):
    """Return the percentage of predictions that equal the actual values.

    Args:
        actual: Sequence of true values.
        predicted: Sequence of predicted values, at least as long as
            ``actual``; it is compared position by position.

    Returns:
        A float between 0.0 and 100.0; 0.0 when ``actual`` is empty
        (instead of dividing by zero).
    """
    if len(actual) == 0:
        return 0.0
    correct = sum(1 for i, value in enumerate(actual) if value == predicted[i])
    return correct / float(len(actual)) * 100.0

# Evaluate an algorithm using a cross validation split

def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    """Score ``algorithm`` on ``dataset`` with k-fold cross validation.

    Args:
        dataset: List of rows whose last column is the desired output.
        algorithm: Callable invoked as ``algorithm(train_set, test_set,
            *args)``; it must return one prediction per test row.
        n_folds: Number of folds passed to ``cross_validation_split``.
        *args: Extra arguments forwarded to ``algorithm``.

    Returns:
        A list with the accuracy percentage obtained on each fold.
    """
    folds = cross_validation_split(dataset, n_folds)
    scores = []
    for held_out, fold in enumerate(folds):
        # Train on every fold except the one held out for testing.
        train_set = list(chain.from_iterable(
            other for position, other in enumerate(folds)
            if position != held_out
        ))
        # The algorithm gets copies, so it cannot alter the rows of the fold.
        # NOTE: the desired output is left in the copies (masking it with
        # None was disabled in the original code).
        test_set = [list(row) for row in fold]
        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]
        scores.append(accuracy_metric(actual, predicted))
    return scores

# Split a dataset based on an attribute and an attribute value

def test_split(index, value, dataset): left, right = list(), list() for row in dataset: if row[index] < value: left.append(row) else: right.append(row) return left, right

# Calculate the Gini index for a split dataset

def gini_index(groups, classes):
    """Return the weighted Gini index of a split.

    Args:
        groups: Iterable of groups (lists of rows) produced by a split; the
            last column of every row is its class.
        classes: The class values to score.

    Returns:
        The sum over all groups of ``(1 - sum(p * p))`` weighted by the
        relative size of the group, where ``p`` is the proportion of each
        class inside the group.  Empty groups contribute nothing.
    """
    # Count all samples at the split point.
    n_instances = float(sum(len(group) for group in groups))
    gini = 0.0
    for group in groups:
        size = float(len(group))
        # Avoid dividing by zero for an empty group.
        if size == 0:
            continue
        # Extract the class column once instead of once per class.
        labels = [row[-1] for row in group]
        score = 0.0
        for class_val in classes:
            p = labels.count(class_val) / size
            score += p * p
        # Weight the group score by its relative size.
        gini += (1.0 - score) * (size / n_instances)
    return gini

# Select the best split point for a dataset

def get_split(dataset, n_features):
    """Find the best split of ``dataset`` over a random subset of features.

    Args:
        dataset: Non-empty list of rows whose last column is the class.
        n_features: Number of distinct feature columns to consider.  It is
            capped at the number of feature columns available.

    Returns:
        A dict with the keys ``'index'`` (column of the split),
        ``'value'`` (threshold) and ``'groups'`` (the ``(left, right)``
        result of ``test_split`` for that split): the candidate with the
        lowest Gini index.
    """
    class_values = list(set(row[-1] for row in dataset))
    n_candidates = len(dataset[0]) - 1
    # Asking for more distinct columns than exist would make the sampling
    # loop below run forever, so cap the request.
    if n_candidates > 0:
        n_features = min(n_features, n_candidates)

    # Draw distinct feature columns at random.
    features = []
    while len(features) < n_features:
        index = randrange(n_candidates)
        if index not in features:
            features.append(index)

    # 999 is a sentinel larger than any Gini index returned by gini_index.
    b_index, b_value, b_score, b_groups = 999, 999, 999, None
    for index in features:
        # Every value that occurs in the column is tried as a threshold.
        for row in dataset:
            groups = test_split(index, row[index], dataset)
            gini = gini_index(groups, class_values)
            if gini < b_score:
                b_index, b_value, b_score, b_groups = index, row[index], gini, groups
    return {'index': b_index, 'value': b_value, 'groups': b_groups}

# Create a terminal node value

def to_terminal(group):
    """Return the most common class (last column) among the rows of ``group``.

    Args:
        group: Non-empty list of rows whose last column is the class.

    Returns:
        The class value that occurs most often in ``group``.
    """
    outcomes = [row[-1] for row in group]
    return max(set(outcomes), key=outcomes.count)

# Create child splits for a node or make terminal

def split(node, max_depth, min_size, n_features, depth):
    """Grow the tree below ``node`` recursively, in place.

    Args:
        node: Dict as returned by ``get_split``.  Its ``'groups'`` entry is
            removed and replaced by ``'left'`` and ``'right'`` entries, each
            either a child node (dict) or a terminal class value.
        max_depth: Depth at which both children become terminal.
        min_size: A group with at most this many rows becomes terminal.
        n_features: Number of features forwarded to ``get_split``.
        depth: Depth of ``node``.
    """
    left, right = node.pop('groups')
    # No real split: one side is empty, so both children get the same value.
    if not left or not right:
        node['left'] = node['right'] = to_terminal(left + right)
        return
    # Maximum depth reached: stop growing.
    if depth >= max_depth:
        node['left'], node['right'] = to_terminal(left), to_terminal(right)
        return
    # Process the left child, then the right child.
    for side, group in (('left', left), ('right', right)):
        if len(group) <= min_size:
            node[side] = to_terminal(group)
        else:
            node[side] = get_split(group, n_features)
            split(node[side], max_depth, min_size, n_features, depth + 1)

# Build a decision tree

def build_tree(train, max_depth, min_size, n_features):
    """Build a decision tree from the rows of ``train``.

    Args:
        train: List of training rows whose last column is the class.
        max_depth: Maximum depth, forwarded to ``split``.
        min_size: Minimum group size, forwarded to ``split``.
        n_features: Number of features considered per split.

    Returns:
        The root node (a dict) of the finished tree.
    """
    root = get_split(train, n_features)
    # The root counts as depth 1.
    split(root, max_depth, min_size, n_features, 1)
    return root

# Make a prediction with a decision tree

def predict(node, row):
    """Return the class a decision tree assigns to ``row``.

    Args:
        node: Root of the (sub)tree: a dict with the keys ``'index'``,
            ``'value'``, ``'left'`` and ``'right'``.  A child that is a dict
            is another node; any other child is a terminal class value.
        row: The row to classify.

    Returns:
        The terminal value reached by following the left child whenever
        ``row[node['index']] < node['value']`` and the right child otherwise.
    """
    while True:
        branch = 'left' if row[node['index']] < node['value'] else 'right'
        child = node[branch]
        if not isinstance(child, dict):
            return child
        # Descend into the child node and test again.
        node = child
# Create a random subsample from the dataset with replacement

def subsample(dataset, ratio):
    """Draw a random sample from ``dataset`` with replacement.

    Args:
        dataset: List of rows to draw from.
        ratio: Size of the sample as a fraction of ``len(dataset)``.

    Returns:
        A list of ``round(len(dataset) * ratio)`` rows; the same row may
        appear more than once.
    """
    n_sample = int(round(len(dataset) * ratio))
    return [dataset[randrange(len(dataset))] for _ in range(n_sample)]

# Make a prediction with a list of bagged trees

def bagging_predict(trees, row):
    """Return the class predicted for ``row`` by most of the trees.

    Args:
        trees: Non-empty list of decision trees accepted by ``predict``.
        row: The row to classify.

    Returns:
        The prediction that occurs most often among the trees.
    """
    predictions = [predict(tree, row) for tree in trees]
    return max(set(predictions), key=predictions.count)

# Random Forest Algorithm

def random_forest(train, test, max_depth, min_size, sample_size, n_trees, n_features):
    """Build a random forest and return its trees.

    Args:
        train: List of training rows whose last column is the class.
        test: Unused; predictions for the test rows are not computed here.
        max_depth: Maximum depth of every tree.
        min_size: Minimum group size used while splitting.
        sample_size: Fraction of ``train`` sampled (with replacement) for
            each tree.
        n_trees: Number of trees to build.
        n_features: Number of features considered per split.

    Returns:
        The list of ``n_trees`` trees (root node dicts).  To classify a
        row with the forest, pass this list to ``bagging_predict``.
    """
    trees = []
    for i in range(n_trees):
        sample = subsample(train, sample_size)
        tree = build_tree(sample, max_depth, min_size, n_features)
        # Keep the tree: without this the forest stays empty.
        trees.append(tree)
        print("Tree " + str(i) + " is done!")
    return trees

# Test the random forest algorithm


# Input files: the data, the trial start rows and the trial labels.
dataname = 'k3b_s.txt'
startname = 'k3b_HDR_TRIG.txt'
labelname = 'k3b_HDR_Classlabel.txt'

# Creating the dataset

# Full labelled data matrix.
dataset = make_data(dataname, startname, labelname)
print("Dataset is ready!")
# Only the rows with a known desired output can be used for training.
trainset = delete_test_trials(dataset)
print("Trainset is ready!")
# Random subset of 2000 training rows, drawn without replacement.
smallset = smaller_set(trainset, 2000)
print("Smallset is ready!")

# Tried this part to see if it helps, but it raises its own error
# # convert string attributes to floats
# for i in range(0, len(smallset[0])-1):
#     str_column_to_float(smallset, i)
# # convert class column to integers
# str_column_to_int(smallset, len(smallset[0])-1)
# evaluate algorithm

# Settings for the cross validation and the forest.
n_folds = 5
max_depth = 10
min_size = 1
sample_size = 1.0
# Square root of the number of channels (every column except the label).
n_features = int(sqrt(len(trainset[0])-1))
n_trees = 10

# scores = evaluate_algorithm(smallset, random_forest, n_folds, max_depth, min_size, sample_size, n_trees, n_features)
# print('Trees: %d' % n_trees)
# print('Scores: %s' % scores)
# print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))
# Make an output text file containing the random forest

forest = random_forest(smallset, smallset, max_depth, min_size, sample_size, n_trees, n_features) keys = forest[0].keys() with open('A forest.csv', 'w') as output_file:

   dict_writer = csv.DictWriter(output_file, keys)