A simple autofocus application in Python

 · 15 mins read

Hello friends! This tutorial is about using raspberry pi to control the focus level of a webcam. Now the question is how to do that? We are using a normal webcam which has manual turning option to adjust is focus level. We can convert this webcam to a useful application to inspect Printed Circuit Boards PCBs or even for chat. To do this we will use the Lego gears to rotate the lense of the webcam. The motor attached to this Lego gears is controlled by Raspberrypi via an interface. The motor can send control signals like Stop, Forward and Back. The next question is how to know when to rotate the lens clockwise, counter-clockwise and when to stop?

To answer this question we will use an image processing algorithm in PC. This algo. will receive the video frames from camera via usb and find the Laplacian variance in video frames. Note that the blur image has less edges and hence less variance so it will be of low focus level. The sharper image has more edges and hence larger value of laplacian variance. The algo. will act as an agent and take the laplacian variance of current frame and the previous frame. The agent will take an initial forward action and check after this action the current variance is higher than previous, if yes then this action will be given a score +=1 otherwise it will be a score of -=1.

The block diagram shows main setup.

Here is the server.py code that will run on raspberry pi zero w.

server.py


import  time
import threading
import socket, pickle, struct
import RPi.GPIO as io


server_ip = '192.168.10.1'
port = 8000


time.sleep(1)

FORWARD = 6
BACK =  27
LEFT = 26
RIGHT = 22
STRAIGHT = 'STRAIGHT'
STOP = 'STOP'

map_motion =  {
        FORWARD: "FORWARD",
        BACK : "BACK",
        LEFT : "LEFT",
        RIGHT : "RIGHT",
        STRAIGHT : 'STRAIGHT',
        STOP : 'STOP'
        
        }

my_list =[FORWARD,BACK,LEFT,RIGHT]

def run_for (pin,speed=0):
    print(f"action-- {map_motion[pin]}")
    
    if pin != STRAIGHT and pin != STOP:
        io.cleanup()
        io.setmode(io.BCM)
            
        io.setup(pin, io.OUT)
        io.output(pin, True)
        io.setup(FORWARD, io.OUT)
        io.output(FORWARD, True)
        
    elif pin == STRAIGHT:
        io.cleanup()
        io.setmode(io.BCM)
        io.setup(LEFT, io.OUT)
        io.output(LEFT, False)
        io.setup(RIGHT, io.OUT)
        io.output(RIGHT, False)
        io.setup(FORWARD, io.OUT)
        io.output(FORWARD, True)
    elif pin == STOP:
        for x in my_list:
        
            io.setup(x, io.OUT)
            io.output(x, False)
        
def init():
    io.setmode(io.BCM)              
    io.setup(FORWARD,io.OUT)  
    io.setup(BACK,io.OUT)
    io.setup(LEFT,io.OUT)
    io.setup(RIGHT,io.OUT)
    for x in my_list:
        io.output(pin, False)
    io.cleanup()
             
global pin 
pin = FORWARD
init()


server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
host_name  = socket.gethostname()
host_ip = server_ip
print('HOST IP:',host_ip)
port = port+1
socket_address = (host_ip,port)
server_socket.bind(socket_address)
server_socket.listen()
print("Listening at",socket_address)

def show_client(addr,client_socket):
    global pin 
    prev_move = None
    frame= 'stop'
    try:
        print('CLIENT {} CONNECTED!'.format(addr))
        if client_socket: # if a client socket exists
            data = b""
            payload_size = struct.calcsize("Q")
            while True:
                while len(data) < payload_size:
                    packet = client_socket.recv(4*1024) # 4K
                    if not packet: break
                    data+=packet
                packed_msg_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack("Q",packed_msg_size)[0]
                
                while len(data) < msg_size:
                    data += client_socket.recv(4*1024)
                frame_data = data[:msg_size]
                data  = data[msg_size:]
                frame = pickle.loads(frame_data)
                
                if frame == 'forward':
                    pin = FORWARD
                    run_for(pin)
                    prev_move = pin
                elif  frame == 'back':
                    pin = BACK
                    run_for(pin)
                    prev_move = pin
                elif frame == 'left':
                    pin = LEFT
                    run_for(pin)
                    prev_move = pin
                elif frame == 'right':
                    pin = RIGHT
                    run_for(pin)
                    prev_move = pin
                elif frame == 'straight':
                    pin = STRAIGHT
                    run_for(pin)
                    prev_move = pin
                elif frame == 'stop':
                    print(frame)
                    io.setmode(io.BCM)
                    io.setup(pin, io.OUT)
                    io.output(pin, False)
                    io.cleanup()
                
                
    except Exception as e:
        print(f"CLINET {addr} DISCONNECTED")
        pass
        
while True:
    client_socket,addr = server_socket.accept()
    thread = threading.Thread(target=show_client, args=(addr,client_socket))
    thread.start()
    print("TOTAL CLIENTS ",threading.activeCount() - 2)
    time.sleep(0.1)

