Real-Time Mobile Usage Detection in Industrial Settings

Hello! I am super excited to share a recent project that I worked on (along with two other students at Habib University) in collaboration with a major car manufacturer in Pakistan. While I cannot share the entire codebase or a demo of the product due to an NDA (non-disclosure agreement) with the company, I will give a glimpse of the journey, the idea behind the project, and some code snippets at the end for anyone interested in reproducing the work. Our goal was to build a real-time system that detects mobile phone usage in industrial settings (factories and construction sites), automatically identifying and flagging violations to help make workplaces safer. So, grab a coffee, and let's get started!


What Were We Trying to Achieve?

Our project aimed to tackle the problem of mobile phone usage in industrial environments, which can be a major safety hazard. Traditional methods of enforcing safety policies rely on active human supervision, which is labor-intensive and not always reliable. We wanted to build an automated system that could monitor workers in real time, detect mobile phone usage, and flag violations without needing constant human oversight.

The requirements for our system were:

  • Real-time inference: the system had to process video feeds and flag violations as they happen.
  • High accuracy: it had to reliably identify both workers and mobile phone usage.
  • No GPU required: it had to be efficient enough to run on affordable hardware.
  • Few false alarms: we wanted to minimize false positives to reduce the burden on human supervisors.
  • Semi-autonomous operation: flagged instances would be reviewed by a person, who makes the final determination of a violation.

How Did We Do It?

Our approach involved a pipeline consisting of four main stages, each designed to contribute to the overall goal of detecting mobile phone usage while walking:

  1. Human and Pose Tracking:
    • We used the YOLOv11-pose model, which is excellent at detecting humans in video feeds and also gives us pose information, i.e., the locations of each person's joints. For tracking individuals across frames, we used Roboflow Supervision's ByteTrack, ensuring each person keeps a consistent ID.
  2. Walking Detection:
    • To reduce false positives, we added a check for whether the tracked individual is walking. The system measures the distance each person travels over a recent time window; if the movement is significant, the person is flagged as walking. A sketch of this check, together with the model and tracker setup, follows this list.
  3. Pose Classification:
    • For this, we used a small neural network with two hidden layers. It takes the pose information and classifies whether the person is using a mobile phone. We trained it on labeled images of people with and without phones, bulk-downloaded from Google and labeled manually. The model was not powerful, and it didn't need to be: there is only a slim chance that someone stands in a mobile-usage-like pose without actually using a phone. The approach had drawbacks, such as class imbalance, but it was the best choice given all our requirements.
  4. Violation Classification:
    • Finally, if the person is both walking and using a phone, we flag it as a violation. This combined check dramatically reduces the number of false alarms.
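Since the full codebase is under NDA, here is a minimal sketch of how steps 1 and 2 could fit together. The YOLO and Supervision APIs are real; the checkpoint name, the thresholds, and the is_walking helper itself are illustrative assumptions, not our production code.

import time
import numpy as np
import supervision as sv
from ultralytics import YOLO

model = YOLO("yolo11n-pose.pt")  # assumed checkpoint; any YOLO pose variant works
tracker = sv.ByteTrack()         # Supervision's ByteTrack for stable per-person IDs

# Per-track movement history: track_id -> list of (timestamp, center point)
position_history = {}

WALK_WINDOW_S = 1.0         # assumed: judge movement over the last second
WALK_DISTANCE_FACTOR = 0.5  # assumed: must travel half a box width to count as walking

def is_walking(track_id, current_pos, box_width, current_time):
    # Record the current position and discard entries outside the time window
    history = position_history.setdefault(track_id, [])
    history.append((current_time, current_pos))
    while history and current_time - history[0][0] > WALK_WINDOW_S:
        history.pop(0)
    if len(history) < 2:
        return False
    # Normalizing by box width keeps the threshold roughly scale-invariant:
    # people far from the camera move fewer pixels at the same walking speed
    displacement = np.linalg.norm(np.array(history[-1][1]) - np.array(history[0][1]))
    return displacement > WALK_DISTANCE_FACTOR * box_width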

Why This Methodology?

  • YOLO models are fast and lightweight, which was crucial for real-time operation.
  • Pose information focuses the classifier on a person's key features, making it more efficient and accurate.
  • Requiring both walking and mobile phone usage makes the classifications more specific.

The other option was to go with a single powerful end-to-end model, but we had neither an extensive enough dataset nor the compute to make it run in real time.

Challenges and Decisions Along the Way

As with any project, we faced our fair share of challenges, and we had to make tough decisions. Here are some key points:

  • Data Collection: Getting a large, diverse dataset is tricky. Our initial dataset consisted of 3 minutes of violation footage and 27 minutes of normal footage. It was a start, but not diverse enough, so our results weren't as generalizable as we would have liked. That said, the class imbalance reflects reality, and forcing a 50-50 split would likely have hurt other metrics. We also relied on a single camera source, which made it easier to collect a decent amount of data but limited generalization to other environments and camera angles.
  • Occlusion: People blocking each other, or parts of a person being hidden by objects, made it harder to estimate poses accurately.
  • Balancing Speed and Accuracy: It was crucial that the model was fast enough for a real-time setting while maintaining high accuracy. For this, we went with the YOLO models, which gave us the best balance of speed and performance.
  • False Positives: The data was imbalanced, with far more non-violations, which made the model more likely to misclassify non-violations as violations. The two-stage approach of requiring both mobile usage and walking reduced false positives.
  • Primary Research Paper: The DeepPose paper (by Google), with its cascade of DNN regressors for predicting body joints, informed our decisions about pose estimation methodology.

Did It Work?

Our system performed well, especially considering the challenges we faced:

  • High Pose Classifier Accuracy: Our pose classifier achieved an accuracy of 96.75% on the test data set.
  • Good Overall Accuracy: The entire pipeline achieved a test accuracy of 92.9%.
  • Real-Time Speed: The system runs at a processing speed of 85 FPS, which is great for real-time applications.
  • High Recall: The system’s high recall rate of 94.7% means that it rarely misses a violation.
  • Low Precision: Due to the imbalanced data, precision was relatively low at 36.7%, resulting in a high number of false positives.

These results show that our system can be quite effective at detecting mobile phone usage in industrial settings, although there is still room for improvement, especially in terms of precision.
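To see why heavy class imbalance drags precision down even when per-class behavior is decent, here is a back-of-the-envelope calculation with made-up counts (not our actual confusion matrix):

# Purely illustrative numbers, NOT the project's real confusion matrix
violations, normals = 200, 9_800      # imbalanced ground truth: ~1 frame in 50 is a violation
tp = int(violations * 0.947)          # 94.7% recall -> 189 violations caught
fp = int(normals * 0.033)             # ~3.3% false-alarm rate -> 323 false positives

precision = tp / (tp + fp)            # 189 / 512 ≈ 0.37
print(f"precision = {precision:.3f}") # low precision despite high recall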

Parts of the Project

  1. Data Pipeline (for pose classifier training):
    • We first converted the raw training images into 32-dimensional pose vectors stored in a CSV file. We used around 1,000 images, of which about 300 were violation images. The ideal split would be 50-50 positive and negative cases, but from a mathematical perspective the space of mobile-usage poses is a constrained subset of the much larger space of all possible poses, so negatives naturally dominate. Preprocessing included extracting person bounding boxes from the images and rescaling each crop to 250x250 pixels; a sketch of this step is shown below.
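    • A rough sketch of this crop-and-resize step follows. The 250x250 target comes from our pipeline; the detector checkpoint and the rest are illustrative assumptions:

      import cv2
      from ultralytics import YOLO

      detector = YOLO("yolo11n.pt")  # assumed person-detector checkpoint

      def crop_people(image_path, out_size=(250, 250)):
          # Detect people, crop each bounding box, and rescale to a fixed size
          image = cv2.imread(image_path)
          result = detector(image, verbose=False)[0]
          crops = []
          for box, cls in zip(result.boxes.xyxy.cpu().numpy(),
                              result.boxes.cls.cpu().numpy()):
              if int(cls) != 0:  # class 0 is 'person' in COCO-trained YOLO models
                  continue
              x1, y1, x2, y2 = box.astype(int)
              crop = image[y1:y2, x1:x2]
              if crop.size > 0:
                  crops.append(cv2.resize(crop, out_size))
          return crops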
  2. Model Training:
    • The sample snippet below extracts the upper-body pose (joint) vector. The script iterates through the images, applies the pose model, and collects the normalized keypoints into rows destined for a CSV file.
      
      import os
      import cv2
      import numpy as np
      from ultralytics import YOLO

      model = YOLO("yolo11n-pose.pt")  # YOLO pose model (checkpoint name assumed)
      results = []  # one row per image: [label, normalized keypoints...]

      for label, dir in [(1, "Positive"), (0, "Negative")]:
          for root, _, files in os.walk("data/" + dir):
              for file in files:
                  if file.endswith(".jpg"):
                      img_path = os.path.join(root, file)
                      img_data = cv2.imread(img_path)
                      result = model(img_data)[0]
                      keypoints = result.keypoints.xy.cpu().numpy().squeeze()
                      if len(keypoints) < 17:  # skip incomplete detections
                          continue
                      # COCO keypoint indices: 11 = left hip (waist anchor),
                      # 5/6 = left/right shoulder
                      waist = keypoints[11]
                      left_shoulder = keypoints[5]
                      right_shoulder = keypoints[6]
                      shoulder_distance = np.linalg.norm(left_shoulder - right_shoulder)
                      # Translate to the waist and scale by shoulder width so the
                      # features are position- and scale-invariant
                      normalized_keypoints = (keypoints - waist) / shoulder_distance
                      row = [label] + list(normalized_keypoints.flatten())
                      results.append(row)
      
    • We then train a feed-forward neural network with two hidden layers on the curated dataset (a minimal training-loop sketch follows the architecture below). The model architecture is as follows:
      
      import torch.nn as nn

      class PoseClassifier(nn.Module):
          def __init__(self, input_dim):
              super(PoseClassifier, self).__init__()
              self.fc = nn.Sequential(
                  nn.Linear(input_dim, 128),
                  nn.ReLU(),
                  nn.Dropout(0.3),
                  nn.Linear(128, 64),
                  nn.ReLU(),
                  nn.Dropout(0.3),
                  nn.Linear(64, 1),
                  nn.Sigmoid()  # probability that the pose is "using mobile"
              )

          def forward(self, x):
              return self.fc(x)
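    • For completeness, a minimal training loop for this classifier might look like the sketch below. The optimizer, learning rate, epoch count, and the poses.csv filename are assumptions rather than our exact settings:

      import numpy as np
      import torch
      import torch.nn as nn

      # Load the curated CSV: column 0 is the label, the rest is the pose vector
      data = np.loadtxt("poses.csv", delimiter=",")  # assumed filename
      X = torch.tensor(data[:, 1:], dtype=torch.float32)
      y = torch.tensor(data[:, :1], dtype=torch.float32)

      clf = PoseClassifier(input_dim=X.shape[1])
      criterion = nn.BCELoss()  # pairs with the Sigmoid output layer
      optimizer = torch.optim.Adam(clf.parameters(), lr=1e-3)  # assumed optimizer/lr

      clf.train()
      for epoch in range(50):  # assumed epoch count
          optimizer.zero_grad()
          loss = criterion(clf(X), y)
          loss.backward()
          optimizer.step()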
      
  3. Inference Pipeline:
    • A high-level snippet of the inference callback is shown below:
      
      def analysis_callback(frame: np.ndarray, _: int) -> np.ndarray:
          global frame_counter, show_pose, show_only_violations, total_frames, violation_detections, violation_labels, walking_detections, walking_labels, normal_detections, normal_labels
          total_frames += 1
          frame_counter = (frame_counter + 1) % frames_per_inference
          # On skipped frames, redraw the last annotations instead of re-running inference
          if frame_counter != 0:
              for detections, labels, color in [
                  (violation_detections, violation_labels, sv.Color.RED),
                  (walking_detections, walking_labels, sv.Color.YELLOW),
                  (normal_detections, normal_labels, sv.Color.GREEN)
              ]:
                  frame = annotate_detections(frame, detections, labels, color)
              return frame

          current_time = time.time()

          # Get person detections (class 0) and track them across frames
          results_pose = model(frame, verbose=False)[0]
          detections = sv.Detections.from_ultralytics(results_pose)
          detections = detections[detections.class_id == 0]
          detections = tracker.update_with_detections(detections)

          violation_boxes, walking_boxes, normal_boxes = [], [], []
          violation_confidences, walking_confidences, normal_confidences = [], [], []
          violation_class_ids, walking_class_ids, normal_class_ids = [], [], []
          violation_labels, walking_labels, normal_labels = [], [], []

          def add_filtered(filt_boxes, filt_confi, filt_class, detections, i):
              filt_boxes.append(detections.xyxy[i])
              filt_confi.append(detections.confidence[i])
              filt_class.append(detections.class_id[i])

          # Process each detection
          for i, track_id in enumerate(detections.tracker_id):
              if track_id is None:
                  continue

              # Get current position for walking detection
              xyxy = detections.xyxy[i]
              current_pos = ((xyxy[0] + xyxy[2]) / 2, (xyxy[1] + xyxy[3]) / 2)
              box_width = xyxy[2] - xyxy[0]

              # Check if walking
              is_walking_now = is_walking(track_id, current_pos, box_width, current_time)

              # New person - new history
              if track_id not in mobile_usage_history:
                  mobile_usage_history[track_id] = []

              using_mobile = False
              if results_pose.keypoints is not None and i < len(results_pose.keypoints):
                  keypoints = results_pose.keypoints[i].xy.cpu().numpy().squeeze()
                  using_mobile = is_using_mobile(keypoints)

              # Update history
              mobile_usage_history[track_id].append(using_mobile)
              if len(mobile_usage_history[track_id]) > HISTORY_FRAMES:
                  mobile_usage_history[track_id].pop(0)

              # Flag a violation only if more than 1/5 of the recent frames show mobile
              # usage, the person has been tracked long enough, and they are walking
              if (sum(mobile_usage_history[track_id]) > len(mobile_usage_history[track_id]) * (1 / 5)
                      and len(mobile_usage_history[track_id]) > HISTORY_FRAMES * 2 // 3
                      and is_walking_now):
                  add_filtered(violation_boxes, violation_confidences, violation_class_ids, detections, i)
                  violation_labels.append(f"{track_id}: Mobile Usage Violation")
              elif is_walking_now and not show_only_violations:
                  add_filtered(walking_boxes, walking_confidences, walking_class_ids, detections, i)
                  walking_labels.append(f"{track_id}: Walking")
              else:
                  add_filtered(normal_boxes, normal_confidences, normal_class_ids, detections, i)
                  normal_labels.append("Normal")

          def create_detections(boxes, confidences, class_ids):
              if len(boxes) == 0:
                  return sv.Detections(xyxy=np.empty((0, 4)), confidence=np.empty((0,)), class_id=np.empty((0,)))  # Nothing to draw
              return sv.Detections(xyxy=np.array(boxes), confidence=np.array(confidences), class_id=np.array(class_ids))

          violation_detections = create_detections(violation_boxes, violation_confidences, violation_class_ids)
          walking_detections = create_detections(walking_boxes, walking_confidences, walking_class_ids)
          normal_detections = create_detections(normal_boxes, normal_confidences, normal_class_ids)

          for detections, labels, color in [
              (violation_detections, violation_labels, sv.Color.RED),
              (walking_detections, walking_labels, sv.Color.YELLOW),
              (normal_detections, normal_labels, sv.Color.GREEN)
          ]:
              frame = annotate_detections(frame, detections, labels, color)

          # Draw pose keypoints and connections
          if show_pose and results_pose.keypoints is not None:
              for keypoints in results_pose.keypoints:
                  # Skip detections with no keypoint data
                  if keypoints.data.shape[0] == 0:
                      continue

                  points = keypoints.data[0].cpu().numpy()

                  # Draw connections first (so they appear behind the points)
                  for p1_idx, p2_idx in POSE_CONNECTIONS:
                      # Bounds check, and skip (0, 0) placeholder points
                      if (p1_idx < len(points) and p2_idx < len(points) and
                              points[p1_idx][0] > 0 and points[p2_idx][0] > 0):
                          pt1 = (int(points[p1_idx][0]), int(points[p1_idx][1]))
                          pt2 = (int(points[p2_idx][0]), int(points[p2_idx][1]))
                          cv2.line(frame, pt1, pt2, (0, 255, 0), 2)

                  # Draw points on top of connections
                  for point in points:
                      x, y = point[:2]
                      if x > 0 and y > 0:  # Only draw valid points
                          cv2.circle(frame, (int(x), int(y)), 4, (0, 255, 0), -1)

          return frame
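    • The callback above relies on an is_using_mobile helper that we cannot share verbatim. A plausible reconstruction reuses the waist/shoulder normalization from training; the keypoint indices, the checkpoint name, and the 0.5 threshold are assumptions:

      import numpy as np
      import torch

      INPUT_DIM = 34  # flattened keypoint vector length; must match training
      classifier = PoseClassifier(input_dim=INPUT_DIM)
      classifier.load_state_dict(torch.load("pose_classifier.pt"))  # assumed checkpoint
      classifier.eval()

      def is_using_mobile(keypoints, threshold=0.5):
          # Reject detections with missing or incomplete keypoints
          if keypoints.ndim != 2 or len(keypoints) < 17:
              return False
          waist = keypoints[11]  # COCO index 11 = left hip (assumed anchor point)
          shoulder_distance = np.linalg.norm(keypoints[5] - keypoints[6])
          if shoulder_distance < 1e-6:  # degenerate pose, avoid division by zero
              return False
          # Same normalization as at training time
          normalized = (keypoints - waist) / shoulder_distance
          x = torch.tensor(normalized.flatten(), dtype=torch.float32).unsqueeze(0)
          with torch.no_grad():
              return classifier(x).item() > threshold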
      

Final Thoughts

I am happy with what we achieved in this project. It is nothing ground-breaking, but it feels nice when your code is actually being used somewhere. I hope to take on more interesting projects in the future. Open to ideas!

This post is licensed under CC BY 4.0 by the author.