머신러닝을 위한 Python 데이터 수집 코드를 다양한 소스별 코드 예제

요즘 범정부, 사회, 산업계 등등 AI에 대한 관심이 많으며, Python 을 통한 데이터 수집에 대한 관심도 많아졌습니다.

필자는 AI를 위한 원석 - 데이터 수집에 대한 python 코드를 예제로 작성해서 여러분에게 도움을 주고자 합니다

 

머신러닝을 위한 Python 데이터 수집 코드 

 

📊 포함된 데이터 수집 방법

1. 웹 스크래핑 🕷️

  • BeautifulSoup으로 뉴스, 리뷰 수집
  • HTML 파싱 및 구조화

2. API 데이터 수집 🌐

  • 날씨 데이터 (OpenWeatherMap)
  • 주식 데이터 (Alpha Vantage)
  • REST API 통합

3. 이미지 데이터 🖼️

  • URL에서 이미지 다운로드
  • Google 이미지 검색 (Selenium)
  • 레이블링 자동화

4. 소셜 미디어 🐦

  • Twitter API 연동
  • 트윗 수집 및 분석

5. 파일 처리 📁

  • CSV/Excel 로드
  • 여러 파일 통합
  • 자동 정제

6. 데이터베이스 💾

  • SQLite, MySQL 연동
  • SQL 쿼리 실행

7. 실시간 스트리밍 📡

  • 센서 데이터 시뮬레이션
  • 버퍼 관리

8. 품질 검증

  • 결측치 확인
  • 중복 제거
  • 데이터 타입 검증

🚀 빠른 시작

 
 
python
# 1. 웹 스크래핑
scraper = WebScraper("https://example.com")
articles = scraper.scrape_news_articles("url")
scraper.save_to_csv(articles, "data.csv")

# 2. API 수집
collector = APIDataCollector(api_key="YOUR_KEY")
weather = collector.collect_weather_data("Seoul")

# 3. 파일 처리
df = FileDataProcessor.load_and_clean_csv("data.csv")

# 4. 품질 검증
DataValidator.validate_dataframe(df)

📦 필요한 라이브러리

 
 
bash
pip install requests beautifulsoup4 pandas numpy
pip install selenium tweepy opencv-python pillow
pip install pymysql sqlite3

 

아래는 예제코드를 작성해 드리니 참고하시면 좋을 것 같습니다. 기타 궁금하신 사항은 y2kpro@naver.com 으로 문의주세요 ^^

 

"""
머신러닝을 위한 Python 데이터 수집 코드 모음
다양한 소스에서 데이터를 수집하고 전처리하는 방법

필요한 라이브러리 설치:
pip install requests beautifulsoup4 selenium pandas numpy scikit-learn
pip install kaggle tweepy opencv-python pillow
"""

import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import json
import csv
import time
from datetime import datetime
import os

# ============================================
# 1. 웹 스크래핑 (BeautifulSoup)
# ============================================

class WebScraper:
    """웹 페이지에서 데이터 수집"""
    
    def __init__(self, base_url):
        self.base_url = base_url
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
    
    def scrape_news_articles(self, url):
        """뉴스 기사 수집 예제"""
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            
            articles = []
            # 예: 뉴스 기사 추출 (실제 HTML 구조에 맞게 수정 필요)
            for article in soup.find_all('article', class_='news-item'):
                title = article.find('h2').text.strip() if article.find('h2') else ''
                content = article.find('p', class_='content').text.strip() if article.find('p', class_='content') else ''
                date = article.find('time').text.strip() if article.find('time') else ''
                
                articles.append({
                    'title': title,
                    'content': content,
                    'date': date,
                    'url': url
                })
            
            return articles
        
        except Exception as e:
            print(f"스크래핑 에러: {e}")
            return []
    
    def scrape_product_reviews(self, product_url):
        """제품 리뷰 수집"""
        reviews = []
        try:
            response = requests.get(product_url, headers=self.headers)
            soup = BeautifulSoup(response.content, 'html.parser')
            
            for review in soup.find_all('div', class_='review'):
                rating = review.find('span', class_='rating')
                text = review.find('p', class_='review-text')
                
                if rating and text:
                    reviews.append({
                        'rating': rating.text.strip(),
                        'review': text.text.strip(),
                        'timestamp': datetime.now().isoformat()
                    })
            
            return reviews
        
        except Exception as e:
            print(f"리뷰 수집 에러: {e}")
            return []
    
    def save_to_csv(self, data, filename):
        """CSV 파일로 저장"""
        if data:
            df = pd.DataFrame(data)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            print(f"데이터 저장 완료: {filename}")

