网站后台管理破解wordpress 页面设置
ETL的过程
1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。
 2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是数据转换的一部分)
 3、数据转换:进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。
 4、规则检查:根据业务需求进行数据质量和业务规则的校验。
 5、数据加载:将数据缓冲区的数据加载到目标数据库或数据仓库中,可能是全量加载或增量加载。
1、数据抽取(Extract)
选择抽取策略
- 全量抽取 
- 特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
 - 实现方式:直接查询整个表或读取整个文件。
 
 - 增量抽取 
- 特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
 - 常用技术:
 - 时间戳:通过记录最后更新时间来抽取新增或修改的数据。
 - CDC(Change Data Capture):通过数据库日志或触发器捕获数据变化
 
 
数据抽取
-  
数据库抽取
- mysql、oracle等
 
 -  
文件抽取
- 读取文件:使用文件读取库(如csv)。
 
 -  
API 抽取
- HTTP 请求:使用 HTTP 客户端库(如 
requests)发送请求,获取 API 返回的数据。 
 - HTTP 请求:使用 HTTP 客户端库(如 
 -  
消息队列抽取
- 订阅消息:使用消息队列客户端(如 Kafka Consumer)订阅消息并获取数据。
 
 
抽取开源工具
-  
Apache NiFi
- 特点:提供可视化界面,支持实时和批量数据抽取,内置多种数据源连接器(如数据库、API、文件系统等)。
 - 适用场景:适合需要实时数据流处理的场景,支持复杂的数据路由和转换逻辑。
 
 -  
Apache Kafka
 
- 特点:分布式流处理平台,支持高吞吐量的实时数据抽取和传输。
 - 适用场景:适合需要实时数据流处理的场景,常与 Spark Streaming 或 Flink 结合使用。
 
- Talend Open Studio
 
- 特点:提供图形化界面,支持多种数据源(如数据库、文件、API)的抽取,支持代码生成。
 - 适用场景:适合中小型项目,支持快速开发和部署。
 
- Apache Sqoop
 
- 特点:专门用于在 Hadoop 和关系型数据库之间传输数据,支持增量抽取。
 - 适用场景:适合大数据场景,尤其是 Hadoop 生态系统的数据抽取。
 
- Logstash
 
- 特点:主要用于日志数据的抽取和传输,支持多种输入和输出插件。
 - 适用场景:适合日志数据的实时抽取和处理。
 
数据抽取例子
一、数据抽取的场景
假设我们需要从以下三个数据源中抽取数据:
- MySQL 数据库:抽取用户表(
users)中的数据。 - CSV 文件:抽取一个包含订单信息的文件(
orders.csv)。 - API:从一个公开的 API 中抽取天气数据。
 
二、数据抽取的实现
1. 从 MySQL 数据库抽取数据
- 工具:Python + 
pymysql或SQLAlchemy。 - 步骤: 
- 连接数据库。
 - 执行 SQL 查询。
 - 将查询结果保存到 DataFrame 或文件中。
 
 
import pandas as pd
from sqlalchemy import create_engine# 数据库连接配置
db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'
}# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询
query = "SELECT * FROM users"  # 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
 
2. 从 CSV 文件抽取数据
- 工具:Python + 
pandas。 - 步骤: 
- 读取 CSV 文件。
 - 将数据加载到 DataFrame 中。
 
 
import pandas as pd# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')  # 假设 orders.csv 包含 order_id, user_id, amount 字段# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
 
3. 从 API 抽取数据
- 工具:Python + 
requests。 - 步骤: 
- 发送 HTTP 请求到 API。
 - 解析返回的 JSON 数据。
 - 将数据保存到 DataFrame 中。
 
 
import requests
import pandas as pd# API 配置
api_url = "https://api.weatherapi.com/v1/current.json"
api_key = "your_api_key"  # 替换为你的 API Key
params = {'key': api_key,'q': 'Beijing'  # 查询北京的天气
}# 发送 HTTP 请求
response = requests.get(api_url, params=params)
data = response.json()  # 解析 JSON 数据# 将数据保存到 DataFrame
weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather) 
三、完整代码示例
以下是整合了上述三种数据抽取方式的完整代码:
import pandas as pd
from sqlalchemy import create_engine
import requests# 1. 从 MySQL 数据库抽取数据
def extract_from_mysql():# 数据库连接配置db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'}# 创建数据库连接engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询query = "SELECT * FROM users"df_users = pd.read_sql(query, engine)return df_users# 2. 从 CSV 文件抽取数据
def extract_from_csv():df_orders = pd.read_csv('orders.csv')return df_orders# 3. 从 API 抽取数据
def extract_from_api():api_url = "https://api.weatherapi.com/v1/current.json"api_key = "your_api_key"  # 替换为你的 API Keyparams = {'key': api_key,'q': 'Beijing'}response = requests.get(api_url, params=params)data = response.json()weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']}df_weather = pd.DataFrame([weather_data])return df_weather# 主函数
if __name__ == "__main__":# 抽取数据df_users = extract_from_mysql()df_orders = extract_from_csv()df_weather = extract_from_api()# 输出结果print("从 MySQL 抽取的用户数据:")print(df_users)print("\n从 CSV 文件抽取的订单数据:")print(df_orders)print("\n从 API 抽取的天气数据:")print(df_weather) 
四、运行结果
假设数据如下:
- MySQL 用户表:
 
| id | name | age | 
|---|---|---|
| 1 | Alice | 25 | 
| 2 | Bob | 30 | 
- CSV 订单文件:
 
| order_id | user_id | amount | 
|---|---|---|
| 101 | 1 | 100.0 | 
| 102 | 2 | 200.0 | 
- API 天气数据:
 
| location | temperature | condition | 
|---|---|---|
| Beijing | 20.0 | Sunny | 
运行代码后,输出如下:
从 MySQL 抽取的用户数据:id   name  age
0   1  Alice   25
1   2    Bob   30从 CSV 文件抽取的订单数据:order_id  user_id  amount
0       101        1   100.0
1       102        2   200.0从 API 抽取的天气数据:location  temperature condition
0  Beijing         20.0     Sunny 
五、总结
数据抽取是 ETL 流程的第一步,通常涉及从多种数据源(如数据库、文件、API)中提取数据。通过 Python 和相关库(如 pandas、SQLAlchemy、requests),可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子,比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助!
原文地址:码农小站
 公众号:码农小站
