TFX Keras Component Tutorial에 대하여

안녕하십니까. 이번 포스팅에서는 Tensorflow TFX의 예제인 케라스 컴포넌트 튜토리얼에 대하여 다룹니다. 코드는 제가 짠것은 아니고 텐서플로우 공식 코드를 가져온것이며 위의 링크에서 확인하실수 있습니다.

TF Dev Summit 20

텐서플로우 Dev Summit에서 해당 튜토리얼에 대하여 간략하게 소개한적이 있습니다. 영어 동영상에 익숙하시면 해당 동영상을 보시는게 편하실듯 합니다.


예제 데이터 가져오기

해당 예제에서는 시카고 택시 공용 데이터를 사용 합니다. 파이썬 코드에서 이를 사용하기 위해서 데이터를 불러 옵니다.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

시카고 택시 데이터의 컬럼은 아래와 같습니다.

pickup_community_areafaretrip_start_month
trip_start_hourtrip_start_daytrip_start_timestamp
pickup_latitudepickup_longitudedropoff_latitude
dropoff_longitudetrip_milespickup_census_tract
dropoff_census_tractpayment_typecompany
trip_secondsdropoff_community_areatips
해당 데이터를 통해서 마지막 데이터인 tips 데이터를 예측 하는것이 튜토리얼의 목표 입니다.
이는 20% 이상의 팁을 받을수 있는지 없는지에 대한 0과 1의 라벨 입니다.

Interactive 모드

일반 파이프라인이 아닌 notebook을 통하여 TFX를 사용하기 위해서 인터랙티브 모드를 사용 합니다.
context = InteractiveContext()

Example Gen

다운받은 csv 파일을 파이프라인으로 연결하기 위해서 example gen 함수를 호출 합니다.
example_gen = CsvExampleGen(input=external_input(_data_root))
context.run(example_gen)

StatisticsGen

StatisticsGen을 통하여 ExampleGen과 연결된 데이터의 여러 통계를 확인 할 수 있습니다. 데이터 통계 확인을 위해서 시각화 기능을 제공 합니다.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)
%%skip_for_export
context.show(statistics_gen.outputs['statistics'])
각 훈련, 평가 분할 데이터에 대하여 아래와 같은 시각화 툴을 제공 합니다.

Numeric Features

정수 데이터에 대하여 비어있는 데이터, 평균값, 0의 비율 등을 알려 줍니다.

Categorical Features

카테고리와 같은 정수가 아닌 데이터에 대하여 빈값, 평균값 등에 대하여 알려 줍니다.



SchemaGen

위의 StatisticsGen의 결과를 바탕으로 SchemaGen에서 Schema를 생성 합니다.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)
context.show(schema_gen.outputs['schema'])

Schema

아래와 같이 각 필드에 대하여 데이터 타입을 지정해 줍니다. 문자열 데이터는 따로 도메인 테이블을 두어서 관리 합니다.

도메인

문자열 데이터는 문자열의 값들을 따로 테이블로 관리 합니다.

ExampleValidator

위의 두 결과를 통하여 example validator에서 데이터의 anomaly를 감지 합니다.
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)
context.show(example_validator.outputs['anomalies'])

아래와 같이 각 컬럼에 대하여 Statistics 데이터에 기반하여 예상치 않는 데이터들을 알려 줍니다.
Anomaly short descriptionAnomaly long description
Feature name
'company'Unexpected string valuesExamples contain values missing from the schema: 3094 - 24059 G.L.B. Cab Co (<1%), 3319 - CD Cab Co (<1%), 4053 - 40193 Adwar H. Nikola (<1%), 4197 - Royal Star (<1%), 5006 - Salifu Bawa (<1%), 5724 - KYVI Cab Inc (<1%), 585 - 88805 Valley Cab Co (<1%), 6743 - Luhak Corp (<1%).
'payment_type'Unexpected string valuesExamples contain values missing from the schema: Prcard (<1%).
'pickup_census_tract'Column droppedThe feature was not present in any e
각 훈련, 평가용 데이터로 분할 되었으므로 훈련에는 없으나 평가에는 있는 데이터가 있을 수 있습니다. 위 코드는 해당 데이터들을 알려주었습니다.

Transform

트랜스폼은 훈련, 서빙을 위해 피쳐 엔지니어링을 진행 합니다.

아래와 같이 constants들을 정의 합니다.

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  return key + '_xf'


위의 상수 결과를 통해서 전처리 하는 코드를 다음과 같이 생성 합니다.

import tensorflow as tf
import tensorflow_transform as tft

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name


def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs


def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

위의 코드를 파일로 출력 한후에 exampleGen과 SchemaGen이랑 연결을 합니다.

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))
context.run(transform)

