apache_beam.io.range_trackers module
iobase.RangeTracker implementations provided with Apache Beam.
class apache_beam.io.range_trackers.OffsetRangeTracker(start, end)
Bases: apache_beam.io.iobase.RangeTracker
A RangeTracker for non-negative positions of type long.
OFFSET_INFINITY = inf
last_record_start
last_attempted_record_start
Returns the current value of last_attempted_record_start.
last_attempted_record_start records the most recent valid position that a caller attempted to claim by calling try_claim(). Only try_claim() updates this value, and it does so whether try_claim() returns True or False.
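The difference between last_record_start and last_attempted_record_start can be illustrated with a small self-contained model. ToyOffsetTracker is hypothetical, not Beam's OffsetRangeTracker: it assumes that claims must be strictly increasing and that a claim at or beyond end fails, and it keeps only the two fields discussed here.

```python
import threading


class ToyOffsetTracker:
    """Hypothetical model of the claim bookkeeping, not Beam's OffsetRangeTracker."""

    def __init__(self, start: int, end: int):
        self._start, self._end = start, end
        self.last_record_start = -1            # last position successfully claimed
        self.last_attempted_record_start = -1  # last position passed to try_claim()
        self._lock = threading.Lock()

    def try_claim(self, record_start: int) -> bool:
        with self._lock:
            if record_start < self._start:
                raise ValueError("position precedes the start of the range")
            if record_start <= self.last_attempted_record_start:
                raise ValueError("claims must be strictly increasing")
            # Recorded for every valid attempt, successful or not.
            self.last_attempted_record_start = record_start
            if record_start >= self._end:
                return False  # outside [start, end): the claim fails
            self.last_record_start = record_start
            return True


tracker = ToyOffsetTracker(0, 100)
print(tracker.try_claim(10))   # True
print(tracker.try_claim(150))  # False
print(tracker.last_record_start, tracker.last_attempted_record_start)  # 10 150
```

The failed claim at 150 still moves last_attempted_record_start, while last_record_start stays at the last successful claim.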
SPLIT_POINTS_UNKNOWN = <object object>
class apache_beam.io.range_trackers.OrderedPositionRangeTracker(start_position=None, stop_position=None)
Bases: apache_beam.io.iobase.RangeTracker
An abstract base class for range trackers whose positions are comparable.
Subclasses only need to implement the mapping from position ranges to and from the closed interval [0, 1].
UNSTARTED = <object object>
fraction_to_position(fraction, start, end)
Converts a fraction between 0 and 1 to a position between start and end.
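Because subclasses only supply the mapping to and from [0, 1], generic progress reporting can be written once in the base class. The sketch below illustrates that idea with a hypothetical base class that derives position_at_fraction() and fraction_consumed() from the two mapping methods. ToyOrderedTracker, FloatRangeTracker and the last_claim attribute are illustrative names, not Beam's implementation.

```python
from abc import ABC, abstractmethod


class ToyOrderedTracker(ABC):
    """Hypothetical base class: generic progress logic, mapping left to subclasses."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claim = None  # None means nothing has been claimed yet

    @abstractmethod
    def fraction_to_position(self, fraction, start, end):
        """Maps a fraction in [0, 1] to a position between start and end."""

    @abstractmethod
    def position_to_fraction(self, position, start, end):
        """Maps a position between start and end to a fraction in [0, 1]."""

    def position_at_fraction(self, fraction):
        return self.fraction_to_position(fraction, self.start, self.stop)

    def fraction_consumed(self):
        if self.last_claim is None:
            return 0.0
        return self.position_to_fraction(self.last_claim, self.start, self.stop)


class FloatRangeTracker(ToyOrderedTracker):
    """Float positions; the mapping is plain linear interpolation."""

    def fraction_to_position(self, fraction, start, end):
        return start + fraction * (end - start)

    def position_to_fraction(self, position, start, end):
        return (position - start) / (end - start)


tracker = FloatRangeTracker(10.0, 20.0)
print(tracker.position_at_fraction(0.25))  # 12.5
tracker.last_claim = 15.0
print(tracker.fraction_consumed())  # 0.5
```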
SPLIT_POINTS_UNKNOWN = <object object>
set_current_position(position)
Updates the last-consumed position to the given position.
A source may invoke this method for records that do not start at split points. This may modify the internal state of the RangeTracker. If the record starts at a split point, the method try_claim() must be invoked instead of this method.
Parameters: position – starting position of a record being read by a source.
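A source's read loop decides, record by record, which of the two methods to call. The sketch below assumes records arrive as hypothetical (position, is_split_point, payload) tuples and uses a hypothetical RecordingTracker that accepts any claim below stop and logs every call; a real source would use a Beam RangeTracker instead.

```python
from typing import Iterable, Iterator, Tuple


class RecordingTracker:
    """Hypothetical tracker: accepts claims below `stop` and logs every call."""

    def __init__(self, stop: int):
        self.stop = stop
        self.calls = []

    def try_claim(self, position: int) -> bool:
        self.calls.append(("try_claim", position))
        return position < self.stop

    def set_current_position(self, position: int) -> None:
        self.calls.append(("set_current_position", position))


def read_records(records: Iterable[Tuple[int, bool, str]], tracker) -> Iterator[str]:
    """Yields payloads; each record is (position, is_split_point, payload)."""
    for position, is_split_point, payload in records:
        if is_split_point:
            # Split-point records must be claimed; a failed claim ends this read.
            if not tracker.try_claim(position):
                return
        else:
            # Records that do not start at a split point only report progress.
            tracker.set_current_position(position)
        yield payload


records = [(0, True, "a"), (1, False, "b"), (5, True, "c"), (6, False, "d")]
tracker = RecordingTracker(stop=5)
print(list(read_records(records, tracker)))  # ['a', 'b']
```

The read stops at position 5 because the claim fails there, so "c" and "d" are left for whichever reader owns the range beyond the stop position.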
set_split_points_unclaimed_callback(callback)
Sets a callback for determining the unclaimed number of split points.
By invoking this function, a BoundedSource can set a callback function that the RangeTracker may invoke to determine the number of unclaimed split points. A split point is unclaimed if RangeTracker.try_claim() has not been successfully invoked for that particular split point. The callback function accepts a single parameter, a stop position for the BoundedSource (stop_position). If the record currently being consumed by the BoundedSource is at position current_position, the callback should return the number of split points within the range (current_position, stop_position). This count should not include the split point that the source is currently consuming.
This function must be implemented by subclasses before being used.
Parameters: callback – a function that takes a single parameter, a stop position, and returns the unclaimed number of split points for the source read operation that is calling this function. The value returned from callback should be either an integer greater than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
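For a block-based file, such a callback can count the block boundaries that lie strictly between the current position and stop_position. In the sketch below, make_unclaimed_callback, block_starts and get_current_position are hypothetical; it only assumes that the split-point offsets are known and sorted, and that the source can report the position it is currently reading.

```python
import bisect
from typing import Callable, List


def make_unclaimed_callback(block_starts: List[int],
                            get_current_position: Callable[[], int]) -> Callable[[int], int]:
    """Hypothetical helper; block_starts are the sorted split-point offsets."""

    def callback(stop_position: int) -> int:
        current = get_current_position()
        # Count split points strictly inside (current, stop_position), so the
        # split point being read right now is excluded.
        first = bisect.bisect_right(block_starts, current)
        last = bisect.bisect_left(block_starts, stop_position)
        return max(last - first, 0)

    return callback


block_starts = [0, 100, 200, 300, 400]
state = {"current": 100}  # the source is reading the block that starts at 100
callback = make_unclaimed_callback(block_starts, lambda: state["current"])
print(callback(500))  # 3 (blocks at 200, 300 and 400)
print(callback(300))  # 1 (block at 200)
```

A BoundedSource would then pass such a function to set_split_points_unclaimed_callback().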
split_points()
Gives the number of split points consumed and remaining.
For a RangeTracker used by a BoundedSource (within a BoundedSource.read() invocation), this method produces a 2-tuple that gives the number of split points consumed by the BoundedSource and the number of split points remaining within the range of the RangeTracker that have not been consumed by the BoundedSource.
More specifically, given that the position of the current record being read by the BoundedSource is current_position, this method produces a tuple that consists of:
(1) the number of split points in the range [self.start_position(), current_position), not including the split point that is currently being consumed. This represents the total amount of parallelism in the consumed part of the source.
(2) the number of split points within the range [current_position, self.stop_position()), including the split point that is currently being consumed. This represents the total amount of parallelism in the unconsumed part of the source.
Methods of the class RangeTracker, including this method, may be invoked by different threads and hence must be made thread-safe, e.g. by using a single lock object.
General information about the consumed and remaining numbers of split points returned by this method:
- Before a source read (BoundedSource.read() invocation) claims the first split point, the number of consumed split points is 0. This condition holds independent of whether the input is “splittable”. A splittable source is a source that has more than one split point.
- Any source read that has only claimed one split point has 0 consumed split points, since the first split point is the current split point and is still being processed. This condition holds independent of whether the input is splittable.
- For an empty source read, which never invokes RangeTracker.try_claim(), the consumed number of split points is 0. This condition holds independent of whether the input is splittable.
- For a source read which has invoked RangeTracker.try_claim() n times, the consumed number of split points is n - 1.
- If a BoundedSource sets a callback through set_split_points_unclaimed_callback(), the RangeTracker can use that callback when determining the remaining number of split points.
- Remaining split points should include the split point that is currently being consumed by the source read. Hence, if the above callback returns an integer value n, the remaining number of split points should be n + 1.
- After the last split point is claimed, the remaining number of split points becomes 1, because this unfinished read itself represents an unfinished split point.
- After all records of the source have been consumed, the remaining number of split points becomes 0 and the consumed number of split points becomes equal to the total number of split points within the range being read by the source. This method does not address this condition and will continue to report the number of consumed split points as (“total number of split points” - 1) and the number of remaining split points as 1. A runner that performs the reading of the source can detect when all records have been consumed and adjust the remaining and consumed numbers of split points accordingly.
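The counting rules above reduce to a little arithmetic. The hypothetical function below takes the number of try_claim() invocations and an optional unclaimed-split-points callback, then applies the n - 1 and n + 1 rules. SPLIT_POINTS_UNKNOWN here is a local stand-in for RangeTracker.SPLIT_POINTS_UNKNOWN. The sketch also assumes that, without a callback, the remaining count is unknown, and, like the method itself, it ignores the end-of-read condition in the last bullet.

```python
from typing import Callable, Optional, Tuple

# Local stand-in for RangeTracker.SPLIT_POINTS_UNKNOWN (a sentinel object).
SPLIT_POINTS_UNKNOWN = object()


def split_points_from_claims(
    claims: int,
    unclaimed_callback: Optional[Callable[[object], object]] = None,
    stop_position: object = None,
) -> Tuple[object, object]:
    """Hypothetical helper returning (consumed, remaining) split points."""
    # n try_claim() calls -> n - 1 consumed; an empty read (n == 0) reports 0.
    consumed = max(claims - 1, 0)
    if unclaimed_callback is None:
        return consumed, SPLIT_POINTS_UNKNOWN
    unclaimed = unclaimed_callback(stop_position)
    if unclaimed is SPLIT_POINTS_UNKNOWN:
        return consumed, SPLIT_POINTS_UNKNOWN
    # Remaining also counts the split point currently being read.
    return consumed, unclaimed + 1


# 50 perfectly splittable split points, #30 just claimed: 20 still unclaimed.
print(split_points_from_claims(30, lambda stop: 20))  # (29, 21)
# 5 blocks, block #3 just claimed: 2 still unclaimed.
print(split_points_from_claims(3, lambda stop: 2))    # (2, 3)
```

The sentinel is compared with `is`, since it is a bare object with no meaningful value.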
Examples:
A “perfectly splittable” input, which can be read in parallel down to individual records. Consider a perfectly splittable input that consists of 50 split points.
- Before a source read (BoundedSource.read() invocation) claims the first split point, the number of consumed split points is 0 and the number of remaining split points is 50.
- After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 50.
- After claiming split point #30, the consumed number of split points is 29 and the remaining number of split points is 21.
- After claiming all 50 split points, the consumed number of split points is 49 and the remaining number of split points is 1.
A “block-compressed” file format such as avroio, in which a block of records has to be read as a whole, but different blocks can be read in parallel. Consider a block-compressed input that consists of 5 blocks.
- Before a source read (BoundedSource.read() invocation) claims the first split point (first block), the number of consumed split points is 0 and the number of remaining split points is 5.
- After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 5.
- After claiming split point #3, the consumed number of split points is 2 and the remaining number of split points is 3.
- After claiming all 5 split points, the consumed number of split points is 4 and the remaining number of split points is 1.
An “unsplittable” input, such as a cursor in a database or a gzip-compressed file. Such an input is considered to have only a single split point. The number of consumed split points is always 0 and the number of remaining split points is always 1.
By default, RangeTracker returns RangeTracker.SPLIT_POINTS_UNKNOWN for both the consumed and remaining numbers of split points, which indicates that the number of split points consumed and remaining is unknown.
Returns: A pair that gives the consumed and remaining numbers of split points. The consumed number of split points should be an integer greater than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN. The remaining number of split points should be an integer greater than zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
class apache_beam.io.range_trackers.UnsplittableRangeTracker(range_tracker)
Bases: apache_beam.io.iobase.RangeTracker
A RangeTracker that always ignores split requests.
This can be used to make a given RangeTracker object unsplittable by ignoring all calls to try_split(). All other calls will be delegated to the given RangeTracker.
Initializes UnsplittableRangeTracker.
Parameters: range_tracker (RangeTracker) – a RangeTracker to which all method calls except calls to try_split() will be delegated.
SPLIT_POINTS_UNKNOWN = <object object>
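The delegation pattern can be sketched in a few lines. Returning None from try_split() to signal a refused split is an assumption of this sketch, as is forwarding through __getattr__; the actual class may forward each RangeTracker method explicitly. ToyUnsplittable and ToyInner are hypothetical names.

```python
class ToyUnsplittable:
    """Hypothetical wrapper: ignores split requests, delegates everything else."""

    def __init__(self, range_tracker):
        self._range_tracker = range_tracker

    def try_split(self, position):
        # Every split request is refused; None means "no split happened".
        return None

    def __getattr__(self, name):
        # Invoked only for attributes the wrapper does not define itself.
        return getattr(self._range_tracker, name)


class ToyInner:
    """Hypothetical inner tracker used only for this demonstration."""

    def try_claim(self, position):
        return position < 10

    def try_split(self, position):
        return position, 0.5


wrapped = ToyUnsplittable(ToyInner())
print(wrapped.try_claim(3))  # True (delegated to the inner tracker)
print(wrapped.try_split(5))  # None (ignored by the wrapper)
```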
class apache_beam.io.range_trackers.LexicographicKeyRangeTracker(start_position=None, stop_position=None)
Bases: apache_beam.io.range_trackers.OrderedPositionRangeTracker
A range tracker that tracks progress through a lexicographically ordered keyspace of strings.
classmethod fraction_to_position(fraction, start=None, end=None)
Linearly interpolates a key that lies the given fraction of the way, in lexicographic order, between start and end.
classmethod position_to_fraction(key, start=None, end=None)
Returns the fraction of keys in the range [start, end) that are less than the given key.
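One way to realize both classmethods is to read each key as a base-256 fraction, so that byte order matches numeric order. The sketch below is a simplified illustration under that assumption, not Beam's algorithm. It pads keys with zero bytes to a common width, treats a missing start as the empty key and a missing end as the top of the keyspace, and adds one extra byte of precision when interpolating so that a key exists between adjacent single-byte keys. The function names key_at_fraction and fraction_of_key are hypothetical.

```python
from fractions import Fraction
from typing import Optional, Tuple


def _as_int(key: bytes, width: int) -> int:
    # Big-endian reading of the key, right-padded with zero bytes.
    return int.from_bytes(key.ljust(width, b"\x00"), "big")


def _bounds(start: Optional[bytes], end: Optional[bytes], width: int) -> Tuple[int, int]:
    lo = _as_int(start or b"", width)
    # A missing end stands for the top of the keyspace (fraction 1.0).
    hi = 256 ** width if end is None else _as_int(end, width)
    return lo, hi


def key_at_fraction(fraction: float, start: Optional[bytes] = None,
                    end: Optional[bytes] = None) -> bytes:
    if not 0 <= fraction < 1:
        raise ValueError("fraction must be in [0, 1)")
    # One extra byte so that keys strictly between adjacent keys can be produced.
    width = max(len(start or b""), len(end or b"")) + 1
    lo, hi = _bounds(start, end, width)
    return (lo + int(Fraction(fraction) * (hi - lo))).to_bytes(width, "big")


def fraction_of_key(key: bytes, start: Optional[bytes] = None,
                    end: Optional[bytes] = None) -> float:
    width = max(len(key), len(start or b""), len(end or b""))
    lo, hi = _bounds(start, end, width)
    return (_as_int(key, width) - lo) / (hi - lo)


print(key_at_fraction(0.5, b"a", b"b"))       # b'a\x80'
print(fraction_of_key(b"a\x80", b"a", b"b"))  # 0.5
```

Exact Fraction arithmetic avoids float rounding when the key range spans many bytes.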
SPLIT_POINTS_UNKNOWN = <object object>
UNSTARTED = <object object>
fraction_consumed()
position_at_fraction(fraction)
set_current_position(position)
set_split_points_unclaimed_callback(callback)
split_points()
These three members are documented exactly as under OrderedPositionRangeTracker above.
start_position()
stop_position()
try_claim(position)
try_split(position)
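The member list gives no descriptions for try_claim() and try_split(). The sketch below shows one plausible contract for an ordered range, stated as an assumption rather than Beam's documented behavior. Claims must not go backwards and succeed only below the stop position. A split inside the unclaimed remainder shrinks the range and returns the split position with its fraction, while a split at or before the last claim returns None. A single lock guards the state, in line with the thread-safety note under split_points(). ToyOrderedClaims is a hypothetical name.

```python
import threading
from typing import Optional, Tuple


class ToyOrderedClaims:
    """Hypothetical sketch of claim/split bookkeeping over float positions."""

    def __init__(self, start: float, stop: float):
        self._start, self._stop = start, stop
        self._last_claim: Optional[float] = None
        self._lock = threading.Lock()  # one lock guards all mutable state

    def stop_position(self) -> float:
        with self._lock:
            return self._stop

    def try_claim(self, position: float) -> bool:
        with self._lock:
            if self._last_claim is not None and position < self._last_claim:
                raise ValueError("positions must be claimed in non-decreasing order")
            if position < self._stop:
                self._last_claim = position
                return True
            return False

    def try_split(self, position: float) -> Optional[Tuple[float, float]]:
        with self._lock:
            if not self._start < position < self._stop:
                raise ValueError("split position must lie strictly inside the range")
            if self._last_claim is not None and position <= self._last_claim:
                return None  # the reader has already passed this position
            fraction = (position - self._start) / (self._stop - self._start)
            self._stop = position  # [position, old stop) is handed off
            return position, fraction


tracker = ToyOrderedClaims(0.0, 100.0)
print(tracker.try_claim(10.0))  # True
print(tracker.try_split(40.0))  # (40.0, 0.4)
print(tracker.try_claim(50.0))  # False: 50.0 now lies past the new stop
print(tracker.try_split(5.0))   # None: already consumed past 5.0
```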
-
classmethod