Twitch.tv is a livestream platform where people can discover many different kinds of content. Founded in 2011 and now owned by Amazon.com, Inc., it is mainly used to entertain and to distribute digital content that can be viewed either live or on demand. With an average of 26.5 million daily visitors (Twitch, 2020), Twitch is a web service and a global community where users can learn new skills, discover new artists, spend free time and more.
Secondly, log in.
NB: the 1st and 2nd steps can be done in either order. If you are already logged in, you can skip this (2nd) step.
Here you will find the API Key and the API Secret:
NB: keep these credentials private and never share them with anyone.
REpresentational State Transfer (REST) refers to a group of software architecture design constraints that bring about efficient, reliable, and scalable distributed systems. A system is called RESTful when it adheres to those constraints.
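A tiny, hedged illustration of the idea (the base URL here is made up): in a RESTful API each resource lives at a stable URL, and standard HTTP verbs such as GET, POST and DELETE express the operation to perform on it.

```python
from urllib.parse import urlencode

BASE = "https://api.example.com"  # hypothetical REST endpoint

def resource_url(resource, **params):
    """Build the URL that identifies a resource, with optional query parameters."""
    url = f"{BASE}/{resource}"
    if params:
        url += "?" + urlencode(params)
    return url

# GET on this URL would read the resource; other verbs on the same
# URL would create, update or delete it.
print(resource_url("streams", user_login="enkk"))
# https://api.example.com/streams?user_login=enkk
```

Twitch's Helix endpoints used below (for example `https://api.twitch.tv/helix/streams?user_login=...`) follow exactly this shape.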
To access certain data about users’ accounts we have to choose a specific authentication method: OAuth. We use some parts of the OAuth 2.0 protocol.
Authentication involves three steps:
1. registering your app (completed above);
2. getting an access token;
3. sending the token in your API requests.
Given that the 1st step is already completed, let's complete the 2nd and the 3rd: getting the token and sending the token in your API request.
# import the required libraries to work with Python
import urllib.request, requests, json

CLIENT_ID = "agtcjzo944zzlpydvr19ckyqxxhvb6"
CLIENT_SECRET = "77ypkhyf7uzkddi6j8lc856yahwkxh"

# make an authenticated request to the given URL
def make_request(URL):
    header = {"Client-ID": CLIENT_ID, "Authorization": f"Bearer {get_access_token()}"}
    req = urllib.request.Request(URL, headers=header)
    recv = urllib.request.urlopen(req)
    return json.loads(recv.read().decode("utf-8"))

# get an app access token via the client-credentials flow
def get_access_token():
    x = requests.post(f"https://id.twitch.tv/oauth2/token?client_id={CLIENT_ID}&client_secret={CLIENT_SECRET}&grant_type=client_credentials")
    return json.loads(x.text)["access_token"]

print("Access token: " + get_access_token())
Access token: otajx7guutg58p9q0xpel1r24ne39j
We got the access token. Let's work through a couple of examples using Twitch's REST API:
# 1st example: starting from a list of streamers,
# check which ones are currently online and print their current type of content
def get_current_online_streams():
    streamers = [
        "emalloru",
        "rovazzi",
        "marcomontemagno",
        "jakidale",
        "chess24",
        "abracadabra",
        "enkk",
        "velox",
        "marcocrepaldi",
        "veritas"
    ]
    URL = "https://api.twitch.tv/helix/streams?user_login="
    resps = []
    for name in streamers:
        resps.append(make_request(URL + name))
    # print the online streamers in bold
    bold = "\033[1m"
    normal = "\033[0m"
    GAME_URL = "https://api.twitch.tv/helix/games?id="
    for i, r in enumerate(resps):
        if r["data"]:
            game_id = r["data"][0]["game_id"]
            game_resp = make_request(GAME_URL + game_id)
            game_name = game_resp["data"][0]["name"]
            print("- " + bold + streamers[i] + ": " + game_name)
        else:
            print("- " + normal + streamers[i] + ": now offline")

get_current_online_streams()
- emalloru: now offline
- rovazzi: now offline
- marcomontemagno: now offline
- jakidale: now offline
- chess24: Chess
- abracadabra: now offline
- enkk: now offline
- velox: now offline
- marcocrepaldi: now offline
- veritas: now offline
# 2nd example: get the 10 most popular stream categories
def get_current_top_streams():
    URL = "https://api.twitch.tv/helix/games/top"
    resp = make_request(URL)
    # print the top 10
    for i, r in enumerate(resp["data"][0:10], 1):
        print("{}. {}".format(i, r["name"]))

get_current_top_streams()
1. Just Chatting
2. League of Legends
3. Counter-Strike: Global Offensive
4. Grand Theft Auto V
5. Path of Exile
6. VALORANT
7. Fortnite
8. Dota 2
9. Apex Legends
10. Minecraft
A wrapper function is a subroutine (another word for a function) in a software library whose main purpose is to call a second subroutine or a system call with little or no additional computation. Wrapper functions are used to make writing computer programs easier by abstracting away the details of a subroutine's underlying implementation.
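Before looking at TwitchAPI, here is a minimal, self-contained sketch of the idea (the functions are invented for illustration): the wrapper adds nothing but fixed arguments and a clearer name.

```python
# general-purpose routine: a linear conversion x * scale + offset
def linear(x, scale, offset):
    return x * scale + offset

# wrapper: calls linear() with the right constants, hiding them from the caller
def celsius_to_fahrenheit(c):
    return linear(c, 9 / 5, 32)

print(celsius_to_fahrenheit(100))  # 212.0
```

TwitchAPI's methods work the same way: each one wraps the raw HTTP request (headers, token, URL building) behind a single readable call.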
TwitchAPI is a full implementation of the Twitch API, its Webhook and its PubSub, in Python 3.7.
The full official documentation is available on the project's page.
It is very simple to install using pip:
# pip install twitchAPI
from twitchAPI.twitch import Twitch
from pprint import pprint
To see how wrappers increase the readability and clarity of the code, let's revisit the earlier examples:
twitch = Twitch(CLIENT_ID, CLIENT_SECRET)
# let's create a simple app authentication:
twitch.authenticate_app([])
# get the access token
print("access token: " + twitch.get_app_token())
access token: zjrkfkh3ikyyn44y4p980xrhqtje5s
# 2nd example: get the 10 most popular stream categories
top_ten = twitch.get_top_games(None, None, 10)
for i in range(10): print("%d. " % (i+1) + top_ten["data"][i]["name"])
1. Just Chatting
2. League of Legends
3. Counter-Strike: Global Offensive
4. Grand Theft Auto V
5. Path of Exile
6. VALORANT
7. Fortnite
8. Dota 2
9. Apex Legends
10. Minecraft
We can also display this ranking in a different way, with the pandas library:
# pip install pandas
import pandas as pd
stats = []
for i in range(10):
    name = top_ten["data"][i]["name"]
    category_id = top_ten["data"][i]["id"]
    stats.append([name, category_id])
# create a new dataframe
df = pd.DataFrame(stats, columns=["name", "ID"])
df.index += 1
df.head(10)
| | name | ID |
|---|---|---|
| 1 | Just Chatting | 509658 |
| 2 | League of Legends | 21779 |
| 3 | Counter-Strike: Global Offensive | 32399 |
| 4 | Grand Theft Auto V | 32982 |
| 5 | Path of Exile | 29307 |
| 6 | VALORANT | 516575 |
| 7 | Fortnite | 33214 |
| 8 | Dota 2 | 29595 |
| 9 | Apex Legends | 511224 |
| 10 | Minecraft | 27471 |
# 3rd example: starting from a video ID,
# get the streamer's ID and information about their stream
videoID = "965206441"
video = twitch.get_videos(videoID)["data"][0]  # fetch once, reuse below
userID = video["user_id"]
user_name = video["user_name"]
print(user_name + "'s ID:", userID)
video
Enkk's ID: 52443328
{'id': '965206441', 'stream_id': '42122881086', 'user_id': '52443328', 'user_login': 'enkk', 'user_name': 'Enkk', 'title': 'Nooo ma quanto tempo era che non ci vedevamo? | !prime !ffz !bttv 🔴', 'description': '', 'created_at': '2021-03-27T16:15:55Z', 'published_at': '2021-03-27T16:15:55Z', 'url': 'https://www.twitch.tv/videos/965206441', 'thumbnail_url': 'https://static-cdn.jtvnw.net/cf_vods/d2nvs31859zcd8/2ce021c6b93ad159032b_enkk_42122881086_1616861745//thumb/thumb0-%{width}x%{height}.jpg', 'viewable': 'public', 'view_count': 95348, 'language': 'it', 'type': <VideoType.ARCHIVE: 'archive'>, 'duration': '11h3m0s', 'muted_segments': None}
# 4th example: get channel information for users
twitch.get_channel_information(userID)["data"][0]
{'broadcaster_id': '52443328', 'broadcaster_login': 'enkk', 'broadcaster_name': 'Enkk', 'broadcaster_language': 'it', 'game_id': '509658', 'game_name': 'Just Chatting', 'title': 'widepeepoHug | !vpn !prime !bttv d!ffz 🔴'}
# 5th example: get information on follow relationships between two Twitch users.
# information returned is sorted in order, most recent follow first.
totalFollowers = twitch.get_users_follows(None, 20, None, userID)["total"]
print(user_name + "'s current total number of followers:", totalFollowers)
Enkk's current total number of followers: 95024
Now that we have seen how libraries simplify development, we are ready to analyze some data.
Using APIs to extract data from social media is, as we know, very efficient. On the Twitch platform, for instance, one possible application is predicting which categories a given streamer's audience watches most. For this purpose we will use two Python packages that are very useful for web scraping: BeautifulSoup and Selenium. How can we do this? Let's take a look:
# get the most popular categories of one or more streamers
# pip install selenium
# import webdriver library
from selenium import webdriver
user_name = "Enkk"
driver = webdriver.Chrome("../Tutorial/chromedriver")
driver.get("https://www.twitch.tv/enkk/videos?filter=archives")
pageHTML = driver.page_source
Let's get the streamer's whole archive of videos, with their IDs:
# import the BeautifulSoup library
from bs4 import BeautifulSoup
soup = BeautifulSoup(driver.page_source, "html.parser")

# get the video IDs
links = soup.findAll("a", {"class": "ScCoreLink-udwpw5-0 eCaUBq tw-full-width tw-link"})
videoIDs = []
for i in range(len(links)):
    videoIDs.append(links[i]["href"][8:17])
for i in range(len(videoIDs)):
    print("ID VIDEO n.", i + 1, "=", videoIDs[i])
ID VIDEO n. 1 = 983903991
ID VIDEO n. 2 = 982479400
ID VIDEO n. 3 = 975220469
ID VIDEO n. 4 = 974218803
ID VIDEO n. 5 = 966463768
ID VIDEO n. 6 = 965206441
ID VIDEO n. 7 = 963931974
ID VIDEO n. 8 = 957954941
ID VIDEO n. 9 = 956600616
ID VIDEO n. 10 = 955773956
ID VIDEO n. 11 = 949261088
ID VIDEO n. 12 = 947943146
ID VIDEO n. 13 = 940733626
ID VIDEO n. 14 = 939334689
ID VIDEO n. 15 = 938667783
ID VIDEO n. 16 = 929240434
ID VIDEO n. 17 = 928188646
ID VIDEO n. 18 = 926953941
ID VIDEO n. 19 = 923214910
ID VIDEO n. 20 = 921682451
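One caveat about the slice `[8:17]`: it assumes every href has the form `/videos/` followed by exactly nine digits. A small regex, shown here on illustrative hrefs, keeps working even if Twitch issues IDs of a different length:

```python
import re

hrefs = ["/videos/965206441", "/videos/1234567890"]  # sample hrefs
video_ids = [m.group(1) for h in hrefs
             if (m := re.search(r"/videos/(\d+)", h))]
print(video_ids)  # ['965206441', '1234567890']
```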
Now it is time to count the chapters and gather the view counts:
# get stream categories:
# check if a single video contains one or more categories (chapters)
box = soup.findAll("div", {"class": "tw-flex-grow-1 tw-flex-shrink-1 tw-full-width tw-item-order-2 tw-media-card-meta__text-container"})
string = "Chapters"
chapters = []
counting = []
game_names = []
total_views = []
total_chapters = []
counter = 0
j = 0
for i in range(len(box)):
    # get the total number of views per video using the "get_videos" function
    total_view = twitch.get_videos(videoIDs[i])["data"][0]["view_count"]
    total_views.append(total_view)
    # videos with more than one category: get the number of chapters
    if string in box[i].text:
        main_category = ""
        if int(box[i].text[-1:]) >= 7:
            counter += 1
        chapters.append(int(box[i].text[-1:]))
        n_chapters = chapters[-1]
        total_chapters.append(n_chapters)
        counting.append([videoIDs[i], total_views[i], total_chapters[i], main_category])
    # videos with only one category (chapter)
    else:
        n_chapters = 1
        main_category = box[i].text.split(user_name)[1]
        game_names.append(main_category)
        total_chapters.append(n_chapters)
        counting.append([videoIDs[i], total_views[i], total_chapters[i], main_category])
        j += 1

# create a new dataframe with the total number of views per video
total = pd.DataFrame(counting, columns=["VIDEO ID", "TOTAL VIEWS", "N. CHAPTERS", "MAIN CATEGORY"])
total.index += 1
total.head(len(box))
| | VIDEO ID | TOTAL VIEWS | N. CHAPTERS | MAIN CATEGORY |
|---|---|---|---|---|
| 1 | 983903991 | 201243 | 3 | |
| 2 | 982479400 | 201214 | 5 | |
| 3 | 975220469 | 149747 | 6 | |
| 4 | 974218803 | 127229 | 3 | |
| 5 | 966463768 | 132400 | 5 | |
| 6 | 965206441 | 95348 | 4 | |
| 7 | 963931974 | 22110 | 1 | It Takes Two |
| 8 | 957954941 | 111964 | 3 | |
| 9 | 956600616 | 132408 | 3 | |
| 10 | 955773956 | 50508 | 2 | |
| 11 | 949261088 | 98557 | 3 | |
| 12 | 947943146 | 127323 | 2 | |
| 13 | 940733626 | 78513 | 4 | |
| 14 | 939334689 | 103240 | 3 | |
| 15 | 938667783 | 9513 | 1 | Valheim |
| 16 | 929240434 | 110335 | 4 | |
| 17 | 928188646 | 58389 | 4 | |
| 18 | 926953941 | 87753 | 1 | Just Chatting |
| 19 | 923214910 | 67466 | 2 | |
| 20 | 921682451 | 119457 | 4 | |
Let's create a simple Python function that uses a regex to convert category durations into minutes, so that all durations are comparable:
import re

def get_minutes(text):
    # regex that finds a sequence of digits followed by a single word
    regex = r'\d+( )\w+'
    # find all such patterns in the given string
    result = re.finditer(regex, text, re.DOTALL)
    minutes = 0
    for element in result:
        fragment = element.group()
        # hours: the associated number is multiplied by 60 and added to the count
        if 'hour' in fragment:
            minutes += int(re.search(r'\d+', fragment).group()) * 60
        # minutes
        elif 'minutes' in fragment:
            minutes += int(re.search(r'\d+', fragment).group())
        # seconds are ignored
        else:
            continue
    return minutes
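A quick sanity check of the conversion on a couple of sample duration strings (an equivalent version of the function is repeated here so the snippet runs on its own):

```python
import re

def get_minutes(text):
    minutes = 0
    # a sequence of digits followed by a single word, e.g. "2 hours"
    for element in re.finditer(r'\d+ \w+', text):
        fragment = element.group()
        if 'hour' in fragment:
            minutes += int(re.search(r'\d+', fragment).group()) * 60
        elif 'minutes' in fragment:
            minutes += int(re.search(r'\d+', fragment).group())
    return minutes

print(get_minutes("2 hours 30 minutes"))  # 150
print(get_minutes("45 minutes"))          # 45
```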
Let's update our "total" dataframe by computing the main category of each stream:
categories = []
durations = []
countings = []
y = 0
for x in range(1, len(total) + 1):
    if 1 < int(total['N. CHAPTERS'].values[x-1]) < 7:
        # click the chapters button of the x-th video
        button = "/html/body/div[1]/div/div[2]/div/main/div[2]/div[3]/div/div/div[1]/div[1]/div[2]/div/div[3]/div/div/div/div/div[2]/div/div[1]/div["
        button += str(x)
        button += "]/article/div[1]/div/div[1]/div[3]/div/div/div[1]/button"
        driver.find_element_by_xpath(button).click()
        for i in range(1, int(total['N. CHAPTERS'].values[x-1]) + 1):
            # get the single category
            c_xpath = "/html/body/div[1]/div/div[2]/div[2]/main/div[2]/div[3]/div/div/div[1]/div[1]/div[2]/div/div[3]/div/div/div/div/div[2]/div/div[1]/div["
            c_xpath += str(x)
            c_xpath += "]/article/div[1]/div/div[1]/div[3]/div/div/div[2]/div/div/div/div/div[2]/div/div[3]/div/div/div["
            c_xpath += str(i)
            c_xpath += "]/a/div/div[2]/div[1]/p"
            category = driver.find_element_by_xpath(c_xpath).text
            if category == "":
                continue
            categories.append(category)
            # get the duration
            d_xpath = "/html/body/div[1]/div/div[2]/div[2]/main/div[2]/div[3]/div/div/div[1]/div[1]/div[2]/div/div[3]/div/div/div/div/div[2]/div/div[1]/div["
            d_xpath += str(x)
            d_xpath += "]/article/div[1]/div/div[1]/div[3]/div/div/div[2]/div/div/div/div/div[2]/div/div[3]/div/div/div["
            d_xpath += str(i)
            d_xpath += "]/a/div/div[2]/div[2]/p"
            text = driver.find_element_by_xpath(d_xpath).text
            # convert the category duration into minutes
            durations.append(get_minutes(text))
        # merge duplicated categories
        for j in range(0, len(categories) - 1):
            for k in range(j + 1, len(categories)):
                if categories[j] == categories[k]:
                    durations[j] += durations[k]
                    del categories[k], durations[k]
        # get the main category (the one with the longest duration)
        index = 0
        max_duration = durations[0]
        for j in range(1, len(durations)):
            if durations[j] > max_duration:
                max_duration = durations[j]
                index = j
        countings.append([videoIDs[x-1], total_views[x-1], total_chapters[x-1], categories[index]])
        del categories[0:int(total['N. CHAPTERS'].values[x-1]) + 1]
        del durations[0:int(total['N. CHAPTERS'].values[x-1]) + 1]
    # too many categories
    elif int(total['N. CHAPTERS'].values[x-1]) >= 7:
        main_category = "too many categories"
        countings.append([videoIDs[x-1], total_views[x-1], total_chapters[x-1], main_category])
    # only one category
    else:
        countings.append([videoIDs[x-1], total_views[x-1], total_chapters[x-1], game_names[y]])
        y += 1

# show the new "total" dataframe
newtotal = pd.DataFrame(countings, columns=["VIDEO ID", "TOTAL VIEWS", "N. CHAPTERS", "MAIN CATEGORY"])
newtotal.index += 1
newtotal.head(len(box))
| | VIDEO ID | TOTAL VIEWS | N. CHAPTERS | MAIN CATEGORY |
|---|---|---|---|---|
| 1 | 983903991 | 201243 | 3 | Just Chatting |
| 2 | 982479400 | 201214 | 5 | Just Chatting |
| 3 | 975220469 | 149747 | 6 | Just Chatting |
| 4 | 974218803 | 127229 | 3 | Call of Duty: Warzone |
| 5 | 966463768 | 132400 | 5 | Just Chatting |
| 6 | 965206441 | 95348 | 4 | Just Chatting |
| 7 | 963931974 | 22110 | 1 | It Takes Two |
| 8 | 957954941 | 111964 | 3 | Just Chatting |
| 9 | 956600616 | 132408 | 3 | Just Chatting |
| 10 | 955773956 | 50508 | 2 | Just Chatting |
| 11 | 949261088 | 98557 | 3 | Just Chatting |
| 12 | 947943146 | 127323 | 2 | Just Chatting |
| 13 | 940733626 | 78513 | 4 | Just Chatting |
| 14 | 939334689 | 103240 | 3 | Just Chatting |
| 15 | 938667783 | 9513 | 1 | Valheim |
| 16 | 929240434 | 110335 | 4 | Just Chatting |
| 17 | 928188646 | 58389 | 4 | Chess |
| 18 | 926953941 | 87753 | 1 | Just Chatting |
| 19 | 923214910 | 67466 | 2 | Chess |
| 20 | 921682451 | 119457 | 4 | Just Chatting |
Having extracted the main categories and filled every row, let's print the top 3 most popular categories of the streamer "Enkk":
# suppress pandas' chained-assignment warnings for the in-place updates below
import warnings
warnings.filterwarnings('ignore')

# drop the columns we no longer need, then merge duplicated categories
del newtotal["VIDEO ID"], newtotal["N. CHAPTERS"]
for j in range(1, len(newtotal)):
    for k in range(j + 1, len(newtotal) + 1):
        if newtotal["MAIN CATEGORY"][j] == newtotal["MAIN CATEGORY"][k]:
            newtotal["TOTAL VIEWS"][j] += newtotal["TOTAL VIEWS"][k]
            newtotal["MAIN CATEGORY"][k] = ""
            newtotal["TOTAL VIEWS"][k] = 0

# sort by total views and keep the top 3 categories
app = newtotal.sort_values(by=['TOTAL VIEWS'], ascending=False)
final = app.head(3)
final.index = ['1', '2', '3']
final
| | TOTAL VIEWS | MAIN CATEGORY |
|---|---|---|
| 1 | 1800010 | Just Chatting |
| 2 | 127229 | Call of Duty: Warzone |
| 3 | 125855 | Chess |
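As an aside, the duplicate-merging loops above can be condensed into a single pandas groupby; here is a sketch on made-up numbers that mirror the "newtotal" layout:

```python
import pandas as pd

# hypothetical per-video rows, mirroring the "newtotal" columns
df = pd.DataFrame({
    "MAIN CATEGORY": ["Just Chatting", "Chess", "Just Chatting"],
    "TOTAL VIEWS": [100, 40, 60],
})

# one groupby + sum replaces the nested merge-and-zero loops
ranked = (df.groupby("MAIN CATEGORY")["TOTAL VIEWS"].sum()
            .sort_values(ascending=False))
print(ranked.index[0], ranked.iloc[0])  # Just Chatting 160
```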
Plotting the prediction using a pie chart:
import matplotlib.pyplot as plt

# data to plot (positional .iloc indexing, since the index now holds strings)
labels = [final["MAIN CATEGORY"].iloc[i] for i in range(3)]
sizes = [final["TOTAL VIEWS"].iloc[i] for i in range(3)]
colors = ['gold', 'yellowgreen', 'lightcoral']
explode = (0.2, 0.2, 0.2)  # spacing between slices

# plot the data using a pie chart
plt.pie(sizes, explode=explode, labels=labels, colors=colors,
        autopct='%1.1f%%', shadow=True, startangle=90)
plt.legend(labels, loc="best")
plt.axis('equal')
plt.show()
This simple prediction shows how strongly categories shape a streamer's channel. It is also important to keep in mind that Twitch's audience is large and that, in all likelihood, a typical user follows a stream because its category resonates with them.
Many further operations are possible with this data: the aim of this extraction was to categorize the streams and to answer, in a pragmatic way, the simple question "what is the most popular category of a streamer?".
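As a closing sketch, the whole question boils down to summing views per category and taking the maximum, which the standard library's Counter expresses concisely (the pairs below reuse a few of the view counts seen above, purely as sample data):

```python
from collections import Counter

# (category, views) pairs, as the scraper would produce them
videos = [("Just Chatting", 201243), ("Chess", 58389),
          ("Just Chatting", 95348), ("Chess", 67466)]

views_per_category = Counter()
for category, views in videos:
    views_per_category[category] += views

# most_common() ranks categories by accumulated views
print(views_per_category.most_common(1))  # [('Just Chatting', 296591)]
```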