Youtube Without Youtube
Isn’t it odd how subscribing to a channel on Youtube means telling Youtube to aggregate that content on your behalf? Perhaps I’m spoiled by RSS where I can configure my own software to download content listings from sites I subscribe to. But in the cloud model, I have to trust Google to manage my subscriptions. As far as I can tell there is no benefit for me the user in this arrangement, and in fact it is a detriment because I lose control and become dependent on some corporation to faithfully keep my best interests in the forefront.
To alleviate my ails, I wrote a python script called utube
(code included at the end of this page). The dependencies are youtube-dl and mplayer (though mplayer could be swapped out if need be). The intention of this project is multi-fold:
- Move the management of subscriptions and marking content as watched from the cloud to the local machine to drastically improve independence and privacy.
- Integrate multiple video platforms (e.g., youtube and bitchute) into one frontend to improve interoperability.
- Eliminate dependence on the webapp for retrieval and playback of the video to improve performance.
The utube script maintains and updates a list of videos in a .utube
file. Here are a few examples of subscribing to one of my favourite youtubers, The Hated One:
Subscribe on Youtube
The url for The Hated One’s channel on Youtube is https://www.youtube.com/channel/UCjr2bPAyPV7t35MvcgT3W8Q/videos
. I can be subscribe using the following command:
$ utube -a https://www.youtube.com/channel/UCjr2bPAyPV7t35MvcgT3W8Q/videos
Subscribe on Bitchute
The url for The Hated One’s channel on Bitchute is https://www.bitchute.com/channel/thehatedone/
. Using this url the same way as subscribing on Youtube will work, but updating the listing of videos is slow using Bitchute’s API. Luckily, Bitchute provides a RSS feed at https://www.bitchute.com/feeds/rss/channel/thehatedone/
(this pattern of inserting /feeds/rss/
in the URL works in general on Bitchute).
$ utube -a https://www.bitchute.com/feeds/rss/channel/thehatedone/
Subscribe Elsewhere
If you wish to subscribe to someone on a platform supported by youtube-dl but which has a prohibitively slow API, then it is easy to set up your own RSS feed on your server!
root@server # utube -a https://www.prohibitively-slow-api.com/channel/someone/
root@server # mkdir /var/www/rss/
root@server # crontab -e
[inside editor]
30 * * * * utube -u && cd /var/www/rss && utube -x
Then, on your client machine:
$ utube -a https://subdomain.domain.com/rss/someone.xml
The source code
#!/usr/bin/env python3
#
# Version 1.3
# https://vance.homelinuxserver.org/coolstuff/youtube-without-youtube.html
#
import argparse
import subprocess
from pathlib import Path
import os
import getpass
import re
import html
import json
import xml.etree.ElementTree as ET
CONF= os.path.expanduser('~/.utube')
parser = argparse.ArgumentParser(description='Play vids made by your favourite creators!')
add = parser.add_argument_group('add', 'Add a creator to be tracked by utube')
add.add_argument('-a', '--add', metavar='URL', help='The url of the channel/playlist/RSS feed to add')
parser.add_argument('-u', '--update', action='store_true', help='Update listing of videos')
parser.add_argument('-U', '--forceupdate', action='store_true', help='Clear and re-download listing of videos')
parser.add_argument('-x', '--xml', action='store_true', help='Dump listing of videos in XML format (for RSS)')
parser.add_argument('-l', '--list', action='store_true', help='List videos available')
parser.add_argument('-w', '--watch', nargs=1, type=int, help='Watch a specific video')
parser.add_argument('-L', '--listen', nargs=1, type=int, help='Listen to a specific video')
parser.add_argument('-d', '--download', nargs=1, type=int, help='Download a specific video')
parser.add_argument('-W', metavar='URL', help='Watch a specific url')
parser.add_argument('--markAllWatched', action='store_true', help='Mark all videos as watched')
parser.add_argument('--unwatch', nargs=1, type=int, help='Mark a specific video as unwatched')
parser.add_argument('--autosync', metavar='HOST', help='Name of host to sync .utube file with (operates over ssh)')
args = parser.parse_args()
def meAtMyself():
return '{}@{}'.format(getpass.getuser(), os.uname()[1])
def readConf():
utubrs = {} # { (url, name) : [ {'title': title, 'url': url, 'watched': True/False], ...] }
syncDests = [meAtMyself()] # [ "`whoami`@`hostname`", "user1@host1", ... ]
# Ensure it exists!
Path(CONF).touch()
with open(CONF) as f:
lines = f.read().split('\n')
if lines[0] == 'SYNCDESTS':
end = lines.index('')
hosts = lines[1:end]
lines = lines[end+1:]
for host in hosts:
if host not in syncDests:
syncDests.append(host)
utubr = None
for line in lines:
if not utubr and line:
utubr = tuple(line.rsplit('&', 1))
utubrs[utubr] = []
elif not line:
utubr = None
else:
(title, url, watched) = line.rsplit('&', 2)
if watched == 'True':
watched = True
else:
watched = False
utubrs[utubr].append({'title': title, 'url': url, 'watched': watched})
return utubrs, syncDests
def writeConf(utubrs, syncDests):
with open(CONF, 'w') as f:
f.write('SYNCDESTS\n')
for host in syncDests:
f.write('{}\n'.format(host))
f.write('\n')
for utubr in sorted(utubrs):
f.write('{}&{}\n'.format(utubr[0], utubr[1]))
for vid in utubrs[utubr]:
f.write('{}&{}&{}\n'.format(vid['title'], vid['url'], vid['watched']))
f.write('\n') # blank line
me = meAtMyself()
for host in syncDests:
if host != me:
subprocess.run('scp {} {}:.utube 1> /dev/null'.format(CONF, host), shell=True)
def getVid(index):
i = 0
for utubr in sorted(utubrs):
for vid in utubrs[utubr]:
if i == index:
return vid
i += 1
def printFancy(vids = []):
index = 0
for utubr in sorted(utubrs):
name = utubr[1]
hasPrintedName = False
for vid in utubrs[utubr]:
color = '\033[91m'
if vid['watched']:
color = ''
if not vids or any(v['url'] == vid['url'] for v in vids):
if not hasPrintedName:
print('\033[36m\033[1m{}\033[0m:'.format(name))
hasPrintedName = True
print('\t{}\033[1m{}\033[0m) {}'.format(color, index, vid['title']))
index += 1
if hasPrintedName:
print()
def watchUrl(url):
subprocess.run('youtube-dl --no-check-certificate -o - {} | mplayer -'.format(url), shell=True)
def listenUrl(url):
subprocess.run('youtube-dl --no-check-certificate -o - {} | mplayer -vo null -'.format(url), shell=True)
def downloadUrl(url):
subprocess.run('youtube-dl --no-check-certificate {} -o \'%(title)s.%(ext)s\''.format(url), shell=True)
utubrs, syncDests = readConf()
def group(lst, n):
for i in range(0, len(lst), n):
val = lst[i:i+n]
if len(val) == n:
yield tuple(val)
# Optimized for RSS downloads
def processRss(xmlData):
try:
root = ET.fromstring(xmlData)
channel = root.find('channel')
name = channel.find('title').text
index = []
for vid in channel.findall('item'):
title = html.unescape(vid.find('title').text)
url = vid.find('link').text
index.append({'title': title, 'url': url, 'watched': False})
return index, name
except:
return [], ''
# Optimized for Youtube downloads
def processYoutube(httpData):
search = re.search('(?<=<meta name="title" content=").*(?=">)', httpData)
if not search:
return [], ''
name = search.group(0)
index = []
for line in re.findall('^.*<h3 class="yt-lockup-title ">.*$', httpData, re.MULTILINE):
title = html.unescape(re.search('(?<=rel="nofollow">).*(?=</a>)', line).group(0))
if not re.search('(?<=Duration: ).*(?=</span>)', line):
continue # It's a live stream
url = "https://www.youtube.com" + re.search('(?<=href=").*(?=" rel=)', line).group(0)
index.append({'title': title, 'url': url, 'watched': False})
return index, name
def downloadMetadata(utubrUrl, maxDownloads):
# Let's first try some optimized download strategies
data = subprocess.run('curl -sL {}'.format(utubrUrl), shell=True, stdout=subprocess.PIPE).stdout.decode('utf-8')
index, name = processRss(data)
if not name:
index, name = processYoutube(data)
if not name:
for line in subprocess.run('youtube-dl {} -i -j --no-check-certificate --max-downloads {}'.format(utubrUrl, maxDownloads), shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode('utf-8').split('\n'):
if not line:
continue
vid = json.loads(line)
title = ''
url = 'err'
if 'title' in vid:
title = vid['title']
if 'webpage_url' in vid:
url = vid['webpage_url']
elif 'url' in vid:
url = vid['url']
index.append({'title': title, 'url': url, 'watched': False})
if not name and 'uploader_id' in vid:
name = vid['uploader_id']
if not name and 'playlist' in vid:
name = vid['playlist']
if len(index) > maxDownloads:
index = index[0:maxDownloads]
return index, name
def mergeVids(index, utubr):
merged = []
seen = 0
#print('\nProcessing {}'.format(utubr[1]))
for new in index:
if not any(new['url'] == vid['url'] for vid in utubrs[utubr]):
#print('Appending {}'.format(new))
merged.append(new)
utubrs[utubr].insert(seen, new)
seen += 1
else:
#print('I has seen "{}" before...'.format(new['title']))
seen = [i for i, vid in enumerate(utubrs[utubr]) if vid['url'] == new['url']][0]
#print('seen is now {}'.format(seen))
# Remember at most 10 items per utubr
if len(utubrs[utubr]) > 10:
utubrs[utubr] = utubrs[utubr][0:10]
return merged
if args.add:
index, name = downloadMetadata(args.add, 10)
utubr = (args.add, name)
utubrs[utubr] = []
mergeVids(index, utubr)
printFancy(index)
writeConf(utubrs, syncDests)
if args.forceupdate:
for utubr in utubrs:
utubrs[utubr] = []
args.update = True #hacky, but whatever
if args.update:
alldaNewStuff = []
for utubr in utubrs:
newstuff, name = downloadMetadata(utubr[0], 10)
alldaNewStuff += mergeVids(newstuff, utubr)
if alldaNewStuff:
print('New stuff emerges!')
printFancy(alldaNewStuff)
else:
print('No new vids.')
writeConf(utubrs, syncDests)
if args.xml:
for utubr in utubrs:
# Create an output file
with open('{}.xml'.format(utubr[1].replace(' ', '')), 'w') as f:
f.write('<?xml version="1.0" encoding="ISO-8859-1" ?>\n<rss version="2.0">\n')
f.write(' <channel>\n <title>{}</title>\n'.format(html.escape(utubr[1])))
for vid in utubrs[utubr]:
f.write(' <item><title>{}</title>'.format(html.escape(vid['title'])))
f.write('<link>{}</link>'.format(vid['url']))
f.write('<description></description></item>\n')
f.write(' </channel>\n')
f.write('</rss>\n')
if args.list:
printFancy()
if args.watch:
vid = getVid(args.watch[0])
utubr = next(utubr for utubr in utubrs if vid in utubrs[utubr])
vid['watched'] = True
writeConf(utubrs, syncDests)
watchUrl(vid['url'])
if args.listen:
vid = getVid(args.listen[0])
utubr = next(utubr for utubr in utubrs if vid in utubrs[utubr])
vid['watched'] = True
writeConf(utubrs, syncDests)
listenUrl(vid['url'])
if args.W:
watchUrl(args.W)
if args.download:
vid = getVid(args.download[0])
utubr = next(utubr for utubr in utubrs if vid in utubrs[utubr])
vid['watched'] = True
writeConf(utubrs, syncDests)
downloadUrl(vid['url'])
if args.markAllWatched:
for utubr in utubrs:
for vid in utubrs[utubr]:
vid['watched'] = True
writeConf(utubrs, syncDests)
if args.unwatch:
vid = getVid(args.unwatch[0])
vid['watched'] = False
writeConf(utubrs, syncDests)
if args.autosync:
host=args.autosync
if ':' in host:
host = host.split(':')[0]
syncDests.append(host)
writeConf(utubrs, syncDests)