Patrick Hirschi

Data Enthusiast

Software Engineer

Tech Lead

Patrick Hirschi

Data Enthusiast

Software Engineer

Tech Lead

Blog Post

Twitter Data Streaming

Twitter Data Streaming

Datenquelle Twitter

Twitter ist ein 2006 gegründeter Online-Dienst, mit welchem man Kurznachrichten (sogenannte “Tweets”, von engl. “Tweet”, also “zwitschern”) absetzen kann. Die offene Twitter API v2 des Twitter Developer Portals bietet einen programmatischen Zugriff auf Daten zu Nutzern, Tweets, Listen, Gruppen und vieles mehr.

Es gibt aktuell drei verschiedene Nutzungskategorien der API. Ausserdem ist eine vierte Nutzungskategorie «Elevated+» in Planung (Q3 2022?).

Die folgende Tabelle zeigt einen Überblick über die verschiedenen Kategorien. Zu beachten ist vor Allem die Einschränkung bezüglich des Scopes der API-Suchmethoden (letzte 7 Tage vs. volle Archiv-Suche).

EssentialElevatedAcademic / Research
500’000 Tweets pro Monat
25 Requests pro Minute
Nur die letzten 7 Tage abrufbar
2 Mio. Tweets pro Monat
50 Requests pro Minute
Nur die letzten 7 Tage abrufbar
10 Mio. Tweets pro Monat
Voller Archiv Zugang
Verbesserte Suchfunktion
100 Requests pro Minute
Twitter API v2 Nutzungskategorien

Twitter hat für den Beantragungsprozess eine Online-Eingabemaske. Es braucht eine genaue Beschreibung des Projektes, was für Daten geladen werden, wie sie prozessiert werden und auch, ob man Ergebnisse in aggregierter Form präsentiert. Der Zugang ist nicht für kommerzielle Zwecke zugelassen. Nach dem Ausfüllen des Antrages wird automatisch der Prüfprozess gestartet.

Sobald der Antrag genehmigt wurde, hat man Zugriff auf ein breites Set von API REST Methoden:

  • Tweet-Informationen
    • Suche nach bestimmten Keywords / Hashtags
    • Absetzen, ändern oder löschen von Tweets auf dem eigenen Account
    • Historische (Archiv-) Suche
    • Aggregierte Informationen (Anzahl Tweets, Anzahl Likes, Anzahl Antworten, Anzahl Re-Tweets, etc.)
  • Nutzer-Informationen
    • Suche nach Nutzer-Ids
    • Suche nach Followern von Nutzern
    • Blockierte Nutzer
    • Stummgeschaltete Nutzer
  • Listen-/Rauminformationen
    • Suche nach Gruppen, Listen und Räumen
  • Compliance
    • Abfrage von Compliance Informationen zu bestimmten Tweets

Herausforderungen

Die Aufrechterhaltung eines Echtzeit-Datenstreams über mehrere Tage bietet einige Herausforderungen. In einem produktiven Setup würde man hier eine redundante und hochverfügbare System- und Netzwerkarchitektur vorsehen. Da dies aber für private Zwecke nicht möglich ist, kommt es meist zu wiederkehrenden Unterbrüchen des Streams aufgrund von:

  • Ruhezustand Monitor / Festplatte
    • Ein Ruhezustand des Monitors bzw. der Festplatte des Host-Systems führt dazu, dass die Twitter Stream-Daten über einen längeren Zeitraum nicht mehr empfangen werden können. Twitter beendet in der Folge Streams zu Stream-Empfängern, welche die gelieferten Daten nicht mehr verarbeiten können.
  • Netzwerkausfälle
    • Egal ob das Hostsystem über Ethernet oder WLAN mit dem Router / Internet verbunden ist, ein Netzwerkausfall kann in privaten Haushalten immer mal passieren. Oft gibt es auch kleinere Unterbrüche, die von den Service-Providern verursacht werden. Auch bei einem Netzwerkausfall, führt die verzögerte Abarbeitung der Stream-Daten zu einem Abbruch des Datenstreams.
  • Wartungsarbeiten Twitter
    • Twitter gibt keine Garantie für einen unterbruchfreien Stream. Die technische Plattform, die von Twitter bereitgestellt wird, muss auch ab und zu gewartet werden. Bei diesen Wartungsarbeiten werden die Streams unterbrochen.
  • Verbindungsabbrüche
    • In den Testläufen gab es Unterbrüche, die auf keinen Fehler zurückgeführt werden konnten. Der Verdacht liegt nahe, dass längere Streams gelegentlich auch ohne Grund abgebrochen werden.
  • Äussere Einflüsse
    • Die ganz menschlichen Gründe wie Kinder, PartnerInnen o.Ä., welche ein unterbruchfreies Streaming-Erlebnis verunmöglichen (Notebook ausschalten, Kabel entfernen, etc.).

