如何使用 neuraxle 实现延迟数据加载的存储库?

Posted

技术标签:

【中文标题】如何使用 neuraxle 实现延迟数据加载的存储库?【英文标题】:How to implement a repository for lazy data loading with neuraxle? 【发布时间】:2021-02-05 04:18:39 【问题描述】:

neuraxle documentation 中有一个示例,使用存储库在管道中延迟加载数据,请参见以下代码:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator(
    BaseRepository: training_data_repository
)

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categeories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)

但是,没有显示如何实现BaseRepositoryConvertIDsToLoadedData 类。实现这些类的最佳方法是什么?谁能举个例子?

【问题讨论】:

【参考方案1】:

我没有检查以下是否编译,但它应该如下所示。如果您发现要更改的内容并尝试编译它,请有人编辑此答案:

class BaseDataRepository(ABC): 

    @abstractmethod
    def get_all_ids(self) -> List[int]: 
        pass

    @abstractmethod
    def get_data_from_id(self, _id: int) -> object: 
        pass

class InMemoryDataRepository(BaseDataRepository): 
    def __init__(self, ids, data): 
        self.ids: List[int] = ids
        self.data: Dict[int, object] = data

    def get_all_ids(self) -> List[int]: 
        return list(self.ids)

    def get_data_from_id(self, _id: int) -> object: 
        return self.data[_id]

class ConvertIDsToLoadedData(BaseStep): 
    def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext): 
        repo: BaseDataRepository = context.get_service(BaseDataRepository)
        ids = data_container.data_inputs

        # Replace data ids by their loaded object counterpart: 
        data_container.data_inputs = [repo.get_data_from_id(_id) for _id in ids]

        return data_container, context

context = ExecutionContext('caching_folder').set_service_locator(
    BaseDataRepository: InMemoryDataRepository(ids, data)  # or insert here any other replacement class that inherits from `BaseDataRepository` when you'll change the database to a real one (e.g.: SQL) rather than a cheap "InMemory" stub. 
)

有关更新,请参阅我在此处针对此问题打开的问题:https://github.com/Neuraxio/Neuraxle/issues/421

【讨论】:

以上是关于如何使用 neuraxle 实现延迟数据加载的存储库?的主要内容,如果未能解决你的问题,请参考以下文章

如何正确实现过滤 data_inputs 的 Neuraxle 管道步骤?

如何最好地处理 Neuraxle 管道中的错误和/或丢失数据?

实现带有数据库、客户端和后端的列表的延迟加载

如何使用 mongodb 数据延迟加载角度

延迟加载的 DAL 和 BLL

数据表 延迟加载数据。如何动态传递“deferLoading”值