1. Introduction to Pipeline Orchestration with TFX
2. Building a TFX Pipeline
import os
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher
from tfx.proto import trainer_pb2, pusher_pb2
import tensorflow_model_analysis as tfma
from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner
from tfx.orchestration import metadata
# Set the pipeline parameters
pipeline_name = 'my_pipeline'
pipeline_root = os.path.join('pipelines', pipeline_name)
data_root = 'data'
module_file = os.path.join('modules', 'module_file.py')
serving_model_dir = os.path.join(pipeline_root, 'serving_model')
metadata_path = os.path.join(pipeline_root, 'metadata', 'metadata.db')
# Define the pipeline components
example_gen = CsvExampleGen(input_base=data_root)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
example_validator = ExampleValidator(statistics=statistics_gen.outputs['statistics'], schema=schema_gen.outputs['schema'])
transform = Transform(examples=example_gen.outputs['examples'], schema=schema_gen.outputs['schema'], module_file=module_file)
trainer = Trainer(
module_file=module_file,
examples=transform.outputs['transformed_examples'],
schema=schema_gen.outputs['schema'],
transform_graph=transform.outputs['transform_graph'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(signature_name='eval')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[tfma.MetricsSpec(metrics=[
        tfma.MetricConfig(class_name='AUC')])]
)
evaluator = Evaluator(examples=example_gen.outputs['examples'], model=trainer.outputs['model'], eval_config=eval_config)
pusher = Pusher(model=trainer.outputs['model'], push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=serving_model_dir)))
# Assemble the pipeline
pipeline = Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[example_gen, statistics_gen, schema_gen, example_validator, transform, trainer, evaluator, pusher],
enable_cache=True,
metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
)
# Run the pipeline with the local orchestrator
LocalDagRunner().run(pipeline)
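Both Transform and Trainer load user code from modules/module_file.py: TFX expects this file to define a preprocessing_fn (called by Transform) and, for the Keras-based Trainer, a run_fn. The following is a minimal sketch of such a module; the label key 'label', the assumption that all other features are numeric, and the tiny model architecture are illustrative choices, not part of the pipeline definition above:

# modules/module_file.py -- a minimal sketch; the label key and the model
# architecture are assumptions for illustration, adapt them to your schema.
import tensorflow as tf
import tensorflow_transform as tft
from tfx_bsl.public import tfxio

_LABEL_KEY = 'label'

def preprocessing_fn(inputs):
    """Called by Transform: scales numeric features to z-scores, passes the label through."""
    outputs = {}
    for key, tensor in inputs.items():
        # Assumes every non-label feature is numeric.
        outputs[key] = tensor if key == _LABEL_KEY else tft.scale_to_z_score(tensor)
    return outputs

def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size):
    """Builds a tf.data.Dataset of transformed (features, label) batches."""
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL_KEY),
        tf_transform_output.transformed_metadata.schema)

def run_fn(fn_args):
    """Called by Trainer: trains a small Keras model and exports it for serving."""
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_ds = _input_fn(fn_args.train_files, fn_args.data_accessor,
                         tf_transform_output, batch_size=32)
    eval_ds = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                        tf_transform_output, batch_size=32)

    # One scalar input per transformed feature, concatenated into a small MLP.
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(_LABEL_KEY)
    model_inputs = {key: tf.keras.layers.Input(shape=(1,), name=key)
                    for key in feature_spec}
    x = tf.keras.layers.concatenate(list(model_inputs.values()))
    x = tf.keras.layers.Dense(16, activation='relu')(x)
    output = tf.keras.layers.Dense(1, activation='sigmoid')(x)
    model = tf.keras.Model(inputs=model_inputs, outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy'])

    model.fit(train_ds, steps_per_epoch=fn_args.train_steps,
              validation_data=eval_ds, validation_steps=fn_args.eval_steps)
    model.save(fn_args.serving_model_dir, save_format='tf')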
3. Data Validation
from tfx.components import StatisticsGen, SchemaGen, ExampleValidator
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Assumption: example_gen has already been defined
context = InteractiveContext()
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
example_validator = ExampleValidator(statistics=statistics_gen.outputs['statistics'], schema=schema_gen.outputs['schema'])
context.run(statistics_gen)
context.run(schema_gen)
context.run(example_validator)
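When these components run inside an InteractiveContext, their output artifacts can be rendered inline in the notebook, which is a convenient way to review the statistics, the inferred schema, and any detected anomalies before training:

# Render the validation artifacts inline (notebook / InteractiveContext only).
context.show(statistics_gen.outputs['statistics'])
context.show(schema_gen.outputs['schema'])
context.show(example_validator.outputs['anomalies'])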
4. Model Training and Validation
from tfx.components import Trainer, Evaluator
from tfx.proto import trainer_pb2
import tensorflow_model_analysis as tfma
# Define the Trainer
trainer = Trainer(
module_file='path/to/training_module.py',
examples=transform.outputs['transformed_examples'],
schema=schema_gen.outputs['schema'],
transform_graph=transform.outputs['transform_graph'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
# Define the Evaluator
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(signature_name='eval')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[tfma.MetricsSpec(metrics=[
        tfma.MetricConfig(class_name='AUC')])]
)
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
eval_config=eval_config
)
context.run(trainer)
context.run(evaluator)
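Once the Evaluator has run, its results can be loaded with TensorFlow Model Analysis. The sketch below assumes it runs in the same InteractiveContext as above, so the evaluation artifact's URI can be resolved directly from the component's outputs:

import tensorflow_model_analysis as tfma

# Resolve the evaluation artifact written by the Evaluator above.
eval_uri = evaluator.outputs['evaluation'].get()[0].uri
eval_result = tfma.load_eval_result(eval_uri)

# Inspect the computed metrics, e.g. AUC on the overall slice.
print(eval_result.slicing_metrics)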
5. Model Deployment (Model Serving)
from tfx.components import Pusher
from tfx.proto import pusher_pb2
# Define the Pusher
pusher = Pusher(
model=trainer.outputs['model'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory='path/to/serving/model'
)
)
)
context.run(pusher)
In this example, the Pusher writes the trained model to a directory from which it can be served in a production environment.
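A quick way to verify the export is to load the most recently pushed SavedModel: the Pusher writes each pushed model into a numeric, timestamped subdirectory of the base directory. A minimal sketch, assuming the Pusher above has already run:

import os
import tensorflow as tf

serving_model_dir = 'path/to/serving/model'
# Pick the latest timestamped version directory created by the Pusher.
latest_version = max(os.listdir(serving_model_dir), key=int)
loaded = tf.saved_model.load(os.path.join(serving_model_dir, latest_version))
print(list(loaded.signatures.keys()))  # e.g. ['serving_default']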