Massnahmen

Leider können nicht alle von den beschriebenen Einflüssen technisch auch abgefangen werden, aber das Risiko kann mit gezielten Massnahmen reduziert werden. Sollte es trotzdem noch zu einem Unterbruch kommen, so soll dieser so kurz wie möglich gehalten werden. Um dies zu erreichen, können z.B. die folgenden Massnahmen implementiert werden:

  • Exception Handling
    • Manche Verbindungsabbrüche haben spezifische Fehlercodes, die man in Python entsprechend abfangen kann. Dies soll geloggt werden, aber das Programm nicht zum Absturz bringen. Nach einer kurzen Wartezeit (sleep) soll erneut versucht werden, die Verbindung wieder aufzubauen.
  • Restart Mechanismus
    • Das Python Skript welches den Twitter Stream etabliert wird mit Hilfe eines Bash Skriptes und dem Modul gtimeout so ausgeführt, dass es in einer unendlichen while-Schlaufe alle 10 Minuten ordentlich beendet und wieder neu gestartet wird. Selbst wenn es so mal zu einem Ausfall kommen sollte, so dauert dieser maximal 10 Minuten und dann wird das Skript wieder neugestartet.
  • Caffeinated
    • Caffeinated ist eine simple Applikation die es erlaubt, Ruhezustände vom Monitor oder der Festplatte konsequent zu verzögern oder zu verhindern.
  • Kommandozeile
    • Um die Internetverbindung aufrecht zu erhalten, wird über die Kommandozeile eine unendliche Ping-Schlaufe auf den Google DNS Server 8.8.8.8 abgesetzt.

Aufbau Datenstream

Twitter Daten können als gefilterter Stream über die Twitter API v2 geladen werden. Jeder Tweet der die Bedingungen von einem der Filter erfüllt, wird entsprechend mit dem Tag des Filters markiert und kann in der Folge in Python noch weiterverarbeitet werden. Typischerweise werden die Daten aber in Rohform (JSON) in einem Dokumentenstore (z.B. MongoDB) persistiert.

Testszenario

Für den Aufbau eines Twitter API Streams mit der Streaming APIv2 kann mit dem persönlichen Account auf dem Twitter Developer Portal eine Applikation und ein zugehöriges API Token eingerichtet werden.

Das API Token wird benötigt, um sich gegenüber der API zu authentifizieren. In der Folge kann mit dem Python requests Modul die REST API aufgerufen und abgefragt werden. Die Resultate werden als json übergeben und ohne weitere Prozessierung in der dafür eingerichteten mongoDB gespeichert.

Es sollen ausschliesslich Tweets zu drei spezifischen Themenbereichen, welche derzeit weltweit für Schlagzeilen sorgen, geladen werden:

  • Konflikt in der Ukraine
  • Covid 19
  • Klimakrise

Das dafür verwendete Skript sieht wie folgt aus:

import json
import sys
import socket
from datetime import datetime
import time
import requests
import os
import threading
import pymongo



# To set the bearer token in the environment variables:
# https://www.toptal.com/apache/apache-spark-streaming-twitter
# https://github.com/twitterdev/Twitter-API-v2-sample-code/blob/main/Filtered-Stream/filtered_stream.py
# safer option is to register it as an environment variable
# bearer_token = os.environ.get("BEARER_TOKEN")

bearer_token = "<<BEARER_TOKEN>>"

# Mongo DB Database Connection Details
mongoclient = pymongo.MongoClient("mongodb://localhost:27017/")

mydb = mongoclient["twitter_streaming_data"]
mycol = mydb["tweets"]

# stream start
stream_start_time = datetime.now()

def bearer_oauth(r):
	"""
	Method required by bearer token authentication.
	"""

	r.headers["Authorization"] = f"Bearer {bearer_token}"
	r.headers["User-Agent"] = "v2FilteredStreamPython"
	return r