Once the server starts listening at its socker address, we can run the client.py on PC.

Here is the client.py code:

# Autofocus application with Python
import cv2, os
import _thread
import socket,pickle,struct
import time
import numpy as np

from collections import deque
vid = cv2.VideoCapture(1)

def cam_run():
    server_ip = '192.168.10.1'
    port = 8000
    queue = deque([],maxlen=7)
    i=0
    cnt=0
    frames_to_count = 40
    client_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    host_ip = server_ip # Here according to your server ip write the address
    port = port +1
    client_socket.connect((host_ip,port))
    msg = 'forward'
    forward_done = False
    stop_cnt=0
    _cx,_cy =50,50
    ret, frame = vid.read()
    prev_l = 0
    print(str(0)+',' +str(0),  file=open('file.csv','w')) 
    score={'forward':50,'back':50}
    prev_action='forward'
    if client_socket: 
        while(True):
            ret, frame = vid.read()
            img_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) 
            H,W = img_gray.shape
            img_gray = img_gray[H//4:(H//2+H//4),W//4:(W//2+W//4) ]
            color = (0, 242, 255)
            thickness = 1

            horizontal_sp = (W//4, H//2)
            horizontal_ep = (W//4+20, H//2)
            frame = cv2.line(frame, horizontal_sp, horizontal_ep, color, thickness)
            
            horizontal_sp = (W//2+W//4-20, H//2)
            horizontal_ep = (W//2+W//4, H//2)
            frame = cv2.line(frame, horizontal_sp, horizontal_ep, color, thickness)
            
            vertical_sp = (W//2, H//4)
            vertical_ep = (W//2, H//4+20)
            frame = cv2.line(frame, vertical_sp, vertical_ep, color, thickness)
            
            vertical_sp = (W//2, H//2+H//4-20)
            vertical_ep = (W//2, H//2+H//4)
            frame = cv2.line(frame, vertical_sp, vertical_ep, color, thickness)
                        
            start_point = (W//4, H//4)
            end_point = (W//4 +W//2 ,H//4 +H//2 )
            frame = cv2.rectangle(frame, start_point, end_point, color, thickness)
        
            laplacian_var = round(cv2.Laplacian(img_gray, cv2.CV_64F).var(),4)
            cv2.putText(frame, 'Focus level: '+str(int(laplacian_var)), (20, 40), fontFace=1, fontScale=2, color=color, thickness=2)
            queue.append(laplacian_var)
            cv2.imshow('PyShine AutoFocus', frame)
            
            if cnt == frames_to_count//4:
                score={'forward':50,'back':50}
            
            if cnt == frames_to_count:
                laplacian_var = np.median(queue)
                if laplacian_var>prev_l:
                    score[prev_action]+=1
                else:
                    score[prev_action]-=1
                    
                if score['forward']>score['back']:
                    msg ='forward'
                else:
                    msg = 'back'
                
                prev_action = msg
                if laplacian_var>500 or laplacian_var < 1:
                    msg = 'stop'

                    
                a = pickle.dumps(msg)
                message = struct.pack("Q",len(a))+a
                client_socket.sendall(message)
                
                cnt=0
                print(str(i)+',' +str(laplacian_var),  file=open('file.csv','a')) 
                prev_l = laplacian_var
                print(laplacian_var,msg, score)
            if i==3000:
                print(str(i)+',' +str(laplacian_var),  file=open('file.csv','w')) 
                i=0
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            i+=1
            cnt+=1

def plot():
    os.system('python plot.py')
_thread.start_new_thread(plot,())
cam_run()
vid.release()
cv2.destroyAllWindows() 

The client code will also call the plot.py code, to read and plot the csv in real-time. Please note that set the proper version of python or python3 here os.system('python plot.py').

Just keep the plot.py in the same PC and same directory as client.py code.

Here is the plot.py code.

import matplotlib.pyplot as plt
import matplotlib.animation as animation

fig = plt.figure()
ax = fig.add_subplot(1,1,1)
ax.set_facecolor((0,0,0)) 
def animate(i):
	ax.clear()
	xs = []
	ys = []
	graph_data = open('file.csv','r').read()
	lines = graph_data.split('\n')
	for line in lines[1:]:
		if len(line) > 1: 
			x, y = line.split(',')
			xs.append(float(x))
			ys.append(float(y))
	ax.clear()
	ax.plot(xs, ys,'-o', color = (0,1,0.25))
	ax.set_ylim( ymin=0, ymax=1800)
	ax.set_xlabel("Samples")
	ax.set_ylabel("")
	ax.set_title("Auto Focus")
	fig.tight_layout() # To remove outside borders
	ax.yaxis.grid(True)
ani = animation.FuncAnimation(fig, animate, interval=100)
plt.show()