# ============================================
# 2. API를 통한 데이터 수집
# ============================================

class APIDataCollector:
    """REST API를 통한 데이터 수집"""
    
    def __init__(self, api_key=None):
        self.api_key = api_key
    
    def collect_weather_data(self, city, days=7):
        """날씨 API 데이터 수집 (예: OpenWeatherMap)"""
        base_url = "http://api.openweathermap.org/data/2.5/forecast"
        
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric',
            'cnt': days * 8  # 3시간 간격
        }
        
        try:
            response = requests.get(base_url, params=params)
            data = response.json()
            
            weather_data = []
            for item in data.get('list', []):
                weather_data.append({
                    'timestamp': item['dt_txt'],
                    'temperature': item['main']['temp'],
                    'humidity': item['main']['humidity'],
                    'pressure': item['main']['pressure'],
                    'weather': item['weather'][0]['description'],
                    'wind_speed': item['wind']['speed']
                })
            
            return pd.DataFrame(weather_data)
        
        except Exception as e:
            print(f"날씨 데이터 수집 에러: {e}")
            return pd.DataFrame()
    
    def collect_stock_data(self, symbol, start_date, end_date):
        """주식 데이터 수집 (예: Alpha Vantage)"""
        # 실제 구현 시 API 키와 엔드포인트 필요
        url = f"https://www.alphavantage.co/query"
        params = {
            'function': 'TIME_SERIES_DAILY',
            'symbol': symbol,
            'apikey': self.api_key,
            'outputsize': 'full'
        }
        
        try:
            response = requests.get(url, params=params)
            data = response.json()
            
            time_series = data.get('Time Series (Daily)', {})
            
            stock_data = []
            for date, values in time_series.items():
                stock_data.append({
                    'date': date,
                    'open': float(values['1. open']),
                    'high': float(values['2. high']),
                    'low': float(values['3. low']),
                    'close': float(values['4. close']),
                    'volume': int(values['5. volume'])
                })
            
            df = pd.DataFrame(stock_data)
            df['date'] = pd.to_datetime(df['date'])
            df = df.sort_values('date')
            
            return df
        
        except Exception as e:
            print(f"주식 데이터 수집 에러: {e}")
            return pd.DataFrame()

# ============================================
# 3. 이미지 데이터 수집
# ============================================

class ImageDataCollector:
    """이미지 데이터 수집 및 저장"""
    
    def __init__(self, save_dir='images'):
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)
    
    def download_images_from_urls(self, image_urls, labels=None):
        """URL 리스트에서 이미지 다운로드"""
        downloaded = []
        
        for idx, url in enumerate(image_urls):
            try:
                response = requests.get(url, stream=True, timeout=10)
                response.raise_for_status()
                
                # 파일명 생성
                label = labels[idx] if labels else 'unknown'
                filename = f"{label}_{idx}_{int(time.time())}.jpg"
                filepath = os.path.join(self.save_dir, filename)
                
                # 이미지 저장
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                
                downloaded.append({
                    'filename': filename,
                    'label': label,
                    'url': url
                })
                
                print(f"다운로드 완료: {filename}")
                time.sleep(0.5)  # API 제한 고려
                
            except Exception as e:
                print(f"이미지 다운로드 실패 ({url}): {e}")
        
        return downloaded
    
    def search_and_download_google_images(self, query, num_images=100):
        """Google 이미지 검색 및 다운로드 (Selenium 필요)"""
        # 실제 구현에는 Selenium과 ChromeDriver 필요
        # 여기서는 개념적 구조만 제시
        
        from selenium import webdriver
        from selenium.webdriver.common.by import By
        import time
        
        driver = webdriver.Chrome()
        search_url = f"https://www.google.com/search?q={query}&tbm=isch"
        driver.get(search_url)
        
        # 스크롤하여 이미지 로드
        for _ in range(5):
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
        
        # 이미지 URL 수집
        images = driver.find_elements(By.CSS_SELECTOR, "img.rg_i")
        image_urls = []
        
        for img in images[:num_images]:
            try:
                img.click()
                time.sleep(1)
                # 큰 이미지 URL 추출 로직
                # 실제 구현 필요
            except:
                pass
        
        driver.quit()
        return image_urls

