The Automated World Statistics Extraction project is an innovative integration of Python, Twitter, and Google Suite, designed to transform the unstructured data of tweets into a well-organized, analyzable format. Utilizing Python, the project extracts specific tweets containing world statistics from a dedicated Twitter feed. These tweets, often containing crucial data on various global metrics, are then parsed and transformed into a structured format, making the data easily accessible and analyzable. The project targets tweets with specific structures and data types, ensuring that the extracted information is relevant and valuable.
A cornerstone of this project is automation and regular data updates. The Python script is scheduled to run every six hours, ensuring that the extracted data stays current and relevant. This continuous update mechanism is crucial in the fast-paced environment of Twitter, where new information is posted constantly. The extracted data is then organized and stored in Google Sheets using the gspread library. This integration ensures that the data is not only stored but is also easily accessible and can be shared among multiple users, enhancing collaboration and analysis.
Visual representation of data is integral to making complex datasets understandable. To that end, the project employs Plotly and Dash to generate insightful and interactive charts from the structured data. These visual representations are then stored as PNG images in Google Drive and embedded into Google Sheets, ensuring that users have both numerical and visual data at their fingertips. This integration of data extraction, transformation, storage, and visualization underscores the project’s comprehensive approach to making Twitter’s vast and unstructured data landscape accessible, analyzable, and actionable for decision-makers and analysts.
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
# Set up ChromeDriver options and preferences
from selenium.webdriver.chrome.service import Service

chrome_options = Options()
chrome_options.add_argument("--start-maximized")
# NOTE(review): value 2 for this content setting presumably blocks image
# loading, which would keep page loads lighter while only text is scraped.
prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs)
# Set ChromeDriver path
chrome_path = r"C:\Users\My PC\Downloads\chromedriver_win32 (1)\chromedriver.exe"
# Initialize ChromeDriver. The driver location is handed over through a
# Service object because the executable_path keyword of webdriver.Chrome is
# deprecated.
driver = webdriver.Chrome(
    service=Service(executable_path=chrome_path),
    options=chrome_options,
)
# Navigate to Twitter Stats Feed page
driver.get("https://twitter.com/stats_feed")
# Fixed pause that gives the page time to load before scraping starts
time.sleep(15)
def expand_tweet(tweet_element):
    """Click the "Show more" button in a tweet to reveal the full text.

    This is a best-effort helper: when the tweet has no such button, or the
    browser rejects the interaction, the tweet is left untouched and no
    exception propagates to the caller.

    Args:
        tweet_element: Selenium element for one tweet's text container.

    Returns:
        None.
    """
    # Imported here because the module-level imports do not bring these names
    # into scope; without ActionChains the click below can never run.
    from selenium.common.exceptions import WebDriverException
    from selenium.webdriver.common.action_chains import ActionChains

    # The class-based XPath is generic, so the click is restricted to spans
    # whose visible text is an actual "show more" label (English / French UI).
    show_more_labels = ("Show more", "Voir plus")
    span_xpath = ".//span[contains(@class, 'css-901oao') and contains(@class, 'css-16my406') and contains(@class, 'r-poiln3') and contains(@class, 'r-bcqeeo') and contains(@class, 'r-qvutc0')]"
    try:
        candidates = tweet_element.find_elements(By.XPATH, span_xpath)
        show_more_button = next(
            (span for span in candidates if span.text.strip() in show_more_labels),
            None,
        )
        if show_more_button is None:
            return
        ActionChains(driver).move_to_element(show_more_button).click().perform()
    except WebDriverException:
        # Stale, hidden or otherwise non-interactable element: skip expansion
        # for this tweet instead of aborting the whole scrape.
        return
    # Give the page a moment to render the expanded text
    time.sleep(2)
def collect_tweets():
    """Return the text of every tweet container currently found on the page.

    Each container (a ``div`` carrying a ``lang`` attribute) is first passed
    to ``expand_tweet`` and its text is read afterwards.
    """
    def _expanded_text(element):
        # Expansion has to happen before the text is read
        expand_tweet(element)
        return element.text

    containers = driver.find_elements(By.XPATH, "//div[@lang]")
    return [_expanded_text(container) for container in containers]
def scroll_down():
    """Scroll the window down by one viewport height, then pause briefly."""
    scroll_script = "window.scrollBy(0, window.innerHeight)"
    pause_seconds = 2
    driver.execute_script(scroll_script)
    # Wait before the caller inspects the page again
    time.sleep(pause_seconds)
# Harvest tweets over 400 collect-then-scroll rounds
tweets = []
for _ in range(400):
    tweets += collect_tweets()
    scroll_down()