def set_rules(delete):
	# Define three rules for the filtered streaming:
	#	- Filter for tweets from the russia/ukraine conflict
	#	- Filter for tweets regarding climate change
	#	- Filter for tweets regarding covid-19
	sample_rules = [
		{"value": "(Russland OR Ukraine OR Putin OR Selenski OR Lwiw OR Kiew OR Mariupol OR Klitschko OR Lawrow) lang:de -is:retweet", "tag": "Russland"},
		{"value": "(Klima OR Klimawandel OR Energiepolitik OR erneuerbar OR Solaranlage OR Klimaerwärmung) lang:de -is:retweet", "tag": "Klimawandel"},
		{"value": "(Covid OR Corona OR SARS-CoV-2 OR Omikron OR Impfplicht OR Impfung) lang:de -is:retweet", "tag": "Covid"}
	]
	payload = {"add": sample_rules}
	response = requests.post(
		"https://api.twitter.com/2/tweets/search/stream/rules",
		auth=bearer_oauth,
		json=payload,
	)
	if response.status_code != 201:
		raise Exception(
			"Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
		)
	print(json.dumps(response.json()))


def get_stream(set):
	print(str(datetime.now()) + " - [THREAD ID] " + str(threading.get_ident()))
	response = requests.get(
	"https://api.twitter.com/2/tweets/search/stream?tweet.fields=author_id,created_at,source,entities", 
	auth=bearer_oauth, 
	stream=True,
	)
	print(str(datetime.now()) + " - HTTP Response Status for Stream Definition: ",response.status_code)
	if response.status_code != 200:
		raise Exception(
			"Cannot get stream (HTTP {}): {}".format(
				response.status_code, response.text
			)
		)
	
	print(str(datetime.now()) + " - Start reading Tweets from Stream...")
	count = 0
	# Iterate through each streamed tweet
	for response_line in response.iter_lines():
		if response_line:
			try:
				full_tweet = json.loads(response_line)
				#tweet_json_string = json.dumps(full_tweet)
				# Useful for debugging purposes
				# print("Tweet Text: " + tweet_json_string)
				#print ("------------------------------------------")
				mycol.insert_one(full_tweet)
				count += 1
				if count % 100 == 0:
					print(str(datetime.now()) + " - Successfully processed " + str(count) + " Tweets.")
			except:
				e = sys.exc_info()[0]
				print("%s - Error: %s" % str(datetime.now()),e)

	if response.raw.closed:
		# Disconnect has happened
		print("%s - Error... Stream was forcibly closed by Twitter: %s. Raising Exception now!" 
				% str(datetime.now()),response.raw.closed)
		raise Exception

	

		

def get_rules():
	response = requests.get(
		"https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
	)
	if response.status_code != 200:
		raise Exception(
			"Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
		)
	print(json.dumps(response.json()))
	return response.json()


def delete_all_rules(rules):
	if rules is None or "data" not in rules:
		return None

	ids = list(map(lambda rule: rule["id"], rules["data"]))
	payload = {"delete": {"ids": ids}}
	response = requests.post(
		"https://api.twitter.com/2/tweets/search/stream/rules",
		auth=bearer_oauth,
		json=payload
	)
	if response.status_code != 200:
		print(json.dumps(response.json()))
		raise Exception(
			"Cannot delete rules (HTTP {}): {}".format(
				response.status_code, response.text
			)
		)
	

def main():
	rules = get_rules()
	delete = delete_all_rules(rules)
	set = set_rules(delete)
	print(str(stream_start_time) + " - [STARTING] start time")
	while(True):
		try:
			print(str(datetime.now()) + " - [STARTING] stream is starting in new thread...")
			thread = threading.Thread(target=get_stream(set))
			thread.start()
		except Exception as e:
			print(str(datetime.now()) + " - Caught the exception: " + str(e) 
					+ ". Wait for 30 seconds and try again.")
			#thread.join()
			time.sleep(30)

if __name__ == "__main__":
	main()

Wie im Kapitel „Massnahmen“ beschrieben, kann man für die bessere Stabilität ein zusätzliches Bash Skript nutzen, welches alle x Minuten (hier alle 10 Minuten) den Stream neu startet.

restart=""
while true; do
    gtimeout 600 python3 /Users/ph/Projects/twitter_spark_streaming/twitter_api_stream.py $restart
    restart="restart"
done 

Der Output sieht dann wie folgt aus:

Beispiel Output
Taggs:
Write a comment