apache_beam.io.mongodbio module¶
This module implements IO classes to read and write data on MongoDB.
Read from MongoDB¶
ReadFromMongoDB
is a PTransform
that reads from a configured
MongoDB source and returns a PCollection
of dict representing MongoDB
documents.
To configure MongoDB source, the URI to connect to MongoDB server, database
name, collection name needs to be provided.
Example usage:
pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='input')
To read from MongoDB Atlas, use bucket_auto
option to enable
@bucketAuto
MongoDB aggregation instead of splitVector
command which is a high-privilege function that cannot be assigned
to any user in Atlas.
Example usage:
pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
db='testdb',
coll='input',
bucket_auto=True)
Write to MongoDB:¶
WriteToMongoDB
is a PTransform
that writes MongoDB documents to
configured sink, and the write is conducted through a mongodb bulk_write of
ReplaceOne
operations. If the document’s _id field already existed in the
MongoDB collection, it results in an overwrite, otherwise, a new document
will be inserted.
Example usage:
pipeline | WriteToMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='output',
batch_size=10)
No backward compatibility guarantees. Everything in this module is experimental.
-
class
apache_beam.io.mongodbio.
ReadFromMongoDB
(uri='mongodb://localhost:27017', db=None, coll=None, filter=None, projection=None, extra_client_params=None, bucket_auto=False)[source]¶ Bases:
apache_beam.transforms.ptransform.PTransform
A
PTransform
to read MongoDB documents into aPCollection
.Initialize a
ReadFromMongoDB
Parameters: - uri (str) – The MongoDB connection string following the URI format.
- db (str) – The MongoDB database name.
- coll (str) – The MongoDB collection name.
- filter – A bson.SON object specifying elements which must be present for a document to be included in the result set.
- projection – A list of field names that should be returned in the result set or a dict specifying the fields to include or exclude.
- extra_client_params (dict) – Optional MongoClient parameters.
- bucket_auto (bool) – If
True
, use MongoDB $bucketAuto aggregation to split collection into bundles instead of splitVector command, which does not work with MongoDB Atlas. IfFalse
(the default), use splitVector command for bundling.
Returns: -
annotations
() → Dict[str, Union[bytes, str, google.protobuf.message.Message]]¶
-
default_label
()¶
-
default_type_hints
()¶
-
display_data
()¶ Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
Returns: A dictionary containing key:value
pairs. The value might be an integer, float or string value; aDisplayDataItem
for values that have more data (e.g. short value, label, url); or aHasDisplayData
instance that has more display data that should be picked up. For example:{ 'key1': 'string_value', 'key2': 1234, 'key3': 3.14159265, 'key4': DisplayDataItem('apache.org', url='http://apache.org'), 'key5': subComponent }
Return type: Dict[str, Any]
-
classmethod
from_runner_api
(proto, context)¶
-
get_type_hints
()¶ Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.
-
get_windowing
(inputs)¶ Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
-
infer_output_type
(unused_input_type)¶
-
label
¶
-
pipeline
= None¶
-
classmethod
register_urn
(urn, parameter_type, constructor=None)¶
-
runner_api_requires_keyed_input
()¶
-
side_inputs
= ()¶
-
to_runner_api
(context, has_parts=False, **extra_kwargs)¶
-
to_runner_api_parameter
(unused_context)¶
-
to_runner_api_pickled
(unused_context)¶
-
type_check_inputs
(pvalueish)¶
-
type_check_inputs_or_outputs
(pvalueish, input_or_output)¶
-
type_check_outputs
(pvalueish)¶
-
with_input_types
(input_type_hint)¶ Annotates the input type of a
PTransform
with a type-hint.Parameters: input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint
.Raises: TypeError
– If input_type_hint is not a valid type-hint. Seeapache_beam.typehints.typehints.validate_composite_type_param()
for further details.Returns: A reference to the instance of this particular PTransform
object. This allows chaining type-hinting related methods.Return type: PTransform
-
with_output_types
(type_hint)¶ Annotates the output type of a
PTransform
with a type-hint.Parameters: type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint
.Raises: TypeError
– If type_hint is not a valid type-hint. Seevalidate_composite_type_param()
for further details.Returns: A reference to the instance of this particular PTransform
object. This allows chaining type-hinting related methods.Return type: PTransform
-
class
apache_beam.io.mongodbio.
WriteToMongoDB
(uri='mongodb://localhost:27017', db=None, coll=None, batch_size=100, extra_client_params=None)[source]¶ Bases:
apache_beam.transforms.ptransform.PTransform
WriteToMongoDB is a
PTransform
that writes aPCollection
of mongodb document to the configured MongoDB server.In order to make the document writes idempotent so that the bundles are retry-able without creating duplicates, the PTransform added 2 transformations before final write stage: a
GenerateId
transform and aReshuffle
transform.:----------------------------------------------- Pipeline --> |GenerateId --> Reshuffle --> WriteToMongoSink| ----------------------------------------------- (WriteToMongoDB)
The
GenerateId
transform adds a random and unique*_id* field to the documents if they don’t already have one, it uses the same format as MongoDB default. TheReshuffle
transform makes sure that no fusion happens betweenGenerateId
and the final write stage transform,so that the set of documents and their unique IDs are not regenerated if final write step is retried due to a failure. This prevents duplicate writes of the same document with different unique IDs.Parameters: - uri (str) – The MongoDB connection string following the URI format
- db (str) – The MongoDB database name
- coll (str) – The MongoDB collection name
- batch_size (int) – Number of documents per bulk_write to MongoDB, default to 100
- extra_client_params (dict) –
Optional MongoClient parameters as keyword arguments
Returns: -
annotations
() → Dict[str, Union[bytes, str, google.protobuf.message.Message]]¶
-
default_label
()¶
-
default_type_hints
()¶
-
display_data
()¶ Returns the display data associated to a pipeline component.
It should be reimplemented in pipeline components that wish to have static display data.
Returns: A dictionary containing key:value
pairs. The value might be an integer, float or string value; aDisplayDataItem
for values that have more data (e.g. short value, label, url); or aHasDisplayData
instance that has more display data that should be picked up. For example:{ 'key1': 'string_value', 'key2': 1234, 'key3': 3.14159265, 'key4': DisplayDataItem('apache.org', url='http://apache.org'), 'key5': subComponent }
Return type: Dict[str, Any]
-
classmethod
from_runner_api
(proto, context)¶
-
get_type_hints
()¶ Gets and/or initializes type hints for this object.
If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.
-
get_windowing
(inputs)¶ Returns the window function to be associated with transform’s output.
By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).
-
infer_output_type
(unused_input_type)¶
-
label
¶
-
pipeline
= None¶
-
classmethod
register_urn
(urn, parameter_type, constructor=None)¶
-
runner_api_requires_keyed_input
()¶
-
side_inputs
= ()¶
-
to_runner_api
(context, has_parts=False, **extra_kwargs)¶
-
to_runner_api_parameter
(unused_context)¶
-
to_runner_api_pickled
(unused_context)¶
-
type_check_inputs
(pvalueish)¶
-
type_check_inputs_or_outputs
(pvalueish, input_or_output)¶
-
type_check_outputs
(pvalueish)¶
-
with_input_types
(input_type_hint)¶ Annotates the input type of a
PTransform
with a type-hint.Parameters: input_type_hint (type) – An instance of an allowed built-in type, a custom class, or an instance of a TypeConstraint
.Raises: TypeError
– If input_type_hint is not a valid type-hint. Seeapache_beam.typehints.typehints.validate_composite_type_param()
for further details.Returns: A reference to the instance of this particular PTransform
object. This allows chaining type-hinting related methods.Return type: PTransform
-
with_output_types
(type_hint)¶ Annotates the output type of a
PTransform
with a type-hint.Parameters: type_hint (type) – An instance of an allowed built-in type, a custom class, or a TypeConstraint
.Raises: TypeError
– If type_hint is not a valid type-hint. Seevalidate_composite_type_param()
for further details.Returns: A reference to the instance of this particular PTransform
object. This allows chaining type-hinting related methods.Return type: PTransform