source

Python을 사용하여 sqlite3 데이터베이스 테이블로 CSV 파일 가져오기

ittop 2023. 7. 16. 17:58
반응형

Python을 사용하여 sqlite3 데이터베이스 테이블로 CSV 파일 가져오기

CSV 파일이 있는데 Python을 사용하여 이 파일을 내 sqlite3 데이터베이스로 대량 가져오기를 원합니다.명령어는 "import..."이지만 이렇게 작동할 수는 없는 것 같습니다.sqlite3에서 어떻게 하는지 예를 들어줄 수 있는 사람?저는 만약을 위해 창문을 사용하고 있습니다.감사해요.

import csv, sqlite3

con = sqlite3.connect(":memory:") # change to 'sqlite:///your_filename.db'
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);") # use your column names here

with open('data.csv','r') as fin: # `with` statement available in 2.5+
    # csv.DictReader uses first line in file for column headings by default
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['col1'], i['col2']) for i in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()

디스크에 있는 파일에 대한 SQLite 연결을 만드는 것은 독자를 위한 연습으로 남겨집니다... 그러나 이제 판다 라이브러리에서 가능한 두 개의 라이너가 있습니다.

df = pandas.read_csv(csvfile)
df.to_sql(table_name, conn, if_exists='append', index=False)

이 맞아요..importSQLite3 명령줄 프로그램의 명령어입니다.이 질문에 대한 많은 상위 답변은 네이티브 파이썬 루프를 포함하지만, 파일이 큰 경우(내 파일은 10^6 - 10^7 레코드) 판다로 모든 내용을 읽거나 네이티브 파이썬 목록 이해/루프를 사용하는 것을 피하고 싶습니다(비록 비교를 위해 시간을 두지는 않았지만).

대용량 파일의 경우 가장 좋은 방법은subprocess.run()sqlite의 가져오기 명령을 실행합니다.아래 예제에서는 테이블이 이미 존재하지만 csv 파일의 첫 번째 행에 헤더가 있다고 가정합니다.자세한 내용은 문서를 참조하십시오.

subprocess.run()

from pathlib import Path
db_name = Path('my.db').resolve()
csv_file = Path('file.csv').resolve()
result = subprocess.run(['sqlite3',
                         str(db_name),
                         '-cmd',
                         '.mode csv',
                         '.import --skip 1 ' + str(csv_file).replace('\\','\\\\')
                                 +' <table_name>'],
                        capture_output=True)

참고:: sqlite3의 파일.import명령어는 첫 번째 행을 헤더 이름으로 처리하거나 번째 x 행을 건너뛸 수 있도록 개선되었습니다(이 답변에 명시된 대로 버전 >=3.32).이전 버전의 sqlite3이 있는 경우 테이블을 먼저 생성한 다음 가져오기 전에 csv의 첫 번째 행을 제거해야 할 수 있습니다.--skip 1는 3.32 이전의 합니다.

합니다.
는 명행에서찾명는다은같다습니음과입니다.sqlite3 my.db -cmd ".mode csv" ".import file.csv table".subprocess.run()명령줄 프로세스를 실행합니다.에 대한 입니다.subprocess.run()명령으로 해석되고 모든 인수가 이어지는 문자열의 시퀀스입니다.

  • sqlite3 my.db.
  • -cmd데이터베이스 뒤에 플래그를 지정하면 여러 follow on 명령을 sqlite 프로그램에 전달할 수 있습니다.셸에서 각 명령은 따옴표로 묶어야 하지만 여기서는 시퀀스의 고유한 요소가 되기만 하면 됩니다.
  • '.mode csv'.
  • '.import --skip 1'+str(csv_file).replace('\\','\\\\')+' <table_name>' command.import 명령입니다
    프로세스는 작업을 에 하기 때문에 다음 작업을 수행합니다.-cmd따옴표로 묶은 문자열로, Windows 디렉토리 경로가 있는 경우 백슬래시를 두 배로 늘려야 합니다.

머리글 제거

질문의 요점은 아니지만, 제가 사용한 것은 이렇습니다.다시 말하지만, 어떤 시점에서도 전체 파일을 메모리로 읽고 싶지 않았습니다.

with open(csv, "r") as source:
    source.readline()
    with open(str(csv)+"_nohead", "w") as target:
        shutil.copyfileobj(source, target)

내 2센트(더 일반적):

import csv, sqlite3
import logging

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
        # TODO: Currently there's no support for DATE in sqllite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile, outputToFile = False):
    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("%s %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "CREATE TABLE ads (%s)" % ",".join(cols)

        con = sqlite3.connect(":memory:")
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)


        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()

    return con

.importcommand는 sqlite3 명령줄 도구의 기능입니다.Python에서 이를 수행하려면 csv 모듈과 같은 Python이 가진 모든 기능을 사용하여 데이터를 로드하고 평소처럼 데이터를 삽입하기만 하면 됩니다.