# Wrap the collected texts in a single-column DataFrame
df = pd.DataFrame(data={"tweet": tweets})
C:\Users\My PC\AppData\Local\Temp\ipykernel_22620\2124966370.py:2: DeprecationWarning: executable_path has been deprecated, please pass in a Service object driver = webdriver.Chrome(executable_path=chrome_path, options=chrome_options)
def clean_tweet(tweet):
    """Drop the last line of *tweet* when that line contains 'Voir plus'.

    Tweets whose last line does not contain the marker are returned unchanged.
    """
    body, _, last_line = tweet.rpartition('\n')
    if 'Voir plus' in last_line:
        # For a single-line tweet, body is empty, so the result is ''
        return body
    return tweet
# Clean every tweet with clean_tweet, then persist the result
cleaned_tweets = df['tweet'].apply(clean_tweet)
df['tweet'] = cleaned_tweets
df.to_csv("tweets.csv", index=False)
# Shut the browser down now that scraping is finished
driver.quit()
import pandas as pd
import re
import io
# Load the raw bytes of the scraped CSV
with open('tweets.csv', 'rb') as raw_file:
    raw_bytes = raw_file.read()
# Decode as UTF-8 and normalise typographic apostrophes before parsing
csv_text = raw_bytes.decode('utf-8').replace("’", "'")
# Parse the normalised text through an in-memory buffer
df = pd.read_csv(io.StringIO(csv_text))


def _row_mentions(row, fragment):
    """Return True when any cell of *row*, rendered as text, contains *fragment*."""
    return row.astype(str).str.contains(fragment).any()


# Keep only rows in which some cell contains a newline character
df = df[df.apply(_row_mentions, axis=1, args=('\n',))]
# Keep only rows in which some cell contains a ":"
df = df[df.apply(_row_mentions, axis=1, args=(':',))]
# Keep only rows whose first column has a ":" on its first line
df = df[df.iloc[:, 0].apply(lambda text: ':' in text.partition('\n')[0])]
# Tell whether the first line of a text carries content after a ":"
def has_value_after_colon(text):
    """Return True when the first line of *text* has a colon followed
    (after optional whitespace) by at least one non-whitespace character."""
    headline = text.partition('\n')[0]
    return bool(re.search(r':\s*\S+', headline))
# Keep only rows whose first line has nothing after the ":" (a bare header)
header_has_value = df.iloc[:, 0].apply(has_value_after_colon)
df = df[~header_has_value]
# Normalise typographic apostrophes across the whole DataFrame.
# NOTE(review): once the first replacement has run no "’" should remain, so
# the second one looks like a no-op -- confirm before removing it.
df = df.replace("’", "'", regex=True).replace("Apple’s", "Apple's", regex=True)
# Tell whether a text spans more than one line
def has_multiple_lines(text):
    """Return True when *text* contains at least one newline character."""
    return '\n' in text
# Discard rows whose first-column text is a single line
is_multiline = df.iloc[:, 0].apply(has_multiple_lines)
df = df[is_multiline]
# Use the first line of each text as the de-duplication key; only the first
# row for each key is kept. The key lives in its own Series, so no helper
# column has to be added to the DataFrame and dropped again.
first_lines = df.iloc[:, 0].apply(lambda text: text.split('\n')[0])
df = df[~first_lines.duplicated(keep='first')]
# Persist the filtered tweets
df.to_csv('filtered_tweets.csv', index=False, encoding='utf-8')
import pandas as pd
import re
import chardet
import ftfy
!pip install ftfy
Defaulting to user installation because normal site-packages is not writeable Requirement already satisfied: ftfy in c:\users\my pc\appdata\roaming\python\python39\site-packages (6.1.1) Requirement already satisfied: wcwidth>=0.2.5 in c:\programdata\anaconda3\lib\site-packages (from ftfy) (0.2.5)
# Determine the encoding of the input file.
# filtered_tweets.csv is written as UTF-8 by the filtering step, so UTF-8 is
# tried first; chardet's guess is only used when the bytes are not valid
# UTF-8. chardet may report no encoding at all, in which case UTF-8 is kept.
with open('filtered_tweets.csv', 'rb') as file:
    raw_bytes = file.read()
try:
    raw_bytes.decode('utf-8')
except UnicodeDecodeError:
    detected_encoding = chardet.detect(raw_bytes)['encoding'] or 'utf-8'
else:
    detected_encoding = 'utf-8'
# Read the CSV file into a DataFrame
df = pd.read_csv('filtered_tweets.csv', encoding=detected_encoding)
# Turn one tweet into Title/Item/Value records
def process_tweet(tweet):
    """Split *tweet* into one record per "item: value" line.

    The first line, stripped, is the title shared by all records. Every later
    line containing a ":" is cut at its first colon into an item and a value,
    both stripped; lines without a colon are skipped.

    Returns:
        A list of dicts with the keys 'Title', 'Item' and 'Value'.
    """
    header, *body = tweet.split('\n')
    title = header.strip()
    records = []
    for line in body:
        item, colon, value = line.partition(':')
        if not colon:
            continue
        records.append({'Title': title, 'Item': item.strip(), 'Value': value.strip()})
    return records
