apache_beam.io.concat_source module

For internal use only; no backwards-compatibility guarantees.

Concat Source, which reads the union of several other sources.

class apache_beam.io.concat_source.ConcatSource(sources)

Bases: apache_beam.io.iobase.BoundedSource

For internal use only; no backwards-compatibility guarantees.

A BoundedSource that can group a set of BoundedSources.

Primarily for internal use; to create the union of several reads, use the apache_beam.Flatten transform instead.
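The idea of reading the union of several sources can be pictured with plain Python lists standing in for bounded sources. This conceptual sketch assumes each source is just a finite sequence of records. ToySource and read_concat are hypothetical names, not apache_beam APIs, and in a real pipeline the Flatten transform remains the recommended way to combine reads. Each record is tagged with a (source_index, offset) position, mirroring the position tuples that ConcatRangeTracker uses.

```python
from typing import Iterator, List, Sequence, Tuple

# Hypothetical stand-in for a bounded source: a finite sequence of records.
ToySource = Sequence[str]


def read_concat(sources: List[ToySource]) -> Iterator[Tuple[Tuple[int, int], str]]:
    """Yield ((source_index, offset), record) for every record of every source, in order."""
    for source_ix, source in enumerate(sources):
        for offset, record in enumerate(source):
            yield (source_ix, offset), record


# Usage: empty sources contribute nothing; order follows the source list.
for position, record in read_concat([["a", "b"], [], ["c"]]):
    print(position, record)
```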

sources
estimate_size()
split(desired_bundle_size=None, start_position=None, stop_position=None)
get_range_tracker(start_position=None, stop_position=None)
read(range_tracker)
default_output_coder()
display_data()

Returns the display data associated with a pipeline component.

It should be overridden in pipeline components that wish to have static display data.

Returns: A dictionary containing key:value pairs. The value might be an integer, float, or string value; a DisplayDataItem for values that carry more data (e.g. a short value, label, or URL); or a HasDisplayData instance whose own display data should also be picked up. For example:
{
  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('apache.org', url='http://apache.org'),
  'key5': subComponent
}
Return type: Dict[str, Any]
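A component that overrides display_data() simply returns such a dictionary. The sketch below avoids importing apache_beam: HypotheticalTextRead and its fields are illustrative names only, and the values are limited to plain strings and integers. A real Beam component would inherit display_data() from HasDisplayData and could wrap richer values in DisplayDataItem.

```python
from typing import Any, Dict


class HypotheticalTextRead:
    """Hypothetical component used only to show the shape of display_data()."""

    def __init__(self, file_pattern: str, min_bundle_size: int) -> None:
        self.file_pattern = file_pattern
        self.min_bundle_size = min_bundle_size

    def display_data(self) -> Dict[str, Any]:
        # Static display data: short keys mapped to plain str/int values.
        return {
            'file_pattern': self.file_pattern,
            'min_bundle_size': self.min_bundle_size,
        }
```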
classmethod from_runner_api(fn_proto, context)

Converts from a FunctionSpec to a Fn object.

Prefer registering a urn with its parameter type and constructor.

is_bounded()
classmethod register_pickle_urn(pickle_urn)

Registers and implements the given urn via pickling.

classmethod register_urn(urn, parameter_type, fn=None)

Registers a urn with a constructor.

For example, if 'beam:fn:foo' had parameter type FooPayload, one could write RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto), where foo_from_proto takes as arguments a FooPayload and a PipelineContext. This function can also be used as a decorator rather than passing the callable in as the final parameter.

A corresponding to_runner_api_parameter method would be expected to return the tuple ('beam:fn:foo', FooPayload).
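The registration pattern described above (a direct call with the constructor as the last argument, or use as a decorator) can be sketched as a small class-level registry. HypotheticalRunnerApiFn, its from_urn lookup, FooPayload and foo_from_proto are illustrative names, not the apache_beam RunnerApiFn class; in particular, this sketch assumes the payload is already a Python object rather than bytes decoded from a FunctionSpec.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# A constructor takes (payload, context) and builds the Fn object.
Constructor = Callable[[Any, Any], Any]


class HypotheticalRunnerApiFn:
    """Toy registry mimicking the register_urn pattern (not the apache_beam class)."""

    _known_urns: Dict[str, Tuple[type, Constructor]] = {}

    @classmethod
    def register_urn(cls, urn: str, parameter_type: type,
                     fn: Optional[Constructor] = None):
        if fn is not None:
            # Direct call: register the constructor immediately.
            cls._known_urns[urn] = (parameter_type, fn)
            return fn

        # Decorator use: return a function that registers the decorated constructor.
        def register(constructor: Constructor) -> Constructor:
            cls._known_urns[urn] = (parameter_type, constructor)
            return constructor
        return register

    @classmethod
    def from_urn(cls, urn: str, payload: Any, context: Any) -> Any:
        """Hypothetical lookup: check the payload type, then call the constructor."""
        parameter_type, constructor = cls._known_urns[urn]
        if not isinstance(payload, parameter_type):
            raise TypeError(f'{urn} expects a {parameter_type.__name__} payload')
        return constructor(payload, context)


class FooPayload:
    """Hypothetical payload type for the 'beam:fn:foo' example."""

    def __init__(self, value: int) -> None:
        self.value = value


@HypotheticalRunnerApiFn.register_urn('beam:fn:foo', FooPayload)
def foo_from_proto(payload: FooPayload, context: Any) -> Tuple[str, int]:
    return ('foo', payload.value)
```

Returning the decorated function unchanged keeps foo_from_proto usable as an ordinary function after registration.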

to_runner_api(context)

Returns a FunctionSpec encoding this Fn.

Prefer overriding self.to_runner_api_parameter.

to_runner_api_parameter(context)
class apache_beam.io.concat_source.ConcatRangeTracker(start, end, source_bundles)

Bases: apache_beam.io.iobase.RangeTracker

For internal use only; no backwards-compatibility guarantees.

Range tracker for ConcatSource.

Initializes ConcatRangeTracker.

Parameters:
  • start – start position, a tuple of (source_index, source_position)
  • end – end position, a tuple of (source_index, source_position)
  • source_bundles – the list of source bundles in the ConcatSource
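Because each position is a (source_index, source_position) tuple, ordering falls out of Python's lexicographic tuple comparison: every position in source 0 precedes every position in source 1, and so on. This sketch assumes integer offsets for source_position throughout; in_range is a hypothetical helper, and real sources may use other position types.

```python
from typing import Tuple

# (source_index, source_position); integer offsets are an assumption of this sketch.
Position = Tuple[int, int]


def in_range(pos: Position, start: Position, end: Position) -> bool:
    """True if start <= pos < end under lexicographic tuple ordering."""
    return start <= pos < end
```

For example, in_range((1, 500), (0, 10), (2, 0)) holds even though 500 exceeds both offsets, because the source index is compared first.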
start_position()
stop_position()
try_claim(pos)
try_split(pos)
set_current_position(pos)
position_at_fraction(fraction)
fraction_consumed()
local_to_global(source_ix, source_frac)
global_to_local(frac)
sub_range_tracker(source_ix)
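local_to_global and global_to_local are listed without descriptions. One plausible reading, sketched here under stated assumptions, is that the overall fraction range [0, 1] is divided among the sources in proportion to their weights, so a fraction consumed inside one source maps to a global fraction and back. The function names mirror the methods above, but the weighting scheme and the bounds list are assumptions of this sketch, not the ConcatRangeTracker implementation.

```python
import bisect
from typing import List, Sequence, Tuple


def cumulative_weights(sizes: Sequence[float]) -> List[float]:
    """Boundaries [0, ..., 1]; source i owns [bounds[i], bounds[i + 1]].

    Assumption: each source's share is proportional to its (estimated) size,
    with equal shares if every size is zero.
    """
    total = float(sum(sizes))
    shares = [s / total for s in sizes] if total > 0 else [1.0 / len(sizes)] * len(sizes)
    bounds = [0.0]
    for share in shares:
        bounds.append(bounds[-1] + share)
    bounds[-1] = 1.0  # absorb floating-point drift in the final boundary
    return bounds


def local_to_global(bounds: Sequence[float], source_ix: int, source_frac: float) -> float:
    """Map a fraction within one source to a fraction of the whole concatenation."""
    lo, hi = bounds[source_ix], bounds[source_ix + 1]
    return lo + source_frac * (hi - lo)


def global_to_local(bounds: Sequence[float], frac: float) -> Tuple[int, float]:
    """Map a global fraction (assumed 0.0 <= frac <= 1.0) to (source_index, local fraction)."""
    # Last boundary <= frac, clamped so that frac == 1.0 maps to the final source.
    source_ix = min(bisect.bisect_right(bounds, frac) - 1, len(bounds) - 2)
    lo, hi = bounds[source_ix], bounds[source_ix + 1]
    width = hi - lo
    return source_ix, ((frac - lo) / width if width > 0 else 0.0)
```

With sizes [100, 300], the bounds are [0.0, 0.25, 1.0]: halfway through the second source is 0.625 of the whole, and global_to_local maps 0.625 back to (1, 0.5).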
SPLIT_POINTS_UNKNOWN = <object object>
set_split_points_unclaimed_callback(callback)

Sets a callback for determining the unclaimed number of split points.

By invoking this function, a BoundedSource can set a callback function that may get invoked by the RangeTracker to determine the number of unclaimed split points. A split point is unclaimed if RangeTracker.try_claim() has not been successfully invoked for that particular split point. The callback function accepts a single parameter, a stop position for the BoundedSource (stop_position). If the record currently being consumed by the BoundedSource is at position current_position, the callback should return the number of split points within the range (current_position, stop_position). Note that this count should not include the split point that is currently being consumed by the source.

This function must be implemented by subclasses before being used.

Parameters: callback – a function that takes a single parameter, a stop position, and returns the unclaimed number of split points for the source read operation that is calling this function. The value returned from callback should be either an integer greater than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
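For a source in which every integer offset is a split point, the callback reduces to counting the offsets strictly between the current position and the stop position, that is stop_position - current_position - 1. make_unclaimed_callback and its get_current_position argument are hypothetical helpers for this sketch; a real source would close over its own read state.

```python
from typing import Callable


def make_unclaimed_callback(get_current_position: Callable[[], int]) -> Callable[[int], int]:
    """Hypothetical callback for a source where every integer offset is a split point."""
    def unclaimed_split_points(stop_position: int) -> int:
        current = get_current_position()
        # Offsets strictly between current and stop; the current one is excluded.
        return max(0, stop_position - current - 1)
    return unclaimed_split_points
```

With 50 split points at offsets 0 to 49 and stop position 50, a read positioned at offset 29 (split point #30) gets 20 unclaimed split points from this callback.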
split_points()

Gives the number of split points consumed and remaining.

For a RangeTracker used by a BoundedSource (within a BoundedSource.read() invocation), this method produces a 2-tuple that gives the number of split points consumed by the BoundedSource and the number of split points remaining within the range of the RangeTracker that have not been consumed by the BoundedSource.

More specifically, given that the position of the current record being read by the BoundedSource is current_position, this method produces a tuple that consists of (1) the number of split points in the range [self.start_position(), current_position), not including the split point that is currently being consumed, which represents the total amount of parallelism in the consumed part of the source; and (2) the number of split points within the range [current_position, self.stop_position()), including the split point that is currently being consumed, which represents the total amount of parallelism in the unconsumed part of the source.

Methods of the class RangeTracker, including this method, may get invoked by different threads and hence must be made thread-safe, e.g. by using a single lock object.

General information about the consumed and remaining number of split points returned by this method:
  • Before a source read (BoundedSource.read() invocation) claims the first split point, the number of consumed split points is 0. This condition holds independent of whether the input is “splittable”. A splittable source is a source that has more than one split point.
  • Any source read that has only claimed one split point has 0 consumed split points, since the first split point is the current split point and is still being processed. This condition holds independent of whether the input is splittable.
  • For an empty source read, which never invokes RangeTracker.try_claim(), the consumed number of split points is 0. This condition holds independent of whether the input is splittable.
  • For a source read which has invoked RangeTracker.try_claim() n times, the consumed number of split points is n - 1.
  • If a BoundedSource sets a callback through the function set_split_points_unclaimed_callback(), the RangeTracker can use that callback when determining the remaining number of split points.
  • Remaining split points should include the split point that is currently being consumed by the source read. Hence, if the above callback returns an integer value n, the remaining number of split points should be (n + 1).
  • After the last split point is claimed, the remaining number of split points becomes 1, because this unfinished read itself represents an unfinished split point.
  • After all records of the source have been consumed, the remaining number of split points becomes 0 and the consumed number of split points becomes equal to the total number of split points within the range being read by the source. This method does not address this condition and will continue to report the number of consumed split points as (“total number of split points” - 1) and the number of remaining split points as 1. A runner that performs the reading of the source can detect when all records have been consumed and adjust the remaining and consumed number of split points accordingly.
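The rules above can be sketched as a small thread-safe counter that follows them literally: n successful claims give n - 1 consumed split points, and a callback result n gives n + 1 remaining. HypotheticalSplitPointTracker, record_claim, and the module-level sentinel are illustrative assumptions, not RangeTracker's implementation. Because the sketch applies the n + 1 rule unconditionally, it is meant to be queried once at least one split point has been claimed.

```python
import threading
from typing import Callable, Optional, Tuple, Union

# Stand-in for RangeTracker.SPLIT_POINTS_UNKNOWN: a unique sentinel object.
SPLIT_POINTS_UNKNOWN = object()

Count = Union[int, object]


class HypotheticalSplitPointTracker:
    """Reports (consumed, remaining) split points following the rules listed above."""

    def __init__(self, stop_position: int) -> None:
        self._stop_position = stop_position
        self._lock = threading.Lock()  # one lock guards all mutable state
        self._claims = 0
        self._callback: Optional[Callable[[int], Count]] = None

    def set_split_points_unclaimed_callback(self, callback: Callable[[int], Count]) -> None:
        with self._lock:
            self._callback = callback

    def record_claim(self) -> None:
        """Call after each successful try_claim() of a split point."""
        with self._lock:
            self._claims += 1

    def split_points(self) -> Tuple[Count, Count]:
        with self._lock:
            # n successful claims -> n - 1 consumed (0 before the first claim).
            consumed = max(self._claims - 1, 0)
            if self._callback is None:
                return consumed, SPLIT_POINTS_UNKNOWN
            unclaimed = self._callback(self._stop_position)
            if unclaimed is SPLIT_POINTS_UNKNOWN:
                return consumed, SPLIT_POINTS_UNKNOWN
            # Remaining includes the split point currently being consumed.
            return consumed, unclaimed + 1
```

The callback runs while the lock is held, so in this sketch it must not call back into the tracker; a reentrant lock would be one way to relax that.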

Examples

  1. A “perfectly splittable” input, which can be read in parallel down to the individual records.

    Consider a perfectly splittable input that consists of 50 split points.

    • Before a source read (BoundedSource.read() invocation) claims the first split point, the number of consumed split points is 0 and the number of remaining split points is 50.
    • After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 50.
    • After claiming split point #30, the consumed number of split points is 29 and the remaining number of split points is 21.
    • After claiming all 50 split points, the consumed number of split points is 49 and the remaining number of split points is 1.
  2. A “block-compressed” file format such as avroio, in which a block of records has to be read as a whole, but different blocks can be read in parallel.

    Consider a block-compressed input that consists of 5 blocks.

    • Before a source read (BoundedSource.read() invocation) claims the first split point (first block), the number of consumed split points is 0 and the number of remaining split points is 5.
    • After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 5.
    • After claiming split point #3, the consumed number of split points is 2 and the remaining number of split points is 3.
    • After claiming all 5 split points, the consumed number of split points is 4 and the remaining number of split points is 1.
  3. An “unsplittable” input, such as a cursor in a database or a gzip-compressed file.

    Such an input is considered to have only a single split point. The number of consumed split points is always 0 and the number of remaining split points is always 1.
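The arithmetic behind all three examples fits in one function, assuming the total number of split points is known up front. A real tracker generally does not know the total, which is what the unclaimed-split-points callback is for. expected_split_points is a hypothetical helper used only to check the numbers above.

```python
from typing import Tuple


def expected_split_points(total: int, claimed: int) -> Tuple[int, int]:
    """(consumed, remaining) after `claimed` successful claims out of `total` split points."""
    if not 0 <= claimed <= total:
        raise ValueError('claimed must be between 0 and total')
    if claimed == 0:
        # Nothing claimed yet: every split point is still remaining.
        return 0, total
    # The split point being read counts as remaining, not consumed.
    return claimed - 1, total - claimed + 1
```

For the perfectly splittable input, expected_split_points(50, 30) gives (29, 21); for the block-compressed input, expected_split_points(5, 3) gives (2, 3); and for an unsplittable input (total 1) the result is (0, 1) both before and after the single claim.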

By default, RangeTracker returns RangeTracker.SPLIT_POINTS_UNKNOWN for both the consumed and remaining number of split points, which indicates that the number of split points consumed and remaining is unknown.

Returns: A pair that gives the consumed and remaining number of split points. The consumed number of split points should be an integer greater than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN. The remaining number of split points should be an integer greater than zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
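SPLIT_POINTS_UNKNOWN is a plain object() sentinel (its repr is shown above as <object object>), so it is best detected by identity rather than by value. The following sketch checks a (consumed, remaining) pair against the return contract just described; is_valid_split_points is a hypothetical helper, and the module-level sentinel stands in for RangeTracker.SPLIT_POINTS_UNKNOWN.

```python
from typing import Tuple

# Stand-in for RangeTracker.SPLIT_POINTS_UNKNOWN (a unique sentinel object).
SPLIT_POINTS_UNKNOWN = object()


def _is_count(value: object, minimum: int) -> bool:
    # bool is a subclass of int, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool) and value >= minimum


def is_valid_split_points(pair: Tuple[object, object]) -> bool:
    """Check a (consumed, remaining) pair against the documented contract."""
    consumed, remaining = pair
    consumed_ok = consumed is SPLIT_POINTS_UNKNOWN or _is_count(consumed, 0)
    remaining_ok = remaining is SPLIT_POINTS_UNKNOWN or _is_count(remaining, 1)
    return consumed_ok and remaining_ok
```

Note that (0, 0) fails this check: as described above, this method keeps reporting 1 remaining split point even after all records have been consumed.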