Discussion:
HashMap which can spill to disk for Hadoop?
C G
2007-12-19 19:58:48 UTC
Hi All:

The aggregation classes in Hadoop use a HashMap to hold unique values in memory when computing unique counts, etc. I ran into a situation on a 32-node grid (4G memory/node) where a single node runs out of memory within the reduce phase trying to manage a very large HashMap. This was disappointing because the dataset is only 44M rows (4G) of data. This is a scenario where I am counting unique values associated with various events, where the total number of events is very small and the number of unique values is very high. Since the event IDs serve as keys and the number of distinct event IDs is small, there is consequently a small number of reducers running, and each reducer is expected to manage a very large HashMap of unique values.

It looks like I need to build my own unique aggregator, so I am looking for an implementation of HashMap which can spill to disk as needed. I've considered using BDB as a backing store, and I've looked into Derby's BackingStoreHashtable as well.

For the present time I can restructure my data in an attempt to get more reducers to run, but I can see that in the very near future even that will run out of memory.

Any thoughts, comments, or flames?

Thanks,
C G



Jim Kellerman
2007-12-19 20:08:42 UTC
Have you looked at hadoop.io.MapWritable?
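For example, something along these lines (a rough, untested sketch; MapWritable is just a Writable Map of Writables, so it serializes cleanly between tasks, but note it still holds everything in memory):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

// Untested sketch: build a MapWritable of value -> count. It can be emitted
// as a task output value because it is itself a Writable, but the whole map
// still lives in memory, so it does not by itself solve the spilling problem.
public class MapWritableSketch {
  public static MapWritable counts(Iterable<String> values) {
    MapWritable counts = new MapWritable();
    for (String v : values) {
      Text key = new Text(v);
      IntWritable previous = (IntWritable) counts.get(key);
      counts.put(key, new IntWritable(previous == null ? 1 : previous.get() + 1));
    }
    return counts;
  }
}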

---
Jim Kellerman, Senior Engineer; Powerset
Ted Dunning
2007-12-19 23:04:55 UTC
You should also be able to get quite a bit of mileage out of special-purpose
HashMaps. In general, Java's generic collections incur large to huge
penalties for certain special cases. If you have one of these special cases,
or can put up with one, then you may be able to get an order of magnitude or
more of improvement.

Take as an example a hashed set of integers. A HashSet<Integer> will
consume about 40 bytes per entry, and each Integer will consume considerable
space as well. You can code a customized multiple hash set for integers
only that consumes 4 * alpha bytes per entry, where alpha ~= 2. If you really,
really wanted a set of integers and are willing to have small errors on the
count of distinct items, then this set of integers will suffice nicely.

Now, a factor of 15 improvement isn't anything to sneeze at, but it isn't
the same as unlimited.
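For illustration, here is a rough, untested sketch of the single-hash-function flavor (a plain open-addressed int set with linear probing; the multiple-hash variant trades exactness for simpler storage, but the per-entry memory is roughly the same 4 * alpha bytes):

// Untested sketch of a special-purpose int set: keys live in one int[] of
// capacity ~ alpha * n (alpha ~= 2 here), so memory is roughly 4 * alpha
// bytes per entry instead of the ~40+ bytes per entry of a HashSet<Integer>.
// Zero is reserved as the empty-slot marker; no resizing or removal.
public class IntOpenHashSet {
  private final int[] slots;      // 0 means "empty"
  private final int mask;         // capacity is a power of two
  private boolean hasZero;
  private int size;

  public IntOpenHashSet(int expectedSize) {
    int capacity = Integer.highestOneBit(Math.max(4, expectedSize) * 2 - 1) << 1;
    slots = new int[capacity];
    mask = capacity - 1;
  }

  public boolean add(int key) {
    if (key == 0) {
      if (hasZero) return false;
      hasZero = true;
      size++;
      return true;
    }
    if (size >= slots.length) {
      throw new IllegalStateException("sketch has no resizing");
    }
    int i = mix(key) & mask;
    while (slots[i] != 0) {
      if (slots[i] == key) return false;   // already present
      i = (i + 1) & mask;                  // linear probing
    }
    slots[i] = key;
    size++;
    return true;
  }

  public int size() { return size; }       // number of distinct ints added

  private static int mix(int h) {          // cheap integer scrambler
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    return h;
  }
}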
Runping Qi
2007-12-20 04:20:03 UTC
It would be nice if you could contribute a file-backed hashmap, or a file-backed
implementation of the unique count aggregator.

Short of that, if you just need to count the unique values for each
event id, you can do so by using the aggregate classes with each
event-id/event-value pair as a key and simply counting the number of
occurrences of each composite key.
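
For example, a rough (untested) sketch of the map side against the org.apache.hadoop.mapred API, assuming tab-separated eventId/eventValue records (the class name and field layout are just placeholders):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Untested sketch: map each record to a composite key "eventId\tvalue".
// A stock sum reducer collapses duplicates, so every distinct
// (eventId, value) pair comes out exactly once; a second, much smaller job
// can then count output rows per event id to get the unique counts.
public class CompositeKeyMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text compositeKey = new Text();

  public void map(LongWritable offset, Text record,
                  OutputCollector<Text, IntWritable> out, Reporter reporter)
      throws IOException {
    // Assumed record layout: eventId \t eventValue
    String[] fields = record.toString().split("\t", 2);
    if (fields.length < 2) {
      return;   // skip malformed lines
    }
    compositeKey.set(fields[0] + "\t" + fields[1]);
    out.collect(compositeKey, ONE);
  }
}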

Runping
C G
2007-12-20 14:30:55 UTC
If we decide to implement our own file-backed HashMap I'd be willing to contribute it back to the project.

We are rolling up unique counts per event ID. So we use event ID as a key, and want to count the number of unique event values. Since the number of event IDs is reasonably small (well under 100) and the universe of values is large (potentially millions), we wind up in a situation where we are pushing too much into memory.

Ted commented about special-purpose HashMaps. Unfortunately, our event values can be up to 128 bits long, so I don't think a special-purpose HashMap would work.

Thanks,
C G

Jeff Eastman
2007-12-20 15:15:25 UTC
I've brought up a small cluster and uploaded some large files. The
master node is cu027 and it seems to be getting an unfair percentage of
the blocks allocated to it, especially compared to cu171 which has the
same size disk. Can somebody shed some light on the reasons for this?



Jeff





Jeff Eastman
2007-12-20 15:22:56 UTC
Oooh, too much information - that HTML table didn't come through well. I won't
do that again <grin>. Here's the table in plain text.


Node    Last Contact  State       Size (GB)  Used (%)  Blocks
cu009   0             In Service  69.22      75.07     81
cu027   1             In Service  291.24     59.74     333
cu028   1             In Service  71.19      35.34     76
cu034   1             In Service  71.19      17.79     81
cu035   0             In Service  71.19      18.49     64
cu045   1             In Service  71.19      16.68     70
cu050   2             In Service  71.19      31.99     69
cu062   2             In Service  71.19      39.64     72
cu063   2             In Service  71.19      25.03     77
cu171   2             In Service  291.24     13.05     79

Jeff
dhruba Borthakur
2007-12-20 17:37:55 UTC
Hi Jeff,

Did you run the file-upload command on the master node itself? The DFS
client attempts to store one replica of the data on the node on which
the DFSClient is running.

To get a uniform distribution, it would be good if you upload your data
from multiple nodes in your cluster.

Thanks,
dhruba


Jeff Eastman
2007-12-20 18:16:48 UTC
Thanks Dhruba,

That makes sense. The data was already on the master node and I did not
consider that I could upload from other nodes too. The distribution on
the slave nodes is uniform and your response explains why the one other
bigger box did not get a larger number of blocks. Noting your use of the
word "attempts", can I conclude that at some point it might be
impossible to upload blocks from a local file to the DFS on the same
node and at that point the blocks would all be loaded elsewhere?

Jeff

Ted Dunning
2007-12-20 20:02:26 UTC
Yes.

I try to always upload data from a machine that is not part of the cluster
for exactly that reason.

I still find that I need to rebalance due to a strange problem in placement.
My datanodes have 10x different sized HDFS disks and I suspect that the
upload is picking datanodes uniformly rather than according to available
space.

Oddly enough, my rebalancing code works well. All it does is iterate
through all files of interest, increasing the replication count for 30
seconds and then decreasing it again (obviously this has to be threaded to
manipulate more than 2 files per minute). The replication code seems to
select a home for new blocks more correctly than the original placement.
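
Stripped down, the trick is roughly this (an untested, single-threaded sketch; the file list, replication factors and the 30-second wait are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Untested sketch of rebalancing by replication: temporarily raise the
// replication factor so the namenode places extra block copies (hopefully on
// emptier datanodes), then lower it again so over-full nodes shed replicas.
// A real version would thread this to touch more than a couple of files per
// minute.
public class ReplicationRebalanceSketch {
  public static void nudge(FileSystem fs, Path file,
                           short normal, short boosted) throws Exception {
    fs.setReplication(file, boosted);   // ask for extra copies
    Thread.sleep(30 * 1000L);           // give replication time to run
    fs.setReplication(file, normal);    // excess replicas get trimmed
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Placeholder file list; iterate over the files of interest here.
    for (Path file : new Path[] { new Path("/data/part-00000") }) {
      nudge(fs, file, (short) 3, (short) 5);
    }
  }
}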
Jeff Eastman
2007-12-20 22:35:09 UTC
Ted,

I'm still learning, obviously. I was not aware one could upload from any
machine other than the master (which did seem overly restrictive), and
uploading from one outside the cloud would be even better. Can you give
me a pointer on how to accomplish this? Is there a relevant FAQ or
document I have missed?

My experience with balancing is similar to yours; the upload is uniform,
independent of disk size or availability. I will try rebalancing.

Thanks,
Jeff

Ted Dunning
2007-12-20 23:00:08 UTC
Just copy the hadoop distro directory to the other machine and use whatever
command you were using before.

A program that uses hadoop just has to have access to all of the nodes
across the net. It doesn't assume anything else.
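
For example, something like this run on any outside machine works (an untested sketch; the fs.default.name URI and the paths are placeholders - with the cluster's config files on the classpath you wouldn't set it by hand):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Untested sketch: any machine that can reach the namenode and datanodes can
// write to HDFS directly. With the cluster's config files on the classpath,
// FileSystem.get() finds the right filesystem on its own; the explicit
// fs.default.name below is only a placeholder.
public class HdfsUploadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://master:9000");   // placeholder namenode

    FileSystem fs = FileSystem.get(conf);
    fs.copyFromLocalFile(new Path("/local/events.log"),  // placeholder paths
                         new Path("/user/data/events.log"));
    fs.close();
  }
}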
C G
2007-12-21 01:52:32 UTC
Hmmm....this thread is very interesting - I didn't know most of the stuff mentioned here.

Ted, when you say "copy in the distro", do you need to include the configuration files from the running grid? You don't need to actually start HDFS on this node, do you?

If I'm following this approach correctly, I would want to have an "xfer server" whose job is essentially to run dfs -copyFromLocal on all inbound-to-HDFS data. Once I'm certain that my data has copied correctly, I can delete the local files on the xfer server.

This is great news, as my current system wastes a lot of time copying data from data acquisition servers to the master node. If I can copy to HDFS directly from my acquisition servers then I am a happy guy....

Thanks,
C G


Ted Dunning
2007-12-21 02:01:50 UTC
Post by C G
Ted, when you say "copy in the distro" do you need to include the
configuration files from the running grid? You don't need to actually start
HDFS on this node do you?
You are correct. You only need the config files (and the hadoop script
helps make things easier).
Post by C G
If I'm following this approach correctly, I would want to have an "xfer
server" whose job it is to essentially run dfs -copyFromLocal on all
inbound-to-HDFS data. Once I'm certain that my data has copied correctly, I
can delete the local files on the xfer server.
Yes.
Post by C G
This is great news, as my current system wastes a lot of time copying data
from data acquisition servers to the master node. If I can copy to HDFS
directly from my acquisition servers then I am a happy guy....
You are a happy guy.

If your acquisition systems can see all of your datanodes.
C G
2007-12-21 06:11:33 UTC
I am indeed a happy man...our data acq. systems can see and interact with the compute grid proper so configuring nodes outside the grid to speak with HDFS should be reasonably straightforward.

C G

Eric Baldeschwieler
2007-12-26 15:53:02 UTC
With a secondary sort on the values during the shuffle, nothing would
need to be kept in memory, since it could all be counted in a single
scan. Right? Wouldn't that be a much more efficient solution?
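
Roughly, the reduce side would look like this (an untested sketch; it assumes the job is wired for the secondary sort - a composite map output key of the form eventId + tab + value, a partitioner and a JobConf.setOutputValueGroupingComparator comparator that look only at the eventId part, and the event value also carried as the map output value - none of which is shown here):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Untested sketch: with values arriving already sorted for each event id,
// counting distinct values only requires remembering the previous one.
public class DistinctCountReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, LongWritable> {

  public void reduce(Text compositeKey, Iterator<Text> sortedValues,
                     OutputCollector<Text, LongWritable> out, Reporter reporter)
      throws IOException {
    // The grouping comparator groups on the eventId prefix of the composite key.
    String eventId = compositeKey.toString().split("\t", 2)[0];
    long distinct = 0;
    String previous = null;
    while (sortedValues.hasNext()) {
      String current = sortedValues.next().toString();
      if (!current.equals(previous)) {    // new value in the sorted stream
        distinct++;
        previous = current;
      }
    }
    out.collect(new Text(eventId), new LongWritable(distinct));
  }
}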
Ted Dunning
2007-12-26 20:22:58 UTC
Sounds much better to me.
Rui Shi
2007-12-21 03:06:27 UTC
Hi,

I am a bit confused. What is the difference if I use "hadoop distcp" to upload files? I assume "hadoop distcp" uses multiple task trackers to upload files in parallel.

Thanks,

Rui

Ted Dunning
2007-12-21 03:12:46 UTC
Distcp is a map-reduce program where the maps read the files. This means
that all of your tasknodes have to be able to read the files in question.

Many times it is easier to have a writer push the files at the cluster,
especially if you are reading data from a conventional unix file system. It
would be a VERY bad idea to mount an NFS file system on an entire cluster.
Joydeep Sen Sarma
2007-12-21 04:30:19 UTC
I presume you meant that the act of 'mounting' itself is not bad - but letting the entire cluster start reading from a hapless filer is :-)

I have actually found it very useful to upload files through map-reduce. We have periodic jobs that are in effect tailing NFS files and copying data to HDFS. Because of random job placement, data is uniformly distributed. And because we run periodically, we usually don't need more than a task or two to copy in parallel.

The nice thing is that if we do ever fall behind (network glitches, filer overload, whatever), the code automatically increases the number of readers to catch up (within certain bounds on the number of concurrent readers) - something I would have a lot more trouble doing outside of Hadoop.

The low-hanging fruit we can contribute back are improvements to distcp (wildcards, parallel transfer of large text files), but the larger setup is interesting (almost like a self-adjusting parallel rsync) and probably needs more generalization for wider use.

Ted Dunning
2007-12-21 06:50:44 UTC
Yeah... We have that as well, but I put strict limits on how many readers
are allowed on any NFS data source. With well-organized reads, even a
single machine can cause serious load on an ordinary NFS server. I have had
very bad experiences where lots of maps read from a single source; the worst
was when a bunch of map functions all initialized themselves from a single
MySQL server, which promptly went fairly catatonic.