source

한 컬렉션에서 다른 컬렉션으로 문서를 옮기다

ittop 2023. 3. 28. 22:35
반응형

한 컬렉션에서 다른 컬렉션으로 문서를 옮기다

MongoDB의 한 컬렉션에서 다른 컬렉션으로 문서를 이동하려면 어떻게 해야 합니까?예를 들어 다음과 같습니다.저는 컬렉션 A에 많은 문서를 가지고 있으며, 1개월 전의 모든 문서를 컬렉션 B로 옮기고 싶습니다(이 1개월 전의 문서는 컬렉션 A에 포함되지 않아야 합니다).

집계를 사용하여 복사할 수 있습니다.하지만 제가 하려는 것은 서류를 옮기는 입니다.문서를 이동하려면 어떤 방법을 사용할 수 있습니까?

표시된 벌크 연산 @markus-w-mahlberg는 (및 @mark-mullin의 세련됨) 효율적이지만 기술된 대로 안전하지 않습니다.대량으로삽입에 실패했지만 bulk Remove는 계속됩니다.이동할 때 기록이 손실되지 않도록 하려면 대신 다음을 사용하십시오.

function insertBatch(collection, documents) {
  var bulkInsert = collection.initializeUnorderedBulkOp();
  var insertedIds = [];
  var id;
  documents.forEach(function(doc) {
    id = doc._id;
    // Insert without raising an error for duplicates
    bulkInsert.find({_id: id}).upsert().replaceOne(doc);
    insertedIds.push(id);
  });
  bulkInsert.execute();
  return insertedIds;
}

function deleteBatch(collection, documents) {
  var bulkRemove = collection.initializeUnorderedBulkOp();
  documents.forEach(function(doc) {
    bulkRemove.find({_id: doc._id}).removeOne();
  });
  bulkRemove.execute();
}

function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
  print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
  var count;
  while ((count = sourceCollection.find(filter).count()) > 0) {
    print(count + " documents remaining");
    sourceDocs = sourceCollection.find(filter).limit(batchSize);
    idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);

    targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
    deleteBatch(sourceCollection, targetDocs);
  }
  print("Done!")
}

업데이트 2

이 답변을 더 이상 상향 투표하지 마십시오.@jasongarber의 답변은 어떤 면에서든 더 좋습니다.

갱신하다

@jasongarber의 답변은 더 안전한 접근법이며, 제 답변 대신 사용해야 합니다.


1개월 이상 된 문서를 모두 이동시키고 mongoDB 2.6을 사용하는 경우 여러 작업을 수행하는 가장 효율적인 방법인 대량 작업을 사용하지 않을 이유가 없습니다.

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

이 방법은 매우 빠르며 벌크 삽입 중에 문제가 발생할 경우 원본 데이터가 여전히 존재한다는 장점이 있습니다.


편집

너무 하기 사용하다매번 할 수 .x다음 중 하나:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
      counter ++
      if( counter % x == 0){
        bulkInsert.execute()
        bulkRemove.execute()
        bulkInsert = db.target.initializeUnorderedBulkOp()
        bulkRemove = db.source.initializeUnorderedBulkOp()
      }
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

삽입 및 제거:

var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
    db.collectionB.insert(doc);
    db.collectionA.remove(doc);
});

참고: 이 방법은 큰 문서를 보관하는 컬렉션 또는 컬렉션의 경우 매우 느릴 수 있습니다.

$out은 데이터를 사용하여 새 컬렉션을 만드는 데 사용되므로 $out을 사용하십시오.

db.oldCollection.aggregate([{$out : "newCollection"}])

그 후 드롭을 사용합니다.

db.oldCollection.drop()

범위 쿼리를 사용하여 sourceCollection에서 데이터를 가져오고 커서 데이터를 변수와 루프 상태로 유지한 후 대상 수집에 삽입할 수 있습니다.

 var doc = db.sourceCollection.find({
        "Timestamp":{
              $gte:ISODate("2014-09-01T00:00:00Z"),
              $lt:ISODate("2014-10-01T00:00:00Z")
        }
 });

 doc.forEach(function(doc){
    db.targetCollection.insert(doc);
 })

도움이 됐으면 좋겠다!!

첫 번째 옵션(mongo 덤프 사용)

1. 컬렉션에서 덤프를 가져옵니다.

mongodump -d db -c source_collection

2. 컬렉션에서 복원

mongorestore -d db -c target _ collection dir = mongore / db _ name / source _ collection . bson

두 번째 옵션

실행 중인 Aggregate

db.getCollection('source_collection').aggregate([ { $match: {"emailAddress": "apitester@mailinator.com"}}}}, { $out: "target_collection" } ])

세 번째 옵션(최저 속도)

통과 for 루프의 실행