# ============================================
# 4. 텍스트 데이터 수집 (소셜 미디어)
# ============================================

class SocialMediaCollector:
    """소셜 미디어 데이터 수집"""
    
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret
    
    def collect_twitter_data(self, query, count=100):
        """Twitter API를 통한 트윗 수집"""
        # tweepy 라이브러리 사용
        try:
            import tweepy
            
            auth = tweepy.OAuthHandler(self.api_key, self.api_secret)
            api = tweepy.API(auth)
            
            tweets = []
            for tweet in tweepy.Cursor(api.search_tweets, q=query, lang='ko').items(count):
                tweets.append({
                    'text': tweet.text,
                    'created_at': tweet.created_at,
                    'user': tweet.user.screen_name,
                    'retweets': tweet.retweet_count,
                    'likes': tweet.favorite_count
                })
            
            return pd.DataFrame(tweets)
        
        except Exception as e:
            print(f"Twitter 데이터 수집 에러: {e}")
            return pd.DataFrame()

# ============================================
# 5. CSV/Excel 파일 처리
# ============================================

class FileDataProcessor:
    """파일 기반 데이터 처리"""
    
    @staticmethod
    def load_and_clean_csv(filepath, encoding='utf-8'):
        """CSV 파일 로드 및 기본 정제"""
        try:
            df = pd.read_csv(filepath, encoding=encoding)
            
            # 기본 정제
            df = df.drop_duplicates()  # 중복 제거
            df = df.dropna(thresh=len(df.columns) * 0.5)  # 50% 이상 결측치 행 제거
            
            print(f"데이터 로드 완료: {len(df)} 행")
            print(f"컬럼: {df.columns.tolist()}")
            print(f"\n결측치 정보:\n{df.isnull().sum()}")
            
            return df
        
        except Exception as e:
            print(f"파일 로드 에러: {e}")
            return None
    
    @staticmethod
    def process_multiple_files(directory, file_pattern='*.csv'):
        """디렉토리 내 여러 파일 통합 처리"""
        import glob
        
        all_files = glob.glob(os.path.join(directory, file_pattern))
        
        df_list = []
        for file in all_files:
            df = pd.read_csv(file)
            df['source_file'] = os.path.basename(file)
            df_list.append(df)
        
        if df_list:
            combined_df = pd.concat(df_list, ignore_index=True)
            print(f"총 {len(all_files)}개 파일, {len(combined_df)}개 행 통합")
            return combined_df
        
        return pd.DataFrame()

# ============================================
# 6. 데이터베이스에서 수집
# ============================================

