Multi-threaded complex processing pipelines
The example below shows how GIFT-Grab can be used for running complex pipelines with multiple intermediate processing nodes and threads. The intermediate processing nodes in this example are built on the same principles as in the "Processing video frames with SciPy / NumPy" example. Running the example requires an HEVC-encoded MP4 file, an NVENC-capable GPU, and NumPy and SciPy support.
The full source code of the example is below. Please follow the comments and the flow of code. This example is also available on GitHub:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Example showing a complex GIFT-Grab pipeline with
multiple intermediate processing nodes.
"""
from time import (sleep, time)
import argparse
import os.path
import threading
import numpy as np
import scipy.misc
from pygiftgrab import (VideoSourceFactory, VideoFrame, Device,
ColourSpace, IObservableObserver,
VideoTargetFactory, Codec, IObserver)
# Module-level NumPy frame buffers; they start out empty and are
# replaced by real arrays in the script section of this file
np_buffer_red = None
np_buffer_orig = None
# One mutex shared by every node and thread that reads or writes
# frame data travelling through the GIFT-Grab processing pipeline
lock = threading.Lock()
class SnapshotSaver(IObserver):
    """Observer that writes incoming frames to PNG files at intervals."""

    def __init__(self, root_dir, save_freq=5):
        """
        Set up a snapshot saver.

        :param root_dir: folder into which the snapshots are written
        :param save_freq: minimum number of seconds that must pass
        between two consecutive snapshots. Must be at least 5; the
        default value makes the saver save every 5 seconds.
        """
        super(SnapshotSaver, self).__init__()
        # shorter intervals would flood the disk with images
        assert save_freq >= 5
        self.root_dir = root_dir
        self.save_freq = save_freq
        self.num_saved = 0
        # the first snapshot is taken ``save_freq`` seconds from now
        self.last_saved = time()

    def update(self, frame):
        """Implement ``IObserver.update``."""
        elapsed = time() - self.last_saved
        if elapsed < self.save_freq:
            # too early for the next snapshot: drop this frame
            return

        self.num_saved += 1
        # snapshots are numbered consecutively, starting from 1
        snapshot_name = 'frame-{:010d}.png'.format(self.num_saved)
        out_file = os.path.join(self.root_dir, snapshot_name)
        # NOTE(review): ``scipy.misc.imsave`` is no longer available in
        # recent SciPy releases -- confirm the SciPy version in use
        # presumably ``data(True)`` yields a (rows, cols, channels)
        # array -- TODO confirm against the pygiftgrab documentation
        scipy.misc.imsave(out_file, frame.data(True))
        self.last_saved = time()
class Bufferer(IObservableObserver):
    """GIFT-Grab pipeline node that copies each frame into a buffer."""

    def __init__(self, np_buffer):
        """
        Set up a bufferer around an existing array.

        :param np_buffer: the array that is overwritten with the
        data of every frame this node receives
        """
        super(Bufferer, self).__init__()
        self.buffer = np_buffer

    def update(self, frame):
        """Implement ``IObservableObserver.update``."""
        # all frame data access in this file is serialised by ``lock``
        with lock:
            # presumably a (rows, cols, channels) array -- TODO confirm
            pixels = frame.data(True)
            # copy element-wise into the existing buffer, so that other
            # holders of the same array object see the new frame
            self.buffer[:, :, :] = pixels[:, :, :]
class Histogrammer(threading.Thread):
    """
    GIFT-Grab processing node that computes the histogram of
    an image channel and prints how "colored" that channel is.

    The node runs as its own thread: it periodically reads the
    image buffer it was given (under the module-level ``lock``)
    until ``stop()`` is called.
    """

    # channel names, indexed by channel number (used for printing)
    channels = ('Blue', 'Green', 'Red', 'Alpha')

    def __init__(self, np_buffer, channel, tag, frame_rate, display_freq):
        """
        :param np_buffer: image buffer to use; indexed as
        ``np_buffer[:, :, channel]``
        :param channel: image channel to compute coloredness for
        (0, 1 or 2)
        :param tag: a tag describing what this image is, e.g.
        how it's been processed within a GIFT-Grab pipeline
        :param frame_rate: the rate at which to compute the
        coloredness (unit: frames-per-second, at most 60)
        :param display_freq: how many computed coloredness values
        to skip between two printed ones, e.g. if 0 is provided
        every value is printed, if 5 is provided five values are
        skipped after each printed one
        """
        super(Histogrammer, self).__init__()
        assert channel in range(3)
        assert 0 < frame_rate <= 60
        assert 0 <= display_freq
        self.channel = channel
        self.buffer = np_buffer
        self.tag = tag
        self.display_freq = display_freq
        self.num_skipped = 0
        self.sleep_interval = 1.0 / frame_rate
        self.running = False

    def _channel_histogram(self, num_bins):
        """Return the histogram of the configured channel of the buffer.

        The caller is responsible for holding ``lock`` while this runs.

        :param num_bins: number of equal-width bins spanning [0, 256]
        :return: array with ``num_bins`` pixel counts
        """
        # use the channel passed to the constructor, so that the
        # printed channel name matches the data actually analysed
        histogram, _ = np.histogram(
            self.buffer[:, :, self.channel],
            bins=num_bins, range=(0, 256)
        )
        return histogram

    @staticmethod
    def _coloredness(histogram, num_bins):
        """Return the bin-weighted mean of a histogram, scaled to (0, 1].

        Bin ``i`` (counting from 1) is weighted by ``i``; the weighted
        sum is divided by the total count and by ``num_bins``.

        :param histogram: array with ``num_bins`` counts
        :param num_bins: number of bins in ``histogram``
        :return: coloredness as a float; 0.0 for an empty histogram
        """
        total = np.sum(histogram)
        if total == 0:
            # empty image: avoid dividing by zero
            return 0.0
        # ``np.float`` no longer exists in current NumPy releases
        weights = np.arange(1, num_bins + 1, dtype=np.float64)
        return float(np.sum(histogram * weights) / total / num_bins)

    def run(self):
        """Override ``Thread.run``."""
        if self.running:
            return

        num_bins = 10
        self.running = True
        while self.running:
            # hold the lock only while reading the shared buffer
            with lock:
                histogram = self._channel_histogram(num_bins)
            coloredness = self._coloredness(histogram, num_bins)
            if self.num_skipped >= self.display_freq:
                print('{}ness of {} image: {:.0%}'.format(
                    Histogrammer.channels[self.channel],
                    self.tag, coloredness
                ))
                self.num_skipped = 0
            else:
                self.num_skipped += 1
            sleep(self.sleep_interval)

    def stop(self):
        """Stop the thread."""
        # ``run`` notices the flag at the start of its next iteration
        self.running = False
class Dyer(IObservableObserver):
    """GIFT-Grab pipeline node that dyes one channel of each frame."""

    def __init__(self, channel, increment):
        """
        Set up a dyer.

        :param channel: index of the image channel to dye (0, 1 or 2)
        :param increment: amount added to the values of that channel
        (0 to 255)
        """
        super(Dyer, self).__init__()
        assert 0 <= channel < 3
        assert 0 <= increment < 256
        self.channel = channel
        self.increment = increment

    def update(self, frame):
        """Implement ``IObservableObserver.update``."""
        with lock:
            pixels = frame.data(True)
            # presumably a view into the frame, so that the change
            # below reaches downstream nodes -- TODO confirm
            plane = pixels[:, :, self.channel]
            # only values strictly below ``255 - increment`` are
            # raised; all other values are left untouched
            dyeable = plane < 255 - self.increment
            plane[dyeable] += self.increment
if __name__ == '__main__':
    # Script entry point: builds the pipeline
    #   source -> bufferer_orig -> red_dyer -> {red_writer, bufferer_red,
    #                                           green_dyer}
    #   green_dyer -> {yellow_writer, yellow_snapshots}
    # runs it for 20 seconds while two Histogrammer threads report on
    # the buffered frames, and finally takes everything apart again.
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', type=str, required=True,
                        metavar='VIDEO_INPUT',
                        help='decklink (for grabbing frames from a Blackmagic DeckLink 4K Extreme 12G),\n'
                             'dvi2pcie (for grabbing frames from an Epiphan DVI2PCIe Duo SDI port),\n'
                             ' or a video file (HEVC-encoded MP4)')
    args = parser.parse_args()
    video_input = args.input

    sfac = VideoSourceFactory.get_instance()
    if video_input == 'decklink':
        filename, ext = 'decklink', '.mp4'

        # start acquiring frames from a DeckLink 4K Extreme 12G
        source = sfac.get_device(Device.DeckLink4KExtreme12G, ColourSpace.BGRA)
    elif video_input == 'dvi2pcie':
        filename, ext = 'dvi2pcie', '.mp4'

        # start acquiring frames from an Epiphan DVI2PCIe Duo SDI port
        source = sfac.get_device(Device.DVI2PCIeDuo_SDI, ColourSpace.BGRA)
    else:
        # anything else is treated as the path of an MP4 file; the
        # output files reuse its base name and extension
        filename = os.path.basename(video_input)
        filename, ext = os.path.splitext(filename)
        assert filename
        assert ext == '.mp4'

        # initialise reading of passed file
        source = sfac.create_file_reader(video_input, ColourSpace.BGRA)

    # grab one frame to learn the frame size for the NumPy buffers
    frame = VideoFrame(ColourSpace.BGRA, False)
    source.get_frame(frame)
    frame_shape = (frame.rows(), frame.cols(), 4)

    # prepare for creating encoders (writers)
    tfac = VideoTargetFactory.get_instance()
    frame_rate = source.get_frame_rate()

    # create a red and green Dyer
    red_dyer = Dyer(2, 128)
    green_dyer = Dyer(1, 64)

    # create the bufferers for the red-dyed and the original frames
    np_buffer_red = np.zeros(frame_shape, np.uint8)
    bufferer_red = Bufferer(np_buffer_red)
    np_buffer_orig = np.zeros_like(np_buffer_red)
    bufferer_orig = Bufferer(np_buffer_orig)

    # create the histogrammers for the red-dyed and
    # the original video frames (started further below)
    hist_red = Histogrammer(np_buffer_red, 2, 'red-dyed', 60.0, 10)
    hist_orig = Histogrammer(np_buffer_orig, 2, 'original', 50.0, 10)

    # create encoders for the red-dyed and yellow-dyed (as green
    # is applied on top of red) video streams
    red_file = os.path.join('.', ''.join([filename, '-red', ext]))
    red_writer = tfac.create_file_writer(Codec.HEVC, red_file, frame_rate)
    yellow_file = os.path.join('.', ''.join([filename, '-yellow', ext]))
    yellow_writer = tfac.create_file_writer(Codec.HEVC, yellow_file, frame_rate)

    # create a SnapshotSaver for saving a number of yellow-dyed
    # video frames
    yellow_snapshots = SnapshotSaver('.', 9)

    try:
        # assemble the GIFT-Grab pipeline
        source.attach(bufferer_orig)
        bufferer_orig.attach(red_dyer)
        red_dyer.attach(red_writer)
        red_dyer.attach(bufferer_red)
        red_dyer.attach(green_dyer)
        green_dyer.attach(yellow_writer)
        green_dyer.attach(yellow_snapshots)

        # The histogrammer threads are started only once everything
        # else is in place: they loop until stopped, so a failure
        # while they are running unattended would keep the process
        # alive forever.
        hist_red.start()
        hist_orig.start()

        sleep(20)  # operate pipeline for 20 sec
    finally:
        # stop the histogrammers, also when the run was interrupted,
        # and wait until they have actually finished
        for histogrammer in (hist_red, hist_orig):
            histogrammer.stop()
            if histogrammer.is_alive():
                histogrammer.join()

    # disassemble the GIFT-Grab pipeline
    source.detach(bufferer_orig)
    bufferer_orig.detach(red_dyer)
    red_dyer.detach(red_writer)
    red_dyer.detach(bufferer_red)
    red_dyer.detach(green_dyer)
    green_dyer.detach(yellow_writer)
    green_dyer.detach(yellow_snapshots)