# Flatten every tweet of the first column into Title/Item/Value records
data = [record for tweet in df.iloc[:, 0] for record in process_tweet(tweet)]
new_df = pd.DataFrame(data)
# Persist the records, then load them back from disk
new_df.to_csv('transformed_tweets.csv', index=False, encoding=detected_encoding)
df2 = pd.read_csv('transformed_tweets.csv', encoding=detected_encoding)
# Repair mis-decoded text in the 'Title' column with ftfy
df2['Title'] = df2['Title'].apply(ftfy.fix_text)
# Define the patterns for matching values.
# NOTE(review): the splitter below only captures text that starts with a
# digit, so the 'currency' pattern (which requires a leading "$") can never
# match; a leading "$" ends up in the discarded prefix. Left unchanged to keep
# the output identical -- confirm how currency values should be handled.
patterns = {
    'currency': r'^\$\d[\d,]*(\.\d+)?$',
    'numeric': r'^\d[\d,]*$',
    'float': r'^\d[\d,]*(\.\d+)?$',
    'percentage': r'^\d[\d,]*(\.\d+)?%$',
}
# Splits a value into (prefix, number, remainder) around its first number
number_splitter = re.compile(r'(\d[\d,.]*(?:\.\d+)?(?:%|\$)?)')
output_columns = ['Title', 'Item', 'Value', 'Unit']

# Records are gathered in plain lists and turned into DataFrames once at the
# end; concatenating a DataFrame per row copies all earlier rows every time.
matched_records = []
unmatched_records = []
for _, row in df2.iterrows():
    value_split = number_splitter.split(str(row['Value']), maxsplit=1)
    # Three parts mean a number was found; it must also fit a known pattern
    is_match = len(value_split) == 3 and any(
        re.match(pattern, value_split[1]) for pattern in patterns.values()
    )
    if is_match:
        matched_records.append({
            'Title': row['Title'],
            'Item': row['Item'],
            'Value': value_split[1],
            # Whatever follows the number is treated as its unit
            'Unit': value_split[2].strip(),
        })
    else:
        # Kept as read; 'Unit' stays empty for these rows
        unmatched_records.append(row.to_dict())

# Rows whose value matched a pattern, and the rows that did not
matching_rows = pd.DataFrame(matched_records, columns=output_columns)
non_matching_rows = pd.DataFrame(unmatched_records, columns=output_columns)
# Keep only rows whose Title occurs more than once within the same DataFrame
matching_shared_title = matching_rows.duplicated(subset=['Title'], keep=False)
matching_rows = matching_rows[matching_shared_title]
non_matching_shared_title = non_matching_rows.duplicated(subset=['Title'], keep=False)
non_matching_rows = non_matching_rows[non_matching_shared_title]
# Write both result sets to disk using the detected encoding
matching_rows.to_csv('matching_rows.csv', index=False, encoding=detected_encoding)
non_matching_rows.to_csv('non_matching_rows.csv', index=False, encoding=detected_encoding)
import plotly.graph_objs as go
import dash
from dash import dcc
from dash import html
from dash.dependencies import Input, Output
# Read the matching_rows CSV file into a DataFrame, using the same encoding
# the file was written with so that non-ASCII text decodes correctly
df = pd.read_csv('matching_rows.csv', encoding=detected_encoding)
# Group the matching rows by Title
grouped = df.groupby('Title')
# Bar colours; the plotting loop cycles through them per item
colors = ['#FFC107', '#FF5733', '#C70039', '#900C3F', '#581845',
          '#1E8449', '#00FFFF', '#6C3483', '#F5B041', '#F1948A',
          '#8E44AD', '#1ABC9C', '#D35400', '#3498DB', '#BDC3C7']
# Draw one grouped bar chart per distinct Title
for title in df['Title'].unique():
    # Rows belonging to this title
    title_rows = grouped.get_group(title)
    # One bar trace per distinct item, coloured by cycling through the palette
    bars = []
    for position, item in enumerate(title_rows['Item'].unique()):
        item_rows = title_rows[title_rows['Item'] == item]
        bars.append(
            go.Bar(
                x=item_rows['Title'],
                y=item_rows['Value'],
                name=item,
                marker=dict(color=colors[position % len(colors)]),
            )
        )
    # Chart layout: the title as heading, labelled axes, reversed x axis
    chart_layout = go.Layout(
        title=title,
        xaxis=dict(title='Title', autorange='reversed'),
        yaxis=dict(title='Value'),
        hovermode='closest',
        barmode='group',
    )
    # Assemble the figure and display it
    fig = go.Figure(data=bars, layout=chart_layout)
    fig.update_layout(barmode='group')
    fig.show()
--------------------------------------------------------------------------- NameError Traceback (most recent call last) ~\AppData\Local\Temp\ipykernel_28728\3557242639.py in <module> 1 # Iterate through the unique Title values and create a separate plot for each one ----> 2 for title in df['Title'].unique(): 3 # Filter the matching rows by Title 4 group = grouped.get_group(title) 5 NameError: name 'df' is not defined