apache_beam.runners.interactive.caching.streaming_cache module

class apache_beam.runners.interactive.caching.streaming_cache.StreamingCacheSink(cache_dir, filename, sample_resolution_sec, coder=SafeFastPrimitivesCoder)

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform that writes TestStreamFile(Header|Record)s to a file.

This transform takes an arbitrary element stream and writes it to a file as a list of TestStream events (TestStreamFileRecords). Replaying the file produces a best-effort reproduction of the original job; for example, some elements may be emitted slightly out of order relative to the original stream.

Note that this PTransform assumes it runs on a single machine where the following hold: elements arrive in order, and no two transforms write to the same file. Consequently, it is only expected to run correctly with the DirectRunner.

TODO(BEAM-9447): Generalize this to source/sink types other than file-based ones. Also, generalize to cases where multiple workers may write to the same sink.
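A simplified, stdlib-only sketch of the recording step follows. It is not Beam's implementation: it assumes each input arrives as a (processing_time_sec, element) pair in processing-time order, groups elements into buckets of sample_resolution_sec, and writes one JSON header line followed by one JSON line per non-empty bucket. The function name record_events and the JSON layout are hypothetical stand-ins for the sink and for TestStreamFileHeader/TestStreamFileRecord.

```python
import io
import json
from typing import Iterable, List, Optional, TextIO, Tuple


def record_events(
    stream: Iterable[Tuple[float, str]],
    out: TextIO,
    tag: str,
    sample_resolution_sec: float = 0.1,
) -> None:
    """Hypothetical sketch: write a header, then one record per sampling bucket.

    Assumes elements arrive ordered by processing time, mirroring the
    single-machine assumption documented for StreamingCacheSink.
    """
    # Stand-in for a TestStreamFileHeader: identifies the recorded stream.
    out.write(json.dumps({"tag": tag}) + "\n")

    def write_record(bucket: int, elements: List[str]) -> None:
        # Stand-in for a TestStreamFileRecord: when the bucket starts, plus
        # every element observed during it.
        start = round(bucket * sample_resolution_sec, 6)
        out.write(json.dumps({"bucket_start_sec": start, "elements": elements}) + "\n")

    batch: List[str] = []
    batch_bucket: Optional[int] = None
    for processing_time, element in stream:
        bucket = int(processing_time // sample_resolution_sec)
        if batch and bucket != batch_bucket:
            # A new bucket has started: flush the previous one.
            write_record(batch_bucket, batch)
            batch = []
        batch_bucket = bucket
        batch.append(element)
    if batch:
        write_record(batch_bucket, batch)


if __name__ == "__main__":
    buf = io.StringIO()
    record_events([(0.01, "a"), (0.05, "b"), (0.15, "c"), (0.32, "d")], buf, tag="pcoll_1")
    print(buf.getvalue(), end="")
```

In this sketch, replay can only reproduce arrival order up to the sampling resolution, since elements inside one bucket lose their relative timing.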

path

Returns the path the sink leads to.

size_in_bytes

Returns the space usage in bytes of the sink.

expand(pcoll)
annotations() → Dict[str, Union[bytes, str, google.protobuf.message.Message]]
default_label()
default_type_hints()
display_data()

Returns the display data associated with a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

Returns: A dictionary containing key:value pairs. The value might be an integer, float, or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:
{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}
Return type: Dict[str, Any]
classmethod from_runner_api(proto, context)
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order:

  - Using self.default_type_hints().
  - Using self.__class__ type hints.

get_windowing(inputs)

Returns the window function to be associated with transform’s output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

infer_output_type(unused_input_type)
label
pipeline = None
classmethod register_urn(urn, parameter_type, constructor=None)
runner_api_requires_keyed_input()
side_inputs = ()
to_runner_api(context, has_parts=False, **extra_kwargs)
to_runner_api_parameter(unused_context)
to_runner_api_pickled(unused_context)
type_check_inputs(pvalueish)
type_check_inputs_or_outputs(pvalueish, input_or_output)
type_check_outputs(pvalueish)
with_input_types(input_type_hint)

Annotates the input type of a PTransform with a type-hint.

Parameters: input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint.
Raises: TypeError – If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.
Returns: A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
Return type: PTransform
with_output_types(type_hint)

Annotates the output type of a PTransform with a type-hint.

Parameters: type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint.
Raises: TypeError – If type_hint is not a valid type-hint. See validate_composite_type_param() for further details.
Returns: A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.
Return type: PTransform
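Both setters return the transform itself, which is what makes the calls chainable. The pattern can be illustrated with a small hypothetical class (HintedTransform is not part of Beam, and its validation is far cruder than validate_composite_type_param()): each setter checks its argument, raises TypeError on an invalid hint, and returns self.

```python
from typing import Any, Optional


class HintedTransform:
    """Hypothetical stand-in (not a Beam class) for the fluent type-hint setters."""

    def __init__(self) -> None:
        self.input_type: Optional[type] = None
        self.output_type: Optional[type] = None

    @staticmethod
    def _validate(hint: Any) -> None:
        # Much cruder than Beam's validation: only plain classes are accepted,
        # whereas Beam also accepts TypeConstraint instances.
        if not isinstance(hint, type):
            raise TypeError(f"{hint!r} is not a valid type hint")

    def with_input_types(self, input_type_hint: Any) -> "HintedTransform":
        self._validate(input_type_hint)
        self.input_type = input_type_hint
        return self  # returning self is what makes the calls chainable

    def with_output_types(self, type_hint: Any) -> "HintedTransform":
        self._validate(type_hint)
        self.output_type = type_hint
        return self


if __name__ == "__main__":
    t = HintedTransform().with_input_types(int).with_output_types(str)
    print(t.input_type, t.output_type)
```

With Beam itself, the same chaining reads, for example, beam.Map(str).with_input_types(int).with_output_types(str).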
class apache_beam.runners.interactive.caching.streaming_cache.StreamingCacheSource(cache_dir, labels, is_cache_complete=None, coder=None)

Bases: object

A class that reads and parses TestStreamFile(Header|Record)s.

This source operates in the following way:

  1. Wait up to timeout_secs for the file to become available.
  2. Read, parse, and emit the entire contents of the file.
  3. Wait for more events to arrive, or until is_cache_complete returns True.
  4. If there are more events, go to step 2.
  5. Otherwise, stop emitting.
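The steps above amount to a polling tail over a growing file. The following stdlib-only sketch assumes one JSON record per line (not the real TestStreamFile encoding) and a caller-supplied is_cache_complete callable; the names tail_records and poll_secs are hypothetical, while timeout_secs and the tail flag mirror the wording of this page.

```python
import json
import os
import tempfile
import time
from typing import Callable, Iterator


def tail_records(
    path: str,
    is_cache_complete: Callable[[], bool],
    timeout_secs: float = 1.0,
    poll_secs: float = 0.05,
    tail: bool = True,
) -> Iterator[dict]:
    """Hypothetical sketch of the five-step read loop over a JSON-lines file."""
    # Step 1: wait up to timeout_secs for the file to become available.
    deadline = time.monotonic() + timeout_secs
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            return
        time.sleep(poll_secs)

    with open(path, encoding="utf-8") as f:
        pending = ""  # holds a partially written line, if any
        while True:
            # Sample completion before draining, so records written just
            # before the cache was marked complete are still read.
            done = (not tail) or is_cache_complete()
            # Step 2: read, parse, and emit every complete line available now.
            while True:
                chunk = f.readline()
                if not chunk:
                    break
                pending += chunk
                if pending.endswith("\n"):
                    yield json.loads(pending)
                    pending = ""
            # Step 5: stop once no more events can arrive.
            if done:
                return
            # Steps 3-4: wait for more events, then go back to step 2.
            time.sleep(poll_secs)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        cache_file = os.path.join(tmp, "cache.jsonl")
        with open(cache_file, "w", encoding="utf-8") as f:
            f.write('{"n": 1}\n{"n": 2}\n')
        # The cache is already complete, so the generator drains and exits.
        print([r["n"] for r in tail_records(cache_file, lambda: True)])
```

Sampling is_cache_complete before the final drain is a design choice of this sketch: checking it only after reaching end-of-file could drop records appended in between.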

This class is used to read from file and send the contents to the TestStream via the StreamingCacheManager.Reader.

read(tail)

Reads all TestStreamFile(Header|Record)s from file.

This returns a generator that reads all lines from the given file. If tail is True, it keeps waiting for new lines and exits only once the cache is complete; otherwise, it reads the file only once.

class apache_beam.runners.interactive.caching.streaming_cache.StreamingCache(cache_dir, is_cache_complete=None, sample_resolution_sec=0.1)

Bases: apache_beam.runners.interactive.cache_manager.CacheManager

Abstraction that holds the logic for reading and writing to cache.

size(*labels)
capture_size
capture_paths
capture_keys
exists(*labels)
read(*labels, **args)

Returns a generator to read all records from file.

read_multiple(labels, tail=True)

Returns a generator that reads all records from file.

By default (tail=True), it tails the files until the cache is complete. This is because it is used by the TestStreamServiceController to read from file, which happens only during pipeline runtime and therefore needs to block.

write(values, *labels)

Writes the given values to cache.

clear(*labels)
source(*labels)

Returns the StreamingCacheManager source.

This is beam.Impulse() because unbounded sources are marked with it; the PipelineInstrument later replaces these with a TestStream.
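The two-phase substitution described above (emit a placeholder now, swap it for a TestStream later) can be modelled with a toy graph. Everything here is an assumption for illustration: Node and replace_cache_placeholders are hypothetical, and attaching cache labels to the placeholder is just one way to tell cache reads apart from ordinary Impulses. The real PipelineInstrument operates on Beam's own pipeline representation.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical pipeline-graph node; not Beam's internal representation."""

    name: str
    kind: str  # e.g. "Impulse", "TestStream", "ParDo"
    cache_labels: Tuple[str, ...] = ()  # set only on cache placeholders


def replace_cache_placeholders(nodes: List[Node]) -> List[Node]:
    """Return a new graph where labelled Impulse placeholders become TestStreams."""
    replaced = []
    for node in nodes:
        if node.kind == "Impulse" and node.cache_labels:
            # The TestStream keeps the labels so it knows which cache to replay.
            replaced.append(Node(node.name, "TestStream", node.cache_labels))
        else:
            # Ordinary Impulses and every other transform pass through untouched.
            replaced.append(node)
    return replaced


if __name__ == "__main__":
    graph = [Node("read_cache", "Impulse", ("full", "pcoll_1")), Node("fmt", "ParDo")]
    print(replace_cache_placeholders(graph))
```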

sink(labels, is_capture=False)

Returns a StreamingCacheSink to write elements to file.

Note that this is only expected to work with the DirectRunner, because the underlying StreamingCacheSink relies on running on a single machine to preserve element ordering.

save_pcoder(pcoder, *labels)
load_pcoder(*labels)
cleanup()
class Reader(headers, readers)

Bases: object

Abstraction that reads from PCollection readers.

This class is an abstraction layer over multiple PCollection readers, used to supply a TestStream service with events.

This class is also responsible for holding the state of the clock and for injecting clock-advancement and watermark-advancement events.
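To make the clock bookkeeping concrete, here is a hypothetical sketch (merge_readers and the tuple event shapes are invented for illustration) that k-way merges several readers by processing time with heapq.merge, emits a clock-advancement event whenever processing time moves forward, and emits a watermark-advancement event per tag. Deriving the watermark from the largest event time seen is a simplifying assumption of this sketch, not necessarily how the Reader computes it.

```python
import heapq
from typing import Dict, Iterable, Iterator, List, Tuple

# Hypothetical record shape: (processing_time_sec, tag, element, event_time_sec).
Record = Tuple[float, str, str, float]


def merge_readers(readers: List[Iterable[Record]]) -> Iterator[tuple]:
    """Merge per-PCollection readers into one ordered stream of TestStream-like events."""
    clock = 0.0  # processing-time clock state held by the merged reader
    watermarks: Dict[str, float] = {}
    # heapq.merge assumes each reader is already sorted by processing time.
    for proc_time, tag, element, event_time in heapq.merge(*readers, key=lambda r: r[0]):
        if proc_time > clock:
            # Inject a clock-advancement event before the element.
            yield ("advance_processing_time", proc_time - clock)
            clock = proc_time
        yield ("element", tag, element)
        # Simplification: treat the largest event time seen per tag as its watermark.
        if event_time > watermarks.get(tag, float("-inf")):
            watermarks[tag] = event_time
            yield ("advance_watermark", tag, event_time)


if __name__ == "__main__":
    a = [(0.5, "a", "x", 10.0), (1.5, "a", "y", 12.0)]
    b = [(1.0, "b", "p", 11.0)]
    for event in merge_readers([a, b]):
        print(event)
```

heapq.merge breaks ties in favour of the earlier reader, so records with equal processing times keep a deterministic order.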

read()

Reads records from PCollection readers.

is_latest_version(version, *labels)

Returns whether the given version number is the latest.