class DatabaseCollector:
    """데이터베이스에서 데이터 수집"""
    
    def __init__(self, db_type='sqlite'):
        self.db_type = db_type
    
    def collect_from_sqlite(self, db_path, query):
        """SQLite 데이터베이스에서 데이터 수집"""
        import sqlite3
        
        try:
            conn = sqlite3.connect(db_path)
            df = pd.read_sql_query(query, conn)
            conn.close()
            
            print(f"데이터베이스에서 {len(df)}개 행 수집")
            return df
        
        except Exception as e:
            print(f"데이터베이스 수집 에러: {e}")
            return pd.DataFrame()
    
    def collect_from_mysql(self, host, user, password, database, query):
        """MySQL 데이터베이스에서 데이터 수집"""
        try:
            import pymysql
            
            connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database
            )
            
            df = pd.read_sql_query(query, connection)
            connection.close()
            
            return df
        
        except Exception as e:
            print(f"MySQL 수집 에러: {e}")
            return pd.DataFrame()

# ============================================
# 7. 실시간 스트리밍 데이터 수집
# ============================================

class StreamingDataCollector:
    """실시간 스트리밍 데이터 수집"""
    
    def __init__(self, buffer_size=1000):
        self.buffer_size = buffer_size
        self.data_buffer = []
    
    def collect_sensor_data(self, duration_seconds=60):
        """센서 데이터 시뮬레이션"""
        import random
        
        start_time = time.time()
        collected_data = []
        
        while time.time() - start_time < duration_seconds:
            # 센서 데이터 시뮬레이션
            data_point = {
                'timestamp': datetime.now().isoformat(),
                'temperature': random.uniform(20, 30),
                'humidity': random.uniform(40, 70),
                'pressure': random.uniform(1010, 1020)
            }
            
            collected_data.append(data_point)
            time.sleep(0.1)  # 0.1초마다 수집
        
        return pd.DataFrame(collected_data)

# ============================================
# 8. 사용 예제
# ============================================

def main():
    """데이터 수집 파이프라인 예제"""
    
    # 1. 웹 스크래핑
    print("=== 웹 스크래핑 ===")
    scraper = WebScraper("https://example.com")
    # articles = scraper.scrape_news_articles("https://example.com/news")
    # scraper.save_to_csv(articles, "news_data.csv")
    
    # 2. API 데이터 수집
    print("\n=== API 데이터 수집 ===")
    api_collector = APIDataCollector(api_key="YOUR_API_KEY")
    # weather_df = api_collector.collect_weather_data("Seoul", days=7)
    # weather_df.to_csv("weather_data.csv", index=False)
    
    # 3. 이미지 수집
    print("\n=== 이미지 수집 ===")
    image_collector = ImageDataCollector(save_dir="dataset/images")
    image_urls = [
        "https://example.com/image1.jpg",
        "https://example.com/image2.jpg"
    ]
    labels = ["cat", "dog"]
    # downloaded = image_collector.download_images_from_urls(image_urls, labels)
    
    # 4. CSV 파일 처리
    print("\n=== CSV 데이터 처리 ===")
    # df = FileDataProcessor.load_and_clean_csv("data.csv")
    
    # 5. 데이터베이스 수집
    print("\n=== 데이터베이스 수집 ===")
    db_collector = DatabaseCollector()
    # df = db_collector.collect_from_sqlite("database.db", "SELECT * FROM users")
    
    # 6. 스트리밍 데이터
    print("\n=== 스트리밍 데이터 수집 ===")
    streaming = StreamingDataCollector()
    # sensor_df = streaming.collect_sensor_data(duration_seconds=10)
    # print(sensor_df.head())
    
    print("\n데이터 수집 완료!")

# ============================================
# 9. 데이터 품질 검증
# ============================================

class DataValidator:
    """수집된 데이터 품질 검증"""
    
    @staticmethod
    def validate_dataframe(df):
        """데이터프레임 품질 검증"""
        report = {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'missing_values': df.isnull().sum().to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'data_types': df.dtypes.to_dict()
        }
        
        print("=== 데이터 품질 리포트 ===")
        print(f"총 행 수: {report['total_rows']}")
        print(f"총 컬럼 수: {report['total_columns']}")
        print(f"중복 행 수: {report['duplicate_rows']}")
        print(f"\n결측치:\n{pd.Series(report['missing_values'])}")
        
        return report

if __name__ == "__main__":
    main()
TAGS.

Comments