이렇게 하면 sqlite3의 문서화되지 않은 동작에 의존하지 않고 삽입할 유형을 제어할 수 있습니다.

버니의 답변에 감사드립니다!약간 수정해야 했습니다. 제게 효과가 있었던 것은 다음과 같습니다.

import csv, sqlite3
conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")
reader = csv.reader(open('PC.txt', 'r'), delimiter='|')
for row in reader:
    to_db = [unicode(row[0], "utf8"), unicode(row[1], "utf8"), unicode(row[2], "utf8")]
    curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)
conn.commit()

내 텍스트 파일(PC.txt)은 다음과 같습니다.

1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, csv, sqlite3

def main():
    con = sqlite3.connect(sys.argv[1]) # database file input
    cur = con.cursor()
    cur.executescript("""
        DROP TABLE IF EXISTS t;
        CREATE TABLE t (COL1 TEXT, COL2 TEXT);
        """) # checks to see if table exists and makes a fresh table.

    with open(sys.argv[2], "rb") as f: # CSV file input
        reader = csv.reader(f, delimiter=',') # no header information with delimiter
        for row in reader:
            to_db = [unicode(row[0], "utf8"), unicode(row[1], "utf8")] # Appends data from CSV file representing and handling of text
            cur.execute("INSERT INTO neto (COL1, COL2) VALUES(?, ?);", to_db)
            con.commit()
    con.close() # closes connection to database

if __name__=='__main__':
    main()
"""
cd Final_Codes
python csv_to_db.py
CSV to SQL DB
"""

import csv
import sqlite3
import os
import fnmatch

UP_FOLDER = os.path.dirname(os.getcwd())
DATABASE_FOLDER = os.path.join(UP_FOLDER, "Databases")
DBNAME = "allCompanies_database.db"


def getBaseNameNoExt(givenPath):
    """Returns the basename of the file without the extension"""
    filename = os.path.splitext(os.path.basename(givenPath))[0]
    return filename


def find(pattern, path):
    """Utility to find files wrt a regex search"""
    result = []
    for root, dirs, files in os.walk(path):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                result.append(os.path.join(root, name))
    return result


if __name__ == "__main__":
    Database_Path = os.path.join(DATABASE_FOLDER, DBNAME)
    # change to 'sqlite:///your_filename.db'
    csv_files = find('*.csv', DATABASE_FOLDER)

    con = sqlite3.connect(Database_Path)
    cur = con.cursor()
    for each in csv_files:
        with open(each, 'r') as fin:  # `with` statement available in 2.5+
            # csv.DictReader uses first line in file for column headings by default
            dr = csv.DictReader(fin)  # comma is default delimiter
            TABLE_NAME = getBaseNameNoExt(each)
            Cols = dr.fieldnames
            numCols = len(Cols)
            """
            for i in dr:
                print(i.values())
            """
            to_db = [tuple(i.values()) for i in dr]
            print(TABLE_NAME)
            # use your column names here
            ColString = ','.join(Cols)
            QuestionMarks = ["?"] * numCols
            ToAdd = ','.join(QuestionMarks)
            cur.execute(f"CREATE TABLE {TABLE_NAME} ({ColString});")
            cur.executemany(
                f"INSERT INTO {TABLE_NAME} ({ColString}) VALUES ({ToAdd});", to_db)
            con.commit()
    con.close()
    print("Execution Complete!")

이 기능은 폴더에 단일 .db 파일로 변환하려는 csv 파일이 많을 때 유용합니다!

파일 이름, 테이블 이름 또는 필드 이름(열 이름)을 미리 알 필요는 없습니다!

CSV 파일이 정말 큰 경우에 사용할 수 있는 솔루션은 다음과 같습니다.사용하다to_sql다른 답변에서 제안한 대로 청크 크기를 설정하여 전체 파일을 한 번에 처리하지 않도록 합니다.

import sqlite3
import pandas as pd

conn = sqlite3.connect('my_data.db')
c = conn.cursor()
users = pd.read_csv('users.csv')
users.to_sql('users', conn, if_exists='append', index = False, chunksize = 10000)

여기에 설명된 대로 Dask를 사용하여 많은 Panda DataFrames를 병렬로 작성할 수도 있습니다.

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)

자세한 내용은 여기를 참조하십시오.

프로그램의 CSV 파일을 할 수 .os.system다음과 같이 제안된 노선을 따라:

import os

cmd = """sqlite3 database.db <<< ".import input.csv mytable" """

rc = os.system(cmd)

print(rc)

요점은 데이터베이스의 파일 이름을 지정하면 데이터를 읽는 데 오류가 없을 경우 데이터가 자동으로 저장된다는 것입니다.