transform의 결과로는 전처리 과정을 담은 transform_graph와 데이터를 표현한 transformed_examples가 리턴 됩니다.

Trainer

훈련을 위한 모델 컴포넌트 입니다. SchemaGen, Transform, 모델 코드를 필요로 합니다.
아래 코드에서는 keras의 wide and deep을 사용하고 있습니다.
from typing import List, Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs

import taxi_constants

_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = taxi_constants.MAX_CATEGORICAL_FEATURE_VALUES
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name


def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)
    transformed_features.pop(_transformed_name(_LABEL_KEY))

    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: input tfrecord file pattern.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset


def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying taxi data.

  Args:
    hidden_units: [int], the layer sizes of the DNN (input layer first).

  Returns:
    A keras Model.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEYS),
              _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model = _wide_and_deep_classifier(
      # TODO(b/139668410) replace with premade wide_and_deep keras model
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model


def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build a simple keras wide and deep model.

  Args:
    wide_columns: Feature columns wrapped in indicator_column for wide (linear)
      part of the model.
    deep_columns: Feature columns for deep part of the model.
    dnn_hidden_units: [int], the layer sizes of the hidden DNN.

  Returns:
    A Wide and Deep Keras model
  """
  # Following values are hard coded for simplicity in this example,
  # However prefarably they should be passsed in as hparams.

  # Keras needs the feature definitions at compile time.
  # TODO(b/139081439): Automate generation of input layers from FeatureColumn.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
  }
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
  })
  input_layers.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
  })

  # TODO(b/144500510): SparseFeatures for feature columns + Keras.
  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  model = _build_keras_model(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

  # This log path might change in the future.
  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

아래 코드를 통하여 Trainer를 실행 시키면 드디어 학습을 시작 합니다.

trainer = Trainer(
    module_file=os.path.abspath(_taxi_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

정확도는 약 87% 정도 입니다.

Train for 10000 steps, validate for 5000 steps
    1/10000 [..............................] - ETA: 7:32:19 - loss: 0.6915 - binary_accuracy: 0.5750WARNING:tensorflow:Method (on_train_batch_end) is slow compared to the batch update (0.129259). Check your callbacks.
10000/10000 [==============================] - 187s 19ms/step - loss: 0.2451 - binary_accuracy: 0.8769 - val_loss: 0.2108 - val_binary_accuracy: 0.8963

학습 결과 시각화

학습 결과를 텐서 보드와 연결하여 시각화도 가능 합니다.
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'serving_model_dir')
pp.pprint(os.listdir(model_dir))
%%skip_for_export

log_dir = os.path.join(model_artifact_dir, 'logs')

%load_ext tensorboard
%tensorboard --logdir {log_dir}
시각화 결과

Evaluator

TensorFlow Model Analysis를 사용하여 모델 성능을 평가 합니다. 이전 모델과 비교하여 더 나은지 평가가 가능 합니다. ExampleGen와 Trainer, config 등을 사용합니다. 
eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='tips')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds = {
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                       direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                       absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])
위의 설정을 아래와 같이 연결 합니다.
# Use TFMA to compute a evaluation statistics over features of a model and
# validate them against a baseline.

# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    # Change threshold will be ignored if there is no baseline (first run).
    eval_config=eval_config)
context.run(evaluator)

흠 문서상에는 Evaluator에서 시각화를 한다고 하지만 문서나 노트북에서는 시각화 결과를 확인할 수 없습니다.

Pusher

실제로 프러덕션 환경에 올리기 위한 코드 입니다.
해당 코드를 실행 하면 폴더에 파일이 생성 됩니다.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)

Export

Colab 노트북으로 마지막 셀까지 실행 시키면 zip파일이 다운로드가 됩니다.

%%skip_for_export
#docs_infra: no_execute

#@markdown Run this cell to download the pipeline files as a `.zip`.

if 'google.colab' in sys.modules:
  from google.colab import files
  import zipfile
  print(zip_export_path)
  zip_export_path = os.path.join(
      tempfile.mkdtemp(), 'export.zip')
  with zipfile.ZipFile(zip_export_path, mode='w') as export_zip:
    export_zip.write(_pipeline_export_filepath)
    export_zip.write(_taxi_constants_module_file)
    export_zip.write(_taxi_transform_module_file)
    export_zip.write(_taxi_trainer_module_file)

  files.download(zip_export_path)

  • export_chicago_taxi_beam.py
  • taxi_contants.py
  • taxi_traniner.py
  • taxi_transform.py
위의 4개의 파일이 다운로드 되며 위에서 작성한 코드를 기반으로 생성이 됩니다.

댓글

이 블로그의 인기 게시물

포켓몬 고 17셀 확인 포고맵 사용 방법

고려대학교 야간대학원 중간 후기

HTTP 오류 500.19 - Internal Server Error 에러 처리법