db.getCollection('source_collection').find().Find().For Each(function(docs){ db.getCollection('target_collection')').insert(docs); } print("롤백 완료!");

성능 관점에서 문서를 하나씩 삭제하는 것보다 하나의 명령어(특히 쿼리 부분에 대한 인덱스가 있는 경우)를 사용하여 많은 문서를 제거하는 것이 좋습니다.

예를 들어 다음과 같습니다.

db.source.find({$gte: start, $lt: end}).forEach(function(doc){
   db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});

이것은 @Markus W Mahlberg의 개작입니다.

호의에 대한 보답 - 함

function moveDocuments(sourceCollection,targetCollection,filter) {
    var bulkInsert = targetCollection.initializeUnorderedBulkOp();
    var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
    sourceCollection.find(filter)
        .forEach(function(doc) {
        bulkInsert.insert(doc);
        bulkRemove.find({_id:doc._id}).removeOne();
        }
  )
  bulkInsert.execute();
  bulkRemove.execute();
}

사용 예

var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)

최상위 요소 dsid가 있는 모든 문서를 사진에서 아티팩트 모음으로 이동하려면

최신 mongo 'bulkWrite' 조작(여기서 문서 읽기)을 사용하고 전체 프로세스를 비동기 상태로 유지하여 완료 여부에 따라 더 넓은 스크립트의 일부로 실행할 수 있도록 @jasongarber의 답변에 대한 업데이트를 보여 줍니다.

async function moveDocuments (sourceCollection, targetCollection, filter) {
  const sourceDocs = await sourceCollection.find(filter)

  console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)

  const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)

  const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
  await deleteDocuments(sourceCollection, targetDocs)

  console.log('Done!')
}