Guy L 솔루션(좋아요)을 기반으로 하지만 이스케이프 필드를 처리할 수 있습니다.

import csv, sqlite3

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]        
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
        # TODO: Currently there's no support for DATE in sqllite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile,dbFile,tablename, outputToFile = False):

    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("\"%s\" %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
        print(stmt)
        con = sqlite3.connect(dbFile)
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)


        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()
        con.close()

다음을 사용하여 이 작업을 수행할 수 있습니다.blaze&odo

import blaze as bz
csv_path = 'data.csv'
bz.odo(csv_path, 'sqlite:///data.db::data')

을 Odo는 CSV 파일에 합니다.data.db database) 에 있는 (database)data

또는 사용합니다.odo 없이 직접, 이없 없이blaze어느 쪽이든 좋습니다.설명서 읽기

다음은 CSV 헤더를 기준으로 필드 이름을 추가할 수도 있습니다.

import sqlite3

def csv_sql(file_dir,table_name,database_name):
    con = sqlite3.connect(database_name)
    cur = con.cursor()
    # Drop the current table by: 
    # cur.execute("DROP TABLE IF EXISTS %s;" % table_name)

    with open(file_dir, 'r') as fl:
        hd = fl.readline()[:-1].split(',')
        ro = fl.readlines()
        db = [tuple(ro[i][:-1].split(',')) for i in range(len(ro))]

    header = ','.join(hd)
    cur.execute("CREATE TABLE IF NOT EXISTS %s (%s);" % (table_name,header))
    cur.executemany("INSERT INTO %s (%s) VALUES (%s);" % (table_name,header,('?,'*len(hd))[:-1]), db)
    con.commit()
    con.close()

# Example:
csv_sql('./surveys.csv','survey','eco.db')

단순성을 위해 프로젝트의 Makefile에서 sqlite3 명령줄 도구를 사용할 수 있습니다.

%.sql3: %.csv
    rm -f $@
    sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"
%.dump: %.sql3
    sqlite3 $< "select * from $*"

make test.sql3그런 다음 단일 테이블 "test"를 사용하여 기존 test.csv 파일에서 sqlite 데이터베이스를 만듭니다.그러면 됩니다.make test.dump내용을 확인합니다.

이를 통해 CSV에서도 조인을 수행할 수 있습니다.

import sqlite3
import os
import pandas as pd
from typing import List

