Results of testing embulk's resume-state and config-diff

These are the results of testing the --resume-state and --config-diff options of embulk v0.9.7.
I expect other versions that support --resume-state and --config-diff to behave the same way.
I used the Docker image https://hub.docker.com/r/tkuchiki/embulk/.

resume-state

Prepare the CSV files to load. Since I want to check whether only the failed file can be reloaded, I make exactly one of them an invalid CSV.

```console
$ mkdir in out
$ echo "1,2" > in/1_valid.csv
$ echo "3 4" > in/2_invalid.csv
$ echo "5,6" > in/3_valid.csv
```

Prepare a configuration file like the following.

```console
$ cat config.yml
in:
  path_prefix: /opt/embulk/in/
  type: file
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out:
  type: stdout
```
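
As a rough check of why exactly one file should fail, the parser settings above can be approximated in Python. With delimiter ',' the line `3 4` is a single field, so c0 receives the string "3 4", which is not a valid long; with stop_on_invalid_record: true such a record fails the task instead of being skipped. `check_record` is a hypothetical helper, not Embulk's implementation: it ignores quoting, escaping and null handling, and approximates Java's Long.parseLong with a regular expression.

```python
import re
from typing import Optional

# Rough approximation (not Embulk's actual code) of the csv parser settings in
# config.yml: delimiter ',', two columns c0/c1 of type long, no extra or
# optional columns allowed. Quoting, escaping and null handling are ignored.
COLUMNS = ["c0", "c1"]
# Stand-in for Java's Long.parseLong: optional sign plus ASCII digits
# (the 64-bit range check is omitted).
LONG_PATTERN = re.compile(r"[+-]?[0-9]+")


def check_record(line: str, delimiter: str = ",") -> Optional[str]:
    """Return None if the line would parse, otherwise a short error message."""
    values = line.rstrip("\n").split(delimiter)
    # Columns are checked in order, so a bad c0 is reported before a
    # missing c1 (like the NumberFormatException for c0 in the log).
    for name, value in zip(COLUMNS, values):
        if not LONG_PATTERN.fullmatch(value):
            return f'{name}: For input string: "{value}"'
    if len(values) < len(COLUMNS):  # allow_optional_columns: false
        return f"too few columns: {len(values)}"
    if len(values) > len(COLUMNS):  # allow_extra_columns: false
        return f"too many columns: {len(values)}"
    return None


if __name__ == "__main__":
    samples = {"1_valid.csv": "1,2", "2_invalid.csv": "3 4", "3_valid.csv": "5,6"}
    for filename, line in samples.items():
        print(filename, check_record(line) or "ok")
```

Because trim_if_not_quoted is false, a line such as `1, 2` would also be rejected by this sketch, since " 2" keeps its leading space.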

With everything ready, run embulk.

```console
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:14:50.084 +0000: Embulk v0.9.7

Gem plugin path is: /root/.embulk/lib/gems

Fetching: embulk-parser-jsonl-0.2.0.gem (100%)
Successfully installed embulk-parser-jsonl-0.2.0
1 gem installed
2018-08-09 09:15:00.100 +0000: Embulk v0.9.7
2018-08-09 09:15:01.328 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:15:04.917 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:15:06.031 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:15:06.097 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:15:06.098 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:15:06.115 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
2018-08-09 09:15:06.172 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:15:06.185 +0000 [INFO] (0001:transaction): {done:  0 / 3, running: 0}
1,2
5,6
2018-08-09 09:15:06.334 +0000 [INFO] (0001:transaction): {done:  3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done:  3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done:  3 / 3, running: 0}
2018-08-09 09:15:06.339 +0000 [INFO] (main): Writing resume state to '/opt/embulk/out/resume-state.yml'
2018-08-09 09:15:06.393 +0000 [INFO] (main): Resume state is written. Run the transaction again with -r option to resume or use "cleanup" subcommand to delete intermediate data.
java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at line 1: 3 4
        at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:317)
        at org.embulk.EmbulkRunner.run(EmbulkRunner.java:156)
        at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:436)
        at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:91)
        at org.embulk.cli.Main.main(Main.java:26)
Caused by: org.embulk.spi.DataException: Invalid record at line 1: 3 4
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:363)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:140)
        at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:271)
        at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:196)
        at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:235)
        at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:232)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "3 4"
        at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:280)
        at org.embulk.spi.Column.visit(Column.java:48)
        at org.embulk.spi.Schema.visitColumns(Schema.java:68)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:261)
        ... 9 more
Caused by: java.lang.NumberFormatException: For input string: "3 4"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:277)
        ... 12 more

Error: org.embulk.spi.DataException: Invalid record at line 1: 3 4
```

Loading failed because in/2_invalid.csv is, true to its name, an invalid CSV (the other two files were loaded successfully).
Because the run failed, the following resume-state file was written.

```console
$ cat out/resume-state.yml
exec_task: {transaction_time: '2018-08-09 09:15:06.033 UTC'}
in_task:
  DecoderTaskSources: []
  DecoderConfigs: []
  ParserTaskSource:
    SchemaConfig:
    - {name: c0, type: long}
    - {name: c1, type: long}
    Delimiter: ','
    QuotesInQuotedFields: ACCEPT_ONLY_RFC4180_ESCAPED
    Charset: UTF-8
    SkipHeaderLines: 0
    Newline: LF
    DefaultTimeZoneId: UTC
    HeaderLine: null
    StopOnInvalidRecord: true
    MaxQuotedSizeLimit: 131072
    DefaultTimestampFormat: '%Y-%m-%d %H:%M:%S.%N %z'
    CommentLineMarker: null
    TrimIfNotQuoted: false
    QuoteChar: '"'
    AllowOptionalColumns: false
    NullString: null
    DefaultDate: '1970-01-01'
    AllowExtraColumns: false
    EscapeChar: '"'
  FileInputTaskSource:
    LastPath: null
    FollowSymlinks: false
    PathPrefix: /opt/embulk/in/
    Files: [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
  ParserConfig:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out_task: {PrintsColumnNames: false, TimeZoneId: UTC}
in_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
out_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
in_reports:
- {}
- null
- {}
out_reports:
- {}
- {}
- null
- null
- {}
- {}
```
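
The in_reports and out_reports lists at the end appear to record which tasks finished: the entry for input task 1 (presumably 2_invalid.csv, the second file in Files) is null, and the rerun below logs "Skipped resumed input task 0" and "Skipped resumed input task 2". The following Python sketch reads those lists back, assuming that null means "not finished" and that each input task owns a contiguous pair of output tasks (inferred from "output tasks 6 = input tasks 3 * 2" in the log). `read_top_level_list` is a hypothetical helper that only understands this file's layout; it is not a YAML parser.

```python
from typing import List, Optional

# Excerpt of out/resume-state.yml from the failed run above.
RESUME_STATE = """\
in_reports:
- {}
- null
- {}
out_reports:
- {}
- {}
- null
- null
- {}
- {}
"""


def read_top_level_list(text: str, key: str) -> List[Optional[str]]:
    """Collect the '- ...' items listed directly under a top-level key.

    A minimal line-based sketch for the block-style lists seen in
    resume-state.yml; it is not a general YAML parser.
    """
    items: List[Optional[str]] = []
    inside = False
    for line in text.splitlines():
        if not inside:
            inside = line == key + ":"
            continue
        if not line.startswith("- "):
            break  # the next top-level key ends the list
        value = line[2:].strip()
        items.append(None if value == "null" else value)
    return items


in_reports = read_top_level_list(RESUME_STATE, "in_reports")
out_reports = read_top_level_list(RESUME_STATE, "out_reports")

# Assumption: a null report marks a task that did not finish.
pending_inputs = [i for i, r in enumerate(in_reports) if r is None]
finished_inputs = [i for i, r in enumerate(in_reports) if r is not None]

# Assumption: each input task owns a contiguous group of output tasks,
# so output index // outputs_per_input gives the owning input task.
outputs_per_input = len(out_reports) // len(in_reports)
inputs_of_null_outputs = sorted(
    {i // outputs_per_input for i, r in enumerate(out_reports) if r is None}
)

print("pending input tasks:", pending_inputs)
print("finished input tasks:", finished_inputs)
print("input tasks owning null output reports:", inputs_of_null_outputs)
```

Under these assumptions the finished input tasks come out as 0 and 2, which matches the two "Skipped resumed input task" lines in the resumed run.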

Fix the invalid CSV so that it becomes valid, then load again.

```console
$ echo "3,4" > in/2_invalid.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:16:15.729 +0000: Embulk v0.9.7
2018-08-09 09:16:17.284 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:16:21.046 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:16:22.230 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:16:22.345 +0000 [INFO] (0001:resume): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:16:22.351 +0000 [WARN] (0001:resume): Skipped resumed input task 0
2018-08-09 09:16:22.352 +0000 [WARN] (0001:resume): Skipped resumed input task 2
2018-08-09 09:16:22.352 +0000 [INFO] (0001:resume): {done:  2 / 3, running: 0}
3,4
2018-08-09 09:16:22.438 +0000 [INFO] (0001:resume): {done:  2 / 3, running: 1}
2018-08-09 09:16:22.458 +0000 [INFO] (main): Committed.
2018-08-09 09:16:22.459 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/3_valid.csv"},"out":{}}
```

It succeeded. The rows from the files that had already succeeded (1,2 and 5,6) are not output again.
After a successful run, the resume-state file is deleted.

config-diff

Use the same config.yml as in the resume-state test, and delete all files under the in and out directories.
Once that is done, create a single CSV file and run embulk.

```console
$ echo "1,2" > in/1.csv

$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:33:14.704 +0000: Embulk v0.9.7
2018-08-09 09:33:16.178 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:33:20.000 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:33:21.471 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:33:21.567 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:33:21.571 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:33:21.591 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1.csv]
2018-08-09 09:33:21.647 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:33:21.668 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,2
2018-08-09 09:33:21.790 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-08-09 09:33:21.803 +0000 [INFO] (main): Committed.
2018-08-09 09:33:21.803 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}
```
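
The last log line prints the next config diff as JSON. With -c, embulk is expected to load that diff on the next run and merge it into config.yml, which is how last_path reaches the file input. The sketch below pulls the JSON out of the log line and applies it to a subset of config.yml, assuming a simple recursive merge in which values from the diff override the config (a simplification; `diff_from_log` and `merge` are hypothetical helpers, not embulk's API).

```python
import json
from copy import deepcopy

# Last line of the first config-diff run above.
LOG_LINE = ('2018-08-09 09:33:21.803 +0000 [INFO] (main): Next config diff: '
            '{"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}')
MARKER = "Next config diff: "


def diff_from_log(line: str) -> dict:
    """Parse the JSON that follows the 'Next config diff: ' marker."""
    return json.loads(line.split(MARKER, 1)[1])


def merge(base: dict, diff: dict) -> dict:
    """Recursively merge diff into a copy of base; values in diff win."""
    result = deepcopy(base)
    for key, value in diff.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge(result[key], value)
        else:
            result[key] = deepcopy(value)
    return result


# Subset of config.yml (the parser section is omitted for brevity).
config = {
    "in": {"type": "file", "path_prefix": "/opt/embulk/in/"},
    "out": {"type": "stdout"},
}
diff = diff_from_log(LOG_LINE)
effective = merge(config, diff)
print(effective)
```

The original config is left untouched; only the merged copy carries last_path, which mirrors how config.yml itself never changes between runs in this test.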

The diff file records the path of the file that was read as last_path.

```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/1.csv}
out: {}
```
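
diff.yml stores only last_path. The next run below reads only in/2.csv, and a third run reads nothing, which is consistent with the file input skipping every path that sorts at or before last_path. A Python sketch of that rule, assuming a plain string comparison of full paths (`files_to_load` is a hypothetical helper, not the plugin's code):

```python
from typing import List, Optional


def files_to_load(paths: List[str], last_path: Optional[str]) -> List[str]:
    """Return the paths a run would read, in order.

    Assumption: files under path_prefix are sorted as strings and every path
    that compares <= last_path is skipped.
    """
    ordered = sorted(paths)
    if last_path is None:
        return ordered
    return [p for p in ordered if p > last_path]


first = ["/opt/embulk/in/1.csv"]
second = first + ["/opt/embulk/in/2.csv"]

print(files_to_load(first, None))                      # 1st run: no diff yet
print(files_to_load(second, "/opt/embulk/in/1.csv"))   # 2nd run
print(files_to_load(second, "/opt/embulk/in/2.csv"))   # 3rd run
# A later file whose name sorts before last_path would be skipped too.
print(files_to_load(second + ["/opt/embulk/in/10.csv"], "/opt/embulk/in/2.csv"))
```

If that assumption holds, file names need to sort in the order the files arrive: in the last call, in/10.csv is skipped because "/opt/embulk/in/10.csv" sorts before "/opt/embulk/in/2.csv".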

Add one more CSV file and run embulk again.

```console
$ echo "3,4" > in/2.csv

$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:35:33.941 +0000: Embulk v0.9.7
2018-08-09 09:35:35.573 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:35:39.302 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:35:40.560 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:35:40.625 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:35:40.627 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:35:40.644 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/2.csv]
2018-08-09 09:35:40.694 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:35:40.704 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
3,4
2018-08-09 09:35:40.827 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-08-09 09:35:40.833 +0000 [INFO] (main): Committed.
2018-08-09 09:35:40.833 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/2.csv"},"out":{}}
```

You can see that in/1.csv, which was read in the first run, is skipped and only the newly added in/2.csv is processed.
The diff file has been updated.

```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/2.csv}
out: {}
```

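I'll omit the output, but running embulk again in this state exits without processing anything.
Note that resume-state and config-diff only work if the plugins you use support them.
embulk-output-bigquery does not support resume-state. With config-diff it does write a diff, but nothing was executed even after I added files.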
