Skip to content

Commit

Permalink
Add traffic light classifier for capstone project
Browse files Browse the repository at this point in the history
Simple traffic light classifier designed classify images collected
from Udacity simulator.
  • Loading branch information
ndrplz committed Oct 14, 2017
1 parent 1a62774 commit d8f32f8
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 0 deletions.
67 changes: 67 additions & 0 deletions capstone_traffic_light_classifier/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import tensorflow as tf
from traffic_light_dataset import TrafficLightDataset
from traffic_light_classifier import TrafficLightClassifier


if __name__ == '__main__':

# Parameters
n_classes = 4 # Namely `void`, `red`, `yellow`, `green`
input_h, input_w = 64, 64 # Shape to which input is resized

# Init traffic light dataset
dataset = TrafficLightDataset()
# dataset.init_from_files('../traffic_light_dataset', resize=(input_h, input_w))
# dataset.dump_to_npy('traffic_light_dataset.npy')
dataset.init_from_npy('traffic_light_dataset.npy')

# Placeholders
x = tf.placeholder(dtype=tf.float32, shape=[None, input_h, input_w, 3]) # input placeholder
p = tf.placeholder(dtype=tf.float32) # dropout keep probability
targets = tf.placeholder(dtype=tf.int32, shape=[None])

# Define model
classifier = TrafficLightClassifier(x, targets, p, n_classes, learning_rate=1e-4)

# Define metrics
correct_predictions = tf.equal(tf.argmax(classifier.inference, axis=1),
tf.argmax(tf.one_hot(targets, depth=n_classes), axis=1))
accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32))

with tf.Session() as sess:

# Initialize all variables
sess.run(tf.global_variables_initializer())

# Training parameters
batch_size = 32
batches_each_epoch = 1000

while True:

loss_cur_epoch = 0

for _ in range(batches_each_epoch):

# Load a batch of training data
x_batch, y_batch = dataset.load_batch(batch_size)

# Actually run one training step here
_, loss_this_batch = sess.run(fetches=[classifier.train_step, classifier.loss],
feed_dict={x: x_batch, targets: y_batch, p: 0.5})

loss_cur_epoch += loss_this_batch

loss_cur_epoch /= batches_each_epoch
print('Loss cur epoch: {:.04f}'.format(loss_cur_epoch))

# Eventually evaluate on whole test set when training ends
average_test_accuracy = 0.0
num_test_batches = 500
for _ in range(num_test_batches):
x_batch, y_batch = dataset.load_batch(batch_size)
average_test_accuracy += sess.run(fetches=accuracy,
feed_dict={x: x_batch, targets: y_batch, p: 1.})
average_test_accuracy /= num_test_batches
print('TEST accuracy: {:.03f}'.format(average_test_accuracy))
print('*' * 50)
70 changes: 70 additions & 0 deletions capstone_traffic_light_classifier/traffic_light_classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import numpy as np
import tensorflow as tf


EPS = np.finfo('float32').eps


class TrafficLightClassifier:

def __init__(self, x, targets, keep_prob, n_classes, learning_rate):

self.x = x
self.targets = targets
self.keep_prob = keep_prob

self.n_classes = n_classes # {void, red, yellow, green}
self.learning_rate = learning_rate # learning rate used in train step

self._inference = None
self._loss = None
self._train_step = None
self._summaries = None

self.inference
self.loss
self.train_step
# self.summaries # todo add these

@property
def inference(self):
if self._inference is None:
with tf.variable_scope('inference'):

conv1_filters = 32
conv1 = tf.layers.conv2d(self.x, conv1_filters, kernel_size=(3, 3), padding='same', activation=tf.nn.relu)
pool1 = tf.layers.max_pooling2d(conv1, pool_size=(2, 2), strides=(2, 2), padding='same')

conv2_filters = 64
conv2 = tf.layers.conv2d(pool1, conv2_filters, kernel_size=(3, 3), padding='same', activation=tf.nn.relu)
pool2 = tf.layers.max_pooling2d(conv2, pool_size=(2, 2), strides=(2, 2), padding='same')

_, h, w, c = pool2.get_shape().as_list()
pool2_flat = tf.reshape(pool2, shape=[-1, h * w * c])

pool2_drop = tf.nn.dropout(pool2_flat, keep_prob=self.keep_prob)

hidden_units = self.n_classes
hidden = tf.layers.dense(pool2_drop, units=hidden_units, activation=tf.nn.relu)

logits = tf.layers.dense(hidden, units=self.n_classes, activation=None)

self._inference = tf.nn.softmax(logits)

return self._inference

@property
def loss(self):
if self._loss is None:
with tf.variable_scope('loss'):
predictions = self.inference
targets_onehot = tf.one_hot(self.targets, depth=self.n_classes)
self._loss = tf.reduce_mean(-tf.reduce_sum(targets_onehot * tf.log(predictions + EPS), reduction_indices=1))
return self._loss

@property
def train_step(self):
if self._train_step is None:
with tf.variable_scope('training'):
self._train_step = tf.train.AdamOptimizer(learning_rate=1e-4).minimize(self.loss)
return self._train_step
Binary file not shown.
101 changes: 101 additions & 0 deletions capstone_traffic_light_classifier/traffic_light_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import cv2
import numpy as np
from glob import glob
from tqdm import tqdm
from os.path import join


class TrafficLightDataset:

def __init__(self):
self.root = None
self.initialized = False
self.dataset_npy = None

def init_from_files(self, dataset_root, resize=(256, 256)):
""""
Initialize the dataset from a certain dataset location on disk
"""
self.root = dataset_root

self.dataset_npy = []
frame_list = glob(join(self.root, '*', '*.jpg'))

for frame_path in tqdm(frame_list, desc='Loading frames'):
frame = cv2.imread(frame_path)
frame = cv2.resize(frame, resize[::-1]) # cv2 invert rows and cols
label = self.infer_label_from_frame_path(frame_path)
self.dataset_npy.append([frame, label])

self.initialized = True

def init_from_npy(self, dump_file_path):
"""
Initialize the dataset from a previously created `.npy` dump
"""
self.dataset_npy = np.load(dump_file_path)
self.initialized = True

def dump_to_npy(self, dump_file_path):
"""
Dump the initialized dataset to a binary `.npy` file
"""
if not self.initialized:
raise IOError('Please initialize dataset first.')
np.save(dump_file_path, self.dataset_npy)

def load_batch(self, batch_size):

if not self.initialized:
raise IOError('Please initialize dataset first.')

X_batch, Y_batch = [], []

loaded = 0
while loaded < batch_size:
idx = np.random.randint(0, len(self.dataset_npy))
x = self.dataset_npy[idx][0]
y = self.dataset_npy[idx][1]

X_batch.append(x)
Y_batch.append(y)

loaded += 1

X_batch = self.preprocess(X_batch)

return X_batch, Y_batch

@staticmethod
def preprocess(x):
"""
Roughly center on zero and put in range [-1, 1]
"""
x = np.float32(x) - np.mean(x)
x /= x.max()
return x

@staticmethod
def infer_label_from_frame_path(path):
label = -1
if 'none' in path:
label = 0
elif 'red' in path:
label = 1
elif 'yellow' in path:
label = 2
elif 'green' in path:
label = 3
return label

def print_statistics(self):

if not self.initialized:
raise IOError('Please initialize dataset first.')

color2label = {'none': 0, 'red': 1, 'yellow': 2, 'green': 3}

statistics = {}
for (color, num_label) in color2label.items():
statistics[color] = np.sum(self.dataset_npy[:, 1] == num_label)
print(statistics)

0 comments on commit d8f32f8

Please sign in to comment.