class CSVDriver:
    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None

    @property
    def con(self) -> sqlite3.Connection:
        """Make a singleton connection to an in-memory SQLite database"""
        if not self._con:
            self._con = sqlite3.connect(":memory:")
        return self._con
    
    def _exists(self, table: str) -> bool:
        query = """
        SELECT name
        FROM sqlite_master 
        WHERE type ='table'
        AND name NOT LIKE 'sqlite_%';
        """
        tables = self.con.execute(query).fetchall()
        return table in tables

    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
        """
        Load a CSV into an in-memory SQLite database
        sep is set to None in order to force pandas to auto-detect the delimiter
        """
        if self._exists(table):
            return
        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s). 
        Tables are loaded from table_dir_path
        """
        tables = extract_tables(query)
        for table in tables:
            self._load_table_to_mem(table)
        cursor = self.con.cursor()
        cursor.execute(query)
        records = cursor.fetchall()
        return records

extract_message:

import sqlparse
from sqlparse.sql import IdentifierList, Identifier,  Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools

class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    __slots__ = ()

    def has_alias(self):
        return self.alias is not None

    @property
    def is_query_alias(self):
        return self.name is None and self.alias is not None

    @property
    def is_table_alias(self):
        return self.name is not None and self.alias is not None and not self.is_function

    @property
    def full_name(self):
        if self.schema is None:
            return self.name
        else:
            return self.schema + '.' + self.name

def _is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                        'UPDATE', 'CREATE', 'DELETE'):
            return True
    return False


def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)


def _extract_from_part(parsed):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in _extract_from_part(item):
                yield x
        if tbl_prefix_seen:
            if _is_subselect(item):
                for x in _extract_from_part(item):
                    yield x
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword if the keyword
            # FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break


def _extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for ident in item.get_identifiers():
                try:
                    alias = ident.get_alias()
                    schema_name = ident.get_parent_name()
                    real_name = ident.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield Reference(schema_name, real_name,
                                    alias, _identifier_is_function(ident))
        elif isinstance(item, Identifier):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))
        elif isinstance(item, Function):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))


def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        stream = _extract_from_part(statement)
        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
    return list(itertools.chain(*extracted_tables))

(제)를 가정함)account.csv그리고.tojoin.csv에 존재하는./path/to/files):

db_path = r"/path/to/files"
driver = CSVDriver(db_path)
query = """
SELECT tojoin.col_to_join 
FROM account
LEFT JOIN tojoin
ON account.a = tojoin.a
"""
driver.query(query)
import csv, sqlite3

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]        
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

        # Need data to decide
        if len(data) == 0:
            continue

        if data.isdigit():
            fieldTypes[field] = "INTEGER"
        else:
            fieldTypes[field] = "TEXT"
    # TODO: Currently there's no support for DATE in sqllite

if len(feildslLeft) > 0:
    raise Exception("Failed to find all the columns data types - Maybe some are empty?")

return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile,dbFile,tablename, outputToFile = False):

    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("\"%s\" %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
        print(stmt)
        con = sqlite3.connect(dbFile)
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)


        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()
        con.close()

메모리가 부족하지 않도록 CSV에서 데이터베이스로의 데이터 전송을 청크로 분할해야 할 수 있습니다.이 작업은 다음과 같이 수행할 수 있습니다.

import csv
import sqlite3
from operator import itemgetter

# Establish connection
conn = sqlite3.connect("mydb.db")

# Create the table 
conn.execute(
    """
    CREATE TABLE persons(
        person_id INTEGER,
        last_name TEXT, 
        first_name TEXT, 
        address TEXT
    )
    """
)

# These are the columns from the csv that we want
cols = ["person_id", "last_name", "first_name", "address"]

# If the csv file is huge, we instead add the data in chunks
chunksize = 10000

# Parse csv file and populate db in chunks
with conn, open("persons.csv") as f:
    reader = csv.DictReader(f)

    chunk = []
    for i, row in reader: 

        if i % chunksize == 0 and i > 0:
            conn.executemany(
                """
                INSERT INTO persons
                    VALUES(?, ?, ?, ?)
                """, chunk
            )
            chunk = []

        items = itemgetter(*cols)(row)
        chunk.append(items)

여기 제 버전이 있습니다. 변환할 '.csv' 파일을 선택하도록 요청하면 이미 작동합니다.

from multiprocessing import current_process
import pandas as pd
import sqlite3 
import os
from tkinter import Tk
from tkinter.filedialog import askopenfilename
from pathlib import Path

def csv_to_db(csv_filedir):

    if not Path(csv_filedir).is_file():                         # if needed ask for user input of CVS file
        current_path = os.getcwd()
        Tk().withdraw()                                     
        csv_filedir = askopenfilename(initialdir=current_path) 

    try:
        data = pd.read_csv(csv_filedir)                             # load CSV file
    except:
        print("Something went wrong when opening to the file")
        print(csv_filedir)

    csv_df = pd.DataFrame(data)
    csv_df = csv_df.fillna('NULL')                              # make NaN = to 'NULL' for SQL format

    [path,filename] = os.path.split(csv_filedir)                # define path and filename 
    [filename,_] = os.path.splitext(filename)
    database_filedir = os.path.join(path, filename + '.db')

    conn = sqlite3.connect(database_filedir)                    # connect to SQL server

    [fields_sql, header_sql_string] = create_sql_fields(csv_df)

    # CREATE EMPTY DATABASE
    create_sql = ''.join(['CREATE TABLE IF NOT EXISTS ' + filename + ' (' + fields_sql + ')'])
    cursor = conn.cursor()
    cursor.execute(create_sql)
    
    # INSERT EACH ROW IN THE SQL DATABASE
    for irow in csv_df.itertuples():
        insert_values_string = ''.join(['INSERT INTO ', filename, header_sql_string, ' VALUES ('])
        insert_sql = f"{insert_values_string} {irow[1]}, '{irow[2]}','{irow[3]}', {irow[4]}, '{irow[5]}' )"
        print(insert_sql)
        cursor.execute(insert_sql)

    # COMMIT CHANGES TO DATABASE AND CLOSE CONNECTION
    conn.commit()
    conn.close()

    print('\n' + csv_filedir + ' \n converted to \n' + database_filedir)

    return database_filedir


def create_sql_fields(df):                                          # gather the headers of the CSV and create two strings 
    fields_sql = []                                                 # str1 = var1 TYPE, va2, TYPE ...
    header_names = []                                               # str2 = var1, var2, var3, var4
    for col in range(0,len(df.columns)):
        fields_sql.append(df.columns[col])
        fields_sql.append(str(df.dtypes[col]))

        header_names.append(df.columns[col])
        if col != len(df.columns)-1:
            fields_sql.append(',')
            header_names.append(',')

    fields_sql = ' '.join(fields_sql)
    fields_sql = fields_sql.replace('int64','integer')
    fields_sql = fields_sql.replace('float64','integer')
    fields_sql = fields_sql.replace('object','text')

    header_sql_string = '(' + ''.join(header_names) + ')'
    
    return fields_sql, header_sql_string


csv_to_db('')

언급URL : https://stackoverflow.com/questions/2887878/importing-a-csv-file-into-a-sqlite3-database-table-using-python

반응형