async function insertDocuments (collection, documents) {
  const insertedIds = []
  const bulkWrites = []

  await documents.forEach(doc => {
    const {_id} = doc

    insertedIds.push(_id)
    bulkWrites.push({
      replaceOne: {
        filter: {_id},
        replacement: doc,
        upsert: true,
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})

  return insertedIds
}

async function deleteDocuments (collection, documents) {
  const bulkWrites = []

  await documents.forEach(({_id}) => {
    bulkWrites.push({
      deleteOne: {
        filter: {_id},
      },
    })
  })

  if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
}

MongoDB 3.0 이상에서는 다음 구문을 사용하여 copyTo 명령을 사용할 수 있습니다.

db.source_collection.copyTo("target_collection")

다음으로 drop 명령어를 사용하여 오래된 컬렉션을 삭제할 수 있습니다.

db.source_collection.drop()

저는 @markus-w-mahlberg의 답변을 좋아하지만, 때로는 사람들을 위해 조금 더 심플하게 할 필요가 있다는 것을 알게 되었습니다.그 때문에, 이하의 기능이 몇개인가 있습니다.그와 마찬가지로 벌크 오퍼레이터와 함께 작업을 수행할 수 있지만 이 코드는 Mongo의 신구 시스템과 동일하게 작동합니다.

function parseNS(ns){
    //Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array.
    if (ns instanceof Array){
        database =  ns[0];
        collection = ns[1];
    }
    else{
        tNS =  ns.split(".");
        if (tNS.length > 2){
            print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !');
            return false;
        }
        database = tNS[0];
        collection = tNS[1];
    }
    return {database: database,collection: collection};
}

function insertFromCollection( sourceNS,  destNS, query, batchSize, pauseMS){
    //Parse and check namespaces
    srcNS = parseNS(sourceNS);
    destNS = parseNS(destNS);
    if ( srcNS == false ||  destNS == false){return false;}

    batchBucket = new Array();
    totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
    currentCount = 0;
    print("Processed "+currentCount+"/"+totalToProcess+"...");
    db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        batchBucket.push(doc);
        if ( batchBucket.length > batchSize){
            db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket);
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep (pauseMS);
            print("Processed "+currentCount+"/"+totalToProcess+"...");       
        }
    }
    print("Completed");
}

/** Example Usage:
        insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);    

'우리'를 수 요.db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true)할 수 있습니다.이렇게 쉽게 코드를 만들어 다시 시작할 수 있습니다.

1500만 건의 문서를 위해 2297건의 컬렉션을 가지고 있었지만 일부 컬렉션은 비어 있었습니다.

복사만 사용스크립트는 실패했지만 이 스크립트의 최적화에 의해:

db.getCollectionNames().forEach(function(collname) {
    var c = db.getCollection(collname).count();
    if(c!==0){
      db.getCollection(collname).copyTo('master-collection');
      print('Copied collection ' + collname);
    }
});

다 잘 될 것 같아요

NB: copyTo는 읽기/쓰기 작업을 차단하기 때문에 권장되지 않습니다.따라서 이 작업 중에 데이터베이스를 사용할 수 없다는 것을 알고 있으면 충분하다고 생각합니다.

저 같은 경우에는 각각 효과가 없었습니다.그래서 나는 몇 가지 변화를 만들어야 했다.

var kittySchema = new mongoose.Schema({
name: String
});

var Kitten = mongoose.model('Kitten', kittySchema);

var catSchema = new mongoose.Schema({
name: String
});

var Cat = mongoose.model('Cat', catSchema);

이것은 두 컬렉션 모두에 대한 모델입니다.

`function Recursion(){
Kitten.findOne().lean().exec(function(error, results){
    if(!error){
        var objectResponse = results;
        var RequiredId = objectResponse._id;
        delete objectResponse._id;
        var swap = new Cat(objectResponse);
        swap.save(function (err) {
           if (err) {
               return err;
           }
           else {
               console.log("SUCCESSFULL");
               Kitten.deleteOne({ _id: RequiredId }, function(err) {
                if (!err) {
                        console.log('notification!');
                }
                else {
                        return err;
                }
            });
               Recursion();
           }
        });
    }
    if (err) {
        console.log("No object found");
        // return err;
    }
})
}`

저는 bulkinsert와 pymongo의 일괄 삭제 방법을 사용하여 한번에 1000개의 레코드를 획득할 계획이었습니다.

소스 및 타깃 모두

  1. mongodb 객체를 생성하여 데이터베이스에 연결합니다.

  2. 벌크 오브젝트를 인스턴스화합니다.참고: 벌크 개체의 백업도 만들었습니다.이렇게 하면 오류가 발생했을 때 삽입 또는 삭제를 롤백할 수 있습니다.예:

    소스용 // replace this with mongodb object creation logic source_db_obj = db_help.create_db_obj(source_db, source_col) source_bulk = source_db_obj.initialize_ordered_bulk_op() source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    타겟의 경우 // replace this with mogodb object creation logic target_db_obj = db_help.create_db_obj(target_db, target_col) target_bulk = target_db_obj.initialize_ordered_bulk_op() target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()

  3. 필터 조건과 일치하는 소스 레코드를 가져옵니다.

    source_find_find = source_db_obj.find(필터)

  4. 소스 레코드를 루프합니다.

    대상 및 소스 대량 작업 생성

    현재 날짜/시간을 포함하는 archived_at 필드를 대상 컬렉션에 추가합니다.

    //replace this with the logic to obtain the UTCtime. doc['archived_at'] = db_help.getUTCTime() target_bulk.insert(document) source_bulk.remove(document)

    오류 또는 예외 발생 시 롤백하려면 target_bak 및 source_back 작업을 생성합니다.

    target_bulk_bak.find({'_id':doc['_id']}).remove_one() source_bulk_bak.insert(doc) //remove the extra column doc.pop('archieved_at', None)

  5. 레코드가 1000으로 카운트되면 타깃(벌크 삽입 및 소스 벌크 제거)을 실행합니다.주의: 이 메서드는 target_bulk 및 source_bulk 개체를 실행합니다.

    execute_insert_remove(source_module, target_module)

  6. 예외가 발생하면 target_bulk_bak 제거 및 source_bulk_bak 설명을 실행합니다.그러면 변경이 롤백됩니다.mongodb는 롤백이 없기 때문에 이 해킹을 생각해 냈습니다.

    execute_secute_insert_remove(소스_secute_bak, 타겟_secute_bak)

  7. 마지막으로 소스 및 타깃 벌크 및 벌크_bak 개체를 다시 초기화합니다.이것은 한 번만 사용할 수 있기 때문에 필요합니다.

  8. 완전한 코드

        def execute_bulk_insert_remove(source_bulk, target_bulk):
            try:
                target_bulk.execute()
                source_bulk.execute()
            except BulkWriteError as bwe:
                raise Exception(
                    "could not archive document, reason:    {}".format(bwe.details))
    
        def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col):
            """
            filter: filter criteria for backup
            source_db: source database name
            source_col: source collection name
            target_db: target database name
            target_col: target collection name
            """
            count = 0
            bulk_count = 1000
    
            source_db_obj = db_help.create_db_obj(source_db, source_col)
            source_bulk = source_db_obj.initialize_ordered_bulk_op()
            source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    
            target_db_obj = db_help.create_db_obj(target_db, target_col)
            target_bulk = target_db_obj.initialize_ordered_bulk_op()
            target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
    
            source_find_results = source_db_obj.find(filter)
    
            start = datetime.now()
    
            for doc in source_find_results:
                doc['archived_at'] = db_help.getUTCTime()
    
                target_bulk.insert(doc)
                source_bulk.find({'_id': doc['_id']}).remove_one()
                target_bulk_bak.find({'_id': doc['_id']}).remove_one()
                doc.pop('archieved_at', None)
                source_bulk_bak.insert(doc)
    
                count += 1
    
                if count % 1000 == 0:
                    logger.info("count: {}".format(count))
                    try:
                        execute_bulk_insert_remove(source_bulk, target_bulk)
                    except BulkWriteError as bwe:
                        execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
                        logger.info("Bulk Write Error: {}".format(bwe.details))
                        raise
    
                    source_bulk = source_db_obj.initialize_ordered_bulk_op()
                    source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    
                    target_bulk = target_db_obj.initialize_ordered_bulk_op()
                    target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
    
            end = datetime.now()
    
            logger.info("archived {} documents to {} in ms.".format(
                count, target_col, (end - start)))
    

언급URL : https://stackoverflow.com/questions/27039083/mongodb-move-documents-from-one-collection-to-